| Safe Haskell | Safe-Inferred |
|---|---|
| Language | GHC2021 |
Hydra.Network.Etcd
Description
Implements a Hydra network component using etcd.
While implementing a basic broadcast protocol over a distributed key-value store is overkill, the Raft consensus of etcd provides our application with a crash-recovery fault-tolerant "atomic broadcast" out of the box. As a nice side effect, the network layer becomes very introspectable, and it would also support features like TLS or service discovery.
The component installs, starts, and configures an etcd instance and connects
to it using a gRPC client. We can only read from and write to the cluster
while connected to a majority cluster.
Broadcasting is implemented using put to a well-known key, while message
delivery is done by using watch on the same msg prefix. We keep a last known
revision, also stored on disk, to start watch at that revision (+1) and
only deliver messages that were not seen before. In case we are not connected
to our etcd instance or not enough peers (= on a minority cluster), we
retry sending, but also store messages to broadcast in a PersistentQueue,
which makes the node resilient against crashes while sending. TODO: Is this
needed? Is it a performance limitation?
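The revision bookkeeping described above can be modelled as a pure function. This is a minimal sketch, not the module's code: it assumes watch events arrive in increasing revision order and are already decoded, and `WatchEvent`, `watchStartRevision` and `deliverNew` are hypothetical names introduced for illustration.

```haskell
import Numeric.Natural (Natural)

-- | Hypothetical watch event: the etcd revision at which a message was
-- written, together with the already decoded message.
data WatchEvent msg = WatchEvent
  { eventRevision :: Natural
  , eventMessage :: msg
  }
  deriving (Show, Eq)

-- | Revision to start watching from, given the last revision delivered.
watchStartRevision :: Natural -> Natural
watchStartRevision lastKnown = lastKnown + 1

-- | Deliver only events newer than the last known revision. Returns the
-- delivered messages in order and the updated last known revision, which
-- would then be persisted.
deliverNew :: Natural -> [WatchEvent msg] -> ([msg], Natural)
deliverNew lastKnown events = (reverse delivered, newLastKnown)
  where
    (delivered, newLastKnown) = foldl step ([], lastKnown) events
    step (acc, rev) ev
      | eventRevision ev > rev = (eventMessage ev : acc, eventRevision ev)
      | otherwise = (acc, rev)
```

For example, `deliverNew 5` applied to events at revisions 3, 5 and 7 delivers only the message at revision 7 and moves the last known revision to 7.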
Connectivity and compatibility with other nodes on the cluster are tracked using the key-value service as well:
- network connectivity is determined by being able to fetch the member list
- peer connectivity is tracked (best effort, not authorized) using an entry at 'alive-<advertise>' keys with individual leases and repeated keep-alives
- each node compare-and-swaps its version into a key of the same name to check compatibility (not updatable)
Note that the etcd cluster is configured to compact revisions down to the latest 1000 every 5 minutes. This prevents unbounded growth of the key-value store, but also limits how long a node can be disconnected without missing messages. 1000 should be more than enough for our use case, as the Hydra protocol will not advance unless all participants are present.
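A rough model of this compaction window, assuming compaction has just run and kept only the latest 1000 revisions, and that a watch cannot be resumed from a revision older than the oldest retained one. In practice compaction runs periodically, so somewhat more history may be available between runs. `retainedRevisions`, `oldestAvailable` and `canResume` are hypothetical helpers, not part of the module.

```haskell
import Numeric.Natural (Natural)

-- | Hypothetical retention, mirroring the "keep 1000 revisions" setting.
retainedRevisions :: Natural
retainedRevisions = 1000

-- | Oldest revision still available right after a compaction that kept the
-- latest 'retainedRevisions' revisions (saturating at revision 1, which also
-- avoids underflow of 'Natural' subtraction).
oldestAvailable :: Natural -> Natural
oldestAvailable current
  | current > retainedRevisions = current - retainedRevisions
  | otherwise = 1

-- | Can a node that last delivered 'lastKnown' resume watching at
-- 'lastKnown' + 1 without missing messages?
canResume :: Natural -> Natural -> Bool
canResume lastKnown current = lastKnown + 1 >= oldestAvailable current
```

With the cluster at revision 5000, a node that last saw revision 4000 can resume, while one that last saw revision 3000 would have missed compacted history.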
Synopsis
- withEtcdNetwork :: (ToCBOR msg, FromCBOR msg, Eq msg) => Tracer IO EtcdLog -> ProtocolVersion -> NetworkConfiguration -> NetworkComponent IO msg msg ()
- getEtcdBinary :: FilePath -> WhichEtcd -> IO FilePath
- installEtcd :: FilePath -> IO ()
- checkVersion :: Tracer IO EtcdLog -> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
- broadcastMessages :: (ToCBOR msg, Eq msg) => Tracer IO EtcdLog -> Connection -> Host -> PersistentQueue IO msg -> IO ()
- putMessage :: ToCBOR msg => Connection -> Host -> msg -> IO ()
- waitMessages :: FromCBOR msg => Tracer IO EtcdLog -> Connection -> FilePath -> NetworkCallback msg IO -> IO ()
- getLastKnownRevision :: MonadIO m => FilePath -> m Natural
- putLastKnownRevision :: MonadIO m => FilePath -> Natural -> m ()
- pollConnectivity :: Tracer IO EtcdLog -> Connection -> Host -> NetworkCallback msg IO -> IO ()
- withGrpcContext :: MonadCatch m => Text -> m a -> m a
- withProcessInterrupt :: (MonadIO m, MonadThrow m) => ProcessConfig stdin stdout stderr -> (Process stdin stdout stderr -> m a) -> m a
- data PersistentQueue m a = PersistentQueue {}
- newPersistentQueue :: (MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) => FilePath -> Natural -> m (PersistentQueue m a)
- writePersistentQueue :: (ToCBOR a, MonadSTM m, MonadIO m) => PersistentQueue m a -> a -> m ()
- peekPersistentQueue :: MonadSTM m => PersistentQueue m a -> m a
- popPersistentQueue :: (MonadSTM m, MonadIO m, Eq a) => PersistentQueue m a -> a -> m ()
- data EtcdLog
    = EtcdLog { etcd :: Value }
    | Reconnecting
    | BroadcastFailed { }
    | FailedToDecodeLog { }
    | FailedToDecodeValue { }
    | CreatedLease { }
    | LowLeaseTTL { }
    | NoKeepAliveResponse
    | MatchingProtocolVersion { }
Documentation
withEtcdNetwork :: (ToCBOR msg, FromCBOR msg, Eq msg) => Tracer IO EtcdLog -> ProtocolVersion -> NetworkConfiguration -> NetworkComponent IO msg msg ()
Concrete network component that broadcasts messages to an etcd cluster and listens for incoming messages.
getEtcdBinary :: FilePath -> WhichEtcd -> IO FilePath
Return the path of the etcd binary. This either installs it first or just assumes there is one available on the system path.
installEtcd :: FilePath -> IO ()
Install the embedded etcd binary to the given file path.
checkVersion :: Tracer IO EtcdLog -> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
Check and write the version on the etcd cluster. This retries until we are on a
majority cluster and the operation succeeds. If the version does not match, a
corresponding Connectivity message is sent via the NetworkCallback.
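The "write if absent, otherwise compare" version check can be modelled against an in-memory map. This is a sketch under assumptions: against etcd it would be a single transaction rather than an `IORef` update, retries and the NetworkCallback are left out, and `Store`, `newStore`, `VersionCheck` and `checkVersionCAS` are hypothetical names.

```haskell
import Data.IORef (IORef, atomicModifyIORef', newIORef)
import qualified Data.Map.Strict as Map

-- | Outcome of the hypothetical version check.
data VersionCheck v
  = VersionMatches
  | VersionMismatch {ourVersion :: v, theirVersion :: v}
  deriving (Show, Eq)

-- | In-memory stand-in for the etcd key-value store.
type Store v = IORef (Map.Map String v)

newStore :: IO (Store v)
newStore = newIORef Map.empty

-- | Atomically write our version if the key is absent, otherwise compare it
-- with the stored one. The stored version is never overwritten.
checkVersionCAS :: Eq v => Store v -> String -> v -> IO (VersionCheck v)
checkVersionCAS store key ours =
  atomicModifyIORef' store $ \kv ->
    case Map.lookup key kv of
      Nothing -> (Map.insert key ours kv, VersionMatches)
      Just theirs
        | theirs == ours -> (kv, VersionMatches)
        | otherwise -> (kv, VersionMismatch ours theirs)
```

Because the first writer wins and the key is never updated, a node with a different version observes a mismatch instead of silently taking over the key.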
broadcastMessages
Arguments
| Argument | Description |
|---|---|
| :: (ToCBOR msg, Eq msg) | |
| => Tracer IO EtcdLog | |
| -> Connection | |
| -> Host | Used to identify the sender. |
| -> PersistentQueue IO msg | |
| -> IO () | |
Broadcast messages from a queue to the etcd cluster. Retries on failure to putMessage in case we are on a minority cluster.
TODO: Is retrying on failure even needed?
putMessage :: ToCBOR msg => Connection -> Host -> msg -> IO ()
Broadcast a message to the etcd cluster.
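The retry behaviour of broadcasting from a queue can be sketched as one peek/put/pop step. This assumes the put reports failure as a `Left` rather than throwing; `broadcastOne`, `dropHead` and `demoBroadcast` are hypothetical, and the real function works on a PersistentQueue and CBOR-encoded messages.

```haskell
import Control.Concurrent (threadDelay)
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)

-- | One step of a peek/put/pop broadcast loop: look at the next message
-- without removing it, try to put it, and only pop it once the put has
-- succeeded. On failure, wait and retry the same message.
broadcastOne ::
  IO msg ->
  (msg -> IO ()) ->
  (msg -> IO (Either err ())) ->
  Int ->
  IO ()
broadcastOne peek pop put delayMicros = peek >>= attempt
  where
    attempt msg = do
      result <- put msg
      case result of
        -- e.g. not connected or on a minority cluster: keep the message queued
        Left _ -> threadDelay delayMicros >> attempt msg
        -- only after a successful put is the message dropped from the queue
        Right () -> pop msg

-- | Remove the head of a list only if it is the expected element.
dropHead :: Eq a => a -> [a] -> [a]
dropHead m (x : xs) | x == m = xs
dropHead _ q = q

-- | Demo with an in-memory queue and a put that fails the first
-- @failures@ times. Returns (remaining queue, messages sent).
demoBroadcast :: Int -> [String] -> IO ([String], [String])
demoBroadcast failures msgs = do
  queue <- newIORef msgs
  failuresLeft <- newIORef failures
  sent <- newIORef []
  let peek = head <$> readIORef queue -- demo only: assumes a non-empty queue
      pop m = modifyIORef' queue (dropHead m)
      put m = do
        n <- readIORef failuresLeft
        if n > 0
          then writeIORef failuresLeft (n - 1) >> pure (Left "not on a majority cluster")
          else modifyIORef' sent (++ [m]) >> pure (Right ())
  broadcastOne peek pop put 1000
  (,) <$> readIORef queue <*> readIORef sent
```

In this sketch, popping only after a successful put means that a crash between the two steps leads to re-sending the message rather than losing it.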
waitMessages :: FromCBOR msg => Tracer IO EtcdLog -> Connection -> FilePath -> NetworkCallback msg IO -> IO ()
Fetch and wait for messages from the etcd cluster.
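Waiting for messages relies on the last known revision that the overview says is stored on disk (getLastKnownRevision and putLastKnownRevision in the synopsis). A minimal sketch of such persistence follows; the file name `last-known-revision`, the plain-text encoding and the default of 0 when no file exists are assumptions, and the helper names are hypothetical.

```haskell
import Control.Exception (throwIO, try)
import Data.Maybe (fromMaybe)
import Numeric.Natural (Natural)
import System.FilePath ((</>))
import System.IO.Error (isDoesNotExistError)
import Text.Read (readMaybe)

-- | Hypothetical location of the revision file in the persistence directory.
revisionFile :: FilePath -> FilePath
revisionFile dir = dir </> "last-known-revision"

-- | Read the last delivered revision, defaulting to 0 (nothing seen yet)
-- when the file does not exist or cannot be parsed.
readLastKnownRevision :: FilePath -> IO Natural
readLastKnownRevision dir = do
  result <- try (readFile (revisionFile dir))
  case result of
    Left e
      | isDoesNotExistError e -> pure 0
      | otherwise -> throwIO e
    Right contents ->
      -- Force the lazily read contents so the file handle is closed
      -- before a later write to the same file.
      length contents `seq` pure (fromMaybe 0 (readMaybe contents))

-- | Persist the revision as plain text (a crash-safe variant would write a
-- temporary file and rename it into place).
writeLastKnownRevision :: FilePath -> Natural -> IO ()
writeLastKnownRevision dir rev = writeFile (revisionFile dir) (show rev)
```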
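getLastKnownRevision :: MonadIO m => FilePath -> m Natural
putLastKnownRevision :: MonadIO m => FilePath -> Natural -> m ()
pollConnectivity :: Tracer IO EtcdLog -> Connection -> Host -> NetworkCallback msg IO -> IO ()
Write a well-known key to indicate being alive, keep it alive using a lease, and poll other peers' entries to yield connectivity events. While doing so, overall network connectivity is determined from the ability to read from and write to the cluster.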
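The alive-key mechanism can be modelled with explicit expiry times: etcd deletes a key once its lease expires without a keep-alive, so a peer counts as connected for as long as its key is present. In this sketch the lease TTL of 3 seconds, the `PeerUp`/`PeerDown` events and all function names are hypothetical; the module reports connectivity through the NetworkCallback instead.

```haskell
import qualified Data.Map.Strict as Map

-- | Hypothetical lease TTL in seconds.
leaseTTL :: Double
leaseTTL = 3

-- | In-memory stand-in for the alive keys: key -> time its lease expires.
type AliveKeys = Map.Map String Double

aliveKey :: String -> String
aliveKey advertise = "alive-" ++ advertise

-- | Creating the key or sending a keep-alive both push its expiry forward.
keepAlive :: Double -> String -> AliveKeys -> AliveKeys
keepAlive now advertise = Map.insert (aliveKey advertise) (now + leaseTTL)

-- | Peers whose lease has not expired, i.e. whose key etcd would still return.
alivePeers :: Double -> AliveKeys -> [String]
alivePeers now keys =
  [drop (length "alive-") k | (k, expiry) <- Map.toList keys, expiry > now]

-- | Hypothetical connectivity events derived by diffing two polls.
data PeerEvent = PeerUp String | PeerDown String
  deriving (Show, Eq)

peerEvents :: [String] -> [String] -> [PeerEvent]
peerEvents before after =
  [PeerUp p | p <- after, p `notElem` before]
    ++ [PeerDown p | p <- before, p `notElem` after]
```

Diffing consecutive polls keeps the events edge-triggered: a peer that stays alive produces no repeated events.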
withGrpcContext :: MonadCatch m => Text -> m a -> m a
Add context to the grpcErrorMessage of any GrpcException raised.
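A simplified sketch of this behaviour in plain IO. It uses a local stand-in `GrpcException` with a `Maybe String` message rather than the real gRPC library type, and `String` instead of `Text`; the documented function is polymorphic over MonadCatch.

```haskell
import Control.Exception (Exception, catch, throwIO)

-- | Local stand-in for a gRPC client exception; only the message matters here.
newtype GrpcException = GrpcException {grpcErrorMessage :: Maybe String}
  deriving (Show)

instance Exception GrpcException

-- | Prefix the message of any 'GrpcException' with some context and rethrow;
-- other exceptions pass through untouched.
withGrpcContext :: String -> IO a -> IO a
withGrpcContext context action =
  action `catch` \e ->
    throwIO e {grpcErrorMessage = Just (addContext (grpcErrorMessage e))}
  where
    addContext Nothing = context
    addContext (Just msg) = context ++ ": " ++ msg
```

For example, wrapping a call that fails with message "unavailable" in `withGrpcContext "putMessage"` rethrows it with message "putMessage: unavailable".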
withProcessInterrupt :: (MonadIO m, MonadThrow m) => ProcessConfig stdin stdout stderr -> (Process stdin stdout stderr -> m a) -> m a
Like withProcessTerm, but first sends SIGINT and only sends SIGTERM if the
process has not stopped within 5 seconds.
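The SIGINT-then-SIGTERM escalation can be expressed independently of any process library by passing the signalling and waiting actions in. `stopWithEscalation` and `demoStop` are hypothetical; with the `unix` package, the two signalling callbacks could be `signalProcess sigINT pid` and `signalProcess sigTERM pid`.

```haskell
import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)
import System.Timeout (timeout)

-- | Ask a process to stop with a first signal and escalate to a second one
-- if it has not exited within the grace period.
stopWithEscalation ::
  Int -> -- grace period in microseconds (5 seconds = 5000000)
  IO () -> -- send the first signal, e.g. SIGINT
  IO () -> -- send the second signal, e.g. SIGTERM
  IO a -> -- block until the process has exited
  IO a
stopWithEscalation grace interrupt terminate waitExit = do
  interrupt
  exited <- timeout grace waitExit
  case exited of
    Just result -> pure result
    Nothing -> terminate >> waitExit

-- | Demo with a simulated process that either reacts to the first signal or
-- ignores it and only exits on the second one.
demoStop :: Bool -> IO String
demoStop ignoresInterrupt = do
  exitVar <- newEmptyMVar
  let interrupt
        | ignoresInterrupt = pure ()
        | otherwise = putMVar exitVar "interrupted"
      terminate = putMVar exitVar "terminated"
  stopWithEscalation 100000 interrupt terminate (readMVar exitVar)
```

Giving the process a chance to handle SIGINT first lets it shut down cleanly; SIGTERM is only a fallback.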
Persistent queue
data PersistentQueue m a
newPersistentQueue :: (MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) => FilePath -> Natural -> m (PersistentQueue m a)
Create a new persistent queue at the given file path with the given capacity.
writePersistentQueue :: (ToCBOR a, MonadSTM m, MonadIO m) => PersistentQueue m a -> a -> m ()
Write a value to the queue, blocking if the queue is full.
peekPersistentQueue :: MonadSTM m => PersistentQueue m a -> m a
Get the next value from the queue without removing it, blocking if the queue is empty.
popPersistentQueue :: (MonadSTM m, MonadIO m, Eq a) => PersistentQueue m a -> a -> m ()
Remove an element from the queue if it matches the given item. Use
peekPersistentQueue to wait for the next item before popping it.
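The peek/pop contract of the queue can be illustrated with an in-memory bounded queue in STM. This sketch deliberately omits persistence, so unlike the queue documented above it does not survive restarts; `BoundedQueue` and its functions are hypothetical, and the list representation is chosen for brevity, not performance.

```haskell
import GHC.Conc (TVar, atomically, newTVarIO, readTVar, retry, writeTVar)
import Numeric.Natural (Natural)

-- | In-memory analogue of the contract above: bounded capacity, blocking
-- write, non-destructive peek, conditional pop.
data BoundedQueue a = BoundedQueue
  { capacity :: Natural
  , items :: TVar [a]
  }

newBoundedQueue :: Natural -> IO (BoundedQueue a)
newBoundedQueue cap = BoundedQueue cap <$> newTVarIO []

-- | Append a value, blocking (via 'retry') while the queue is full.
writeBoundedQueue :: BoundedQueue a -> a -> IO ()
writeBoundedQueue q x = atomically $ do
  xs <- readTVar (items q)
  if fromIntegral (length xs) >= capacity q
    then retry
    else writeTVar (items q) (xs ++ [x])

-- | Return the head without removing it, blocking while the queue is empty.
peekBoundedQueue :: BoundedQueue a -> IO a
peekBoundedQueue q = atomically $ do
  xs <- readTVar (items q)
  case xs of
    [] -> retry
    (x : _) -> pure x

-- | Remove the head only if it equals the given item; otherwise do nothing.
popBoundedQueue :: Eq a => BoundedQueue a -> a -> IO ()
popBoundedQueue q x = atomically $ do
  xs <- readTVar (items q)
  case xs of
    (y : ys) | y == x -> writeTVar (items q) ys
    _ -> pure ()
```

Separating peek from pop lets a consumer keep an item queued until it has been handled, which is what a retrying broadcaster needs.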
Tracing
data EtcdLog
Constructors
| Constructor | Fields |
|---|---|
| EtcdLog | etcd :: Value |
| Reconnecting | |
| BroadcastFailed | |
| FailedToDecodeLog | |
| FailedToDecodeValue | |
| CreatedLease | |
| LowLeaseTTL | |
| NoKeepAliveResponse | |
| MatchingProtocolVersion | |