Safe Haskell | Safe-Inferred |
---|---|
Language | GHC2021 |
Hydra.Network.Etcd
Description
Implements a Hydra network component using etcd.
While implementing a basic broadcast protocol over a distributed key-value store is overkill, the Raft consensus of etcd provides our application with crash-recovery fault-tolerant "atomic broadcast" out of the box. As a nice side effect, the network layer becomes very introspectable, and it would also support features like TLS or service discovery.
The component starts and configures an etcd instance and connects to it using a gRPC client. We can only write to and read from the cluster while connected to the majority cluster.
Broadcasting is implemented using put to some well-known key, while message delivery is done using watch on the same msg prefix. We keep a last known revision, also stored on disk, to start the watch from that revision + 1 and only deliver messages that were not seen before. If we are not connected to our etcd instance or not connected to enough peers (i.e. we are on a minority cluster), we retry sending, and we also store messages to broadcast in a PersistentQueue, which makes the node resilient against crashes while sending. TODO: Is this needed? Is it a performance limitation?
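The put/watch delivery rule can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the module's implementation: `Store`, `putMsg`, `watchFrom` and `deliverNew` are hypothetical names, the key space is a plain list instead of a gRPC-backed etcd cluster, and payloads are `String`s rather than CBOR. It shows how remembering the last known revision lets a node resume a watch at revision + 1 without re-delivering messages it has already seen, while writes to unrelated keys (such as `alive-<advertise>` entries) still advance the revision counter.

```haskell
import Data.List (isPrefixOf)

-- Hypothetical in-memory stand-in for the etcd key space: every put bumps a
-- single global revision, mirroring etcd's cluster-wide revision counter.
type Revision = Integer

data Store = Store
  { storeRevision :: Revision
  , storeEntries :: [(Revision, String, String)] -- (revision, key, value), oldest first
  }

emptyStore :: Store
emptyStore = Store 0 []

-- Broadcast: put a value under a key and return the revision it was written at.
putMsg :: String -> String -> Store -> (Revision, Store)
putMsg key val (Store rev entries) =
  let rev' = rev + 1
   in (rev', Store rev' (entries ++ [(rev', key, val)]))

-- "Watch" from lastKnown + 1: every entry under the prefix not seen before.
watchFrom :: String -> Revision -> Store -> [(Revision, String)]
watchFrom prefix lastKnown (Store _ entries) =
  [(r, v) | (r, k, v) <- entries, r > lastKnown, prefix `isPrefixOf` k]

-- Deliver unseen messages and return the new last known revision, which
-- would be persisted to disk before the next watch.
deliverNew :: String -> Revision -> Store -> ([String], Revision)
deliverNew prefix lastKnown store =
  case watchFrom prefix lastKnown store of
    [] -> ([], lastKnown)
    seen -> (map snd seen, maximum (map fst seen))
```

Because etcd revisions are global across keys, the watch start point is a single number per node, which is why persisting that one number is enough to survive restarts.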
Connectivity and compatibility with other nodes on the cluster are tracked using the key-value service as well:
- network connectivity is determined by being able to fetch the member list
- peer connectivity is tracked (best effort, not authorized) using entries at 'alive-<advertise>' keys with individual leases and repeated keep-alives
- each node compare-and-swaps its version into a key of the same name to check compatibility (not updatable)
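The compatibility check in the last bullet amounts to a compare-and-swap on a shared key: write our version if the key is absent, otherwise compare against what is stored and never overwrite it. The sketch below models that with an STM `TVar` standing in for the etcd key; `ProtocolVersion` here is a local newtype over `Int`, and `VersionCheck` and `compareAndSwapVersion` are hypothetical names. Against etcd, the same logic would presumably be expressed as one transaction (compare on the key's existence, then put or get) rather than STM.

```haskell
import GHC.Conc (STM, TVar, atomically, newTVarIO, readTVar, writeTVar)

-- Hypothetical local stand-in for Hydra's ProtocolVersion.
newtype ProtocolVersion = ProtocolVersion Int
  deriving (Eq, Show)

data VersionCheck
  = VersionWritten -- key was absent, we stored our version
  | VersionMatches -- key present and equal to ours
  | VersionMismatch ProtocolVersion ProtocolVersion -- ours, theirs
  deriving (Eq, Show)

-- Atomically: if the key is unset, set it to our version; otherwise compare.
-- The stored value is never overwritten ("not updatable").
compareAndSwapVersion :: TVar (Maybe ProtocolVersion) -> ProtocolVersion -> STM VersionCheck
compareAndSwapVersion key ours = do
  current <- readTVar key
  case current of
    Nothing -> writeTVar key (Just ours) >> pure VersionWritten
    Just theirs
      | theirs == ours -> pure VersionMatches
      | otherwise -> pure (VersionMismatch ours theirs)
```

A `VersionMismatch` result is the case that checkVersion reports as a Connectivity message via NetworkCallback.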
Note that the etcd cluster is configured to compact revisions down to the last 1000 every 5 minutes. This prevents unbounded growth of the key-value store, but also limits how long a node can be disconnected without missing messages. 1000 revisions should be more than enough for our use case, as the Hydra protocol will not advance unless all participants are present.
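The retention trade-off can be made concrete with a little arithmetic. This hypothetical model (`compactionPoint` and `canResume` are illustrative names) assumes that a compaction run at revision `current` keeps only revisions above `current - 1000`; a node that reconnects can resume its watch only if the revision after its last known one is still retained.

```haskell
-- Hypothetical model of revision-based retention: a compaction run when the
-- store is at revision `current` discards everything up to `current - retention`.
compactionPoint :: Integer -> Integer -> Integer
compactionPoint retention current = max 0 (current - retention)

-- Assumption: revisions strictly above the compaction point can still be
-- watched, so a reconnecting node resumes only if lastKnown + 1 is above it.
canResume :: Integer -> Integer -> Integer -> Bool
canResume retention currentAtCompaction lastKnown =
  lastKnown + 1 > compactionPoint retention currentAtCompaction
```

With a retention of 1000 and a compaction at revision 5000, a node whose last known revision is 4200 can resume, while one stuck at 3500 has missed messages.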
Synopsis
- withEtcdNetwork :: (ToCBOR msg, FromCBOR msg, Eq msg) => Tracer IO EtcdLog -> ProtocolVersion -> NetworkConfiguration -> NetworkComponent IO msg msg ()
- checkVersion :: Tracer IO EtcdLog -> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
- broadcastMessages :: (ToCBOR msg, Eq msg) => Tracer IO EtcdLog -> Connection -> Host -> PersistentQueue IO msg -> IO ()
- putMessage :: ToCBOR msg => Connection -> Host -> msg -> IO ()
- waitMessages :: FromCBOR msg => Tracer IO EtcdLog -> Connection -> FilePath -> NetworkCallback msg IO -> IO ()
- getLastKnownRevision :: MonadIO m => FilePath -> m Natural
- putLastKnownRevision :: MonadIO m => FilePath -> Natural -> m ()
- pollConnectivity :: Tracer IO EtcdLog -> Connection -> Host -> NetworkCallback msg IO -> IO ()
- withGrpcContext :: MonadCatch m => Text -> m a -> m a
- withProcessInterrupt :: (MonadIO m, MonadThrow m) => ProcessConfig stdin stdout stderr -> (Process stdin stdout stderr -> m a) -> m a
- data PersistentQueue m a = PersistentQueue {}
- newPersistentQueue :: (MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) => FilePath -> Natural -> m (PersistentQueue m a)
- writePersistentQueue :: (ToCBOR a, MonadSTM m, MonadIO m) => PersistentQueue m a -> a -> m ()
- peekPersistentQueue :: MonadSTM m => PersistentQueue m a -> m a
- popPersistentQueue :: (MonadSTM m, MonadIO m, Eq a) => PersistentQueue m a -> a -> m ()
- data EtcdLog
    = EtcdLog { etcd :: Value }
    | Reconnecting
    | BroadcastFailed { }
    | FailedToDecodeLog { }
    | FailedToDecodeValue { }
    | CreatedLease { }
    | LowLeaseTTL { }
    | NoKeepAliveResponse
    | MatchingProtocolVersion { }
Documentation
withEtcdNetwork :: (ToCBOR msg, FromCBOR msg, Eq msg) => Tracer IO EtcdLog -> ProtocolVersion -> NetworkConfiguration -> NetworkComponent IO msg msg ()
Concrete network component that broadcasts messages to an etcd cluster and listens for incoming messages.
checkVersion :: Tracer IO EtcdLog -> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
Check and write the version on the etcd cluster. This retries until we are on a majority cluster and the check succeeds. If the version does not match, a corresponding Connectivity message is sent via NetworkCallback.
broadcastMessages
Arguments
Type | Description |
---|---|
:: (ToCBOR msg, Eq msg) | |
=> Tracer IO EtcdLog | |
-> Connection | |
-> Host | Used to identify the sender. |
-> PersistentQueue IO msg | |
-> IO () | |
Broadcast messages from a queue to the etcd cluster. Retries on failure to putMessage in case we are on a minority cluster.
TODO: Is retrying on failure even needed?
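The peek-then-pop discipline that broadcastMessages relies on can be sketched as follows. Under the assumptions of this sketch, the persistent queue is replaced by an `IORef` list and putMessage by a caller-supplied `put` action returning `Either`; `broadcastNext`, `popIfHead` and the fixed retry delay are hypothetical. The point is the ordering: a message is removed only after a successful put, so a crash in between leads to a re-send rather than a lost message.

```haskell
import Control.Concurrent (threadDelay)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef)

-- Remove the head of the list only if it equals the given message.
popIfHead :: Eq a => a -> [a] -> [a]
popIfHead msg (x : rest) | x == msg = rest
popIfHead _ xs = xs

-- Hypothetical stand-ins: the queue is an IORef holding a list (head =
-- oldest), and `put` plays the role of putMessage, returning Left when the
-- write fails (e.g. while on a minority cluster).
broadcastNext :: Eq msg => Int -> IORef [msg] -> (msg -> IO (Either String ())) -> IO Bool
broadcastNext retryDelayMicros queue put = do
  pending <- readIORef queue
  case pending of
    [] -> pure False -- nothing queued (the real queue blocks instead)
    (msg : _) -> do
      let attempt = do
            result <- put msg
            case result of
              Right () -> pure ()
              Left _err -> threadDelay retryDelayMicros >> attempt
      attempt
      -- Pop only after a successful put: if the node crashes before this
      -- point, the message is still queued and gets sent again.
      modifyIORef' queue (popIfHead msg)
      pure True
```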
putMessage :: ToCBOR msg => Connection -> Host -> msg -> IO ()
Broadcast a message to the etcd cluster.
waitMessages :: FromCBOR msg => Tracer IO EtcdLog -> Connection -> FilePath -> NetworkCallback msg IO -> IO ()
Fetch and wait for messages from the etcd cluster.
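waitMessages starts its watch from the last known revision, which the synopsis lists as being read and written by getLastKnownRevision and putLastKnownRevision. The sketch below shows one plausible way to persist it; the file name `last-known-revision`, the `show`/`read` text format and the fallback to 0 are assumptions, and the hypothetical `...Sketch` functions use plain `IO` instead of `MonadIO`.

```haskell
import Control.Exception (IOException, evaluate, try)
import Numeric.Natural (Natural)
import Text.Read (readMaybe)

-- Hypothetical file name inside the node's persistence directory.
revisionFile :: FilePath -> FilePath
revisionFile dir = dir ++ "/last-known-revision"

-- Read the last known revision; fall back to 0 (deliver everything still
-- retained by etcd) if the file is missing or cannot be parsed.
getLastKnownRevisionSketch :: FilePath -> IO Natural
getLastKnownRevisionSketch dir = do
  result <-
    try (readFile (revisionFile dir) >>= evaluate . readMaybe)
      :: IO (Either IOException (Maybe Natural))
  pure $ case result of
    Right (Just rev) -> rev
    _ -> 0

-- Overwrite the stored revision. Not atomic: a crash mid-write may leave a
-- partial file, and an unparsable one is read back as 0.
putLastKnownRevisionSketch :: FilePath -> Natural -> IO ()
putLastKnownRevisionSketch dir rev = writeFile (revisionFile dir) (show rev)
```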
pollConnectivity :: Tracer IO EtcdLog -> Connection -> Host -> NetworkCallback msg IO -> IO ()
Write a well-known key to indicate being alive, keep it alive using a lease, and poll other peers' entries to yield connectivity events. While doing so, overall network connectivity is determined from the ability to read from and write to the cluster.
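The polling half of pollConnectivity can be sketched as a diff between two reads of the `alive-<advertise>` keys. Leases and keep-alives are left to etcd: when a peer stops renewing its lease, its key disappears and shows up as a disconnect on the next poll. `PeerEvent`, `alivePeers` and `connectivityEvents` are hypothetical and do not reproduce Hydra's actual connectivity types.

```haskell
import Data.List (isPrefixOf, sort, (\\))

-- Hypothetical connectivity events (not Hydra's real Connectivity type).
data PeerEvent = PeerConnected String | PeerDisconnected String
  deriving (Eq, Show)

aliveKeyPrefix :: String
aliveKeyPrefix = "alive-"

-- Peers whose alive key currently exists (keys vanish when leases expire).
alivePeers :: [String] -> [String]
alivePeers keys =
  sort [drop (length aliveKeyPrefix) k | k <- keys, aliveKeyPrefix `isPrefixOf` k]

-- Compare two polls and emit one event per change.
connectivityEvents :: [String] -> [String] -> [PeerEvent]
connectivityEvents before after =
  map PeerConnected (after \\ before) ++ map PeerDisconnected (before \\ after)
```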
withGrpcContext :: MonadCatch m => Text -> m a -> m a
Add context to the grpcErrorMessage of any GrpcException raised.
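A sketch of the withGrpcContext idea in plain `IO` using `Control.Exception`. The `GrpcException` type below is a hypothetical stand-in (only the `grpcErrorMessage` field name comes from the description), messages are `String` instead of `Text`, and the `context: message` format is an assumption.

```haskell
import Control.Exception (Exception, catch, throwIO, try)

-- Hypothetical stand-in for the gRPC client's exception type.
newtype GrpcException = GrpcException {grpcErrorMessage :: Maybe String}
  deriving (Show)

instance Exception GrpcException

-- Re-throw any GrpcException with `context` prepended to its message.
withGrpcContextSketch :: String -> IO a -> IO a
withGrpcContextSketch context action = action `catch` addContext
  where
    addContext :: GrpcException -> IO a
    addContext e =
      throwIO e {grpcErrorMessage = Just (context ++ maybe "" (": " ++) (grpcErrorMessage e))}
```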
withProcessInterrupt :: (MonadIO m, MonadThrow m) => ProcessConfig stdin stdout stderr -> (Process stdin stdout stderr -> m a) -> m a
Like withProcessTerm, but first sends SIGINT and only sends SIGTERM if the process has not stopped within 5 seconds.
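The interrupt-then-terminate escalation can be sketched with the `process` package instead of the `typed-process` types in the signature above. `waitUpTo`, `stopGracefully`, `spawnInOwnGroup` and `withProcessInterruptSketch` are hypothetical names. The child is started in its own process group so that `interruptProcessGroupOf` (SIGINT on POSIX) does not hit the parent, and the exit status is polled so the 5-second grace period is enforced before falling back to `terminateProcess` (SIGTERM on POSIX).

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (bracket)
import System.Exit (ExitCode)
import System.Process
  ( CreateProcess (..)
  , ProcessHandle
  , createProcess
  , getProcessExitCode
  , interruptProcessGroupOf
  , proc
  , terminateProcess
  )

-- Poll until the process exits or the time budget (microseconds) runs out.
waitUpTo :: Int -> ProcessHandle -> IO (Maybe ExitCode)
waitUpTo remaining ph = do
  status <- getProcessExitCode ph
  case status of
    Just _ -> pure status
    Nothing
      | remaining <= 0 -> pure Nothing
      | otherwise -> threadDelay pollMicros >> waitUpTo (remaining - pollMicros) ph
  where
    pollMicros = 100000 -- 100 ms

-- SIGINT to the child's process group first; SIGTERM only if it is still
-- running after the grace period.
stopGracefully :: Int -> ProcessHandle -> IO (Maybe ExitCode)
stopGracefully graceMicros ph = do
  interruptProcessGroupOf ph
  exited <- waitUpTo graceMicros ph
  case exited of
    Just _ -> pure exited
    Nothing -> do
      terminateProcess ph
      waitUpTo graceMicros ph

-- Start the process in its own group, so the interrupt only reaches the child.
spawnInOwnGroup :: CreateProcess -> IO ProcessHandle
spawnInOwnGroup cp = do
  (_, _, _, ph) <- createProcess cp {create_group = True}
  pure ph

withProcessInterruptSketch :: CreateProcess -> (ProcessHandle -> IO a) -> IO a
withProcessInterruptSketch cp =
  bracket (spawnInOwnGroup cp) (\ph -> () <$ stopGracefully 5000000 ph)
```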
Persistent queue
data PersistentQueue m a
newPersistentQueue :: (MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) => FilePath -> Natural -> m (PersistentQueue m a)
Create a new persistent queue at the given file path with the given capacity.
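Crash resilience for the queue mainly requires that items written before a crash can be reloaded on startup. A minimal sketch, assuming one file per item named by an increasing sequence number and `show`/`read` in place of CBOR; `persistItem`, `unpersistItem` and `loadItems` are hypothetical, and capacity handling is left out.

```haskell
import Data.List (sortOn)
import System.Directory (createDirectoryIfMissing, listDirectory, removeFile)
import Text.Read (readMaybe)

-- Persist an item under the given sequence number (hypothetical layout:
-- one file per item, holding its `show` output instead of CBOR).
persistItem :: Show a => FilePath -> Integer -> a -> IO ()
persistItem dir seqNo item = writeFile (dir ++ "/" ++ show seqNo) (show item)

-- Forget an item once it has been delivered.
unpersistItem :: FilePath -> Integer -> IO ()
unpersistItem dir seqNo = removeFile (dir ++ "/" ++ show seqNo)

-- On startup: load all surviving items, oldest first, so messages queued
-- before a crash are sent again. Unparsable files are skipped.
loadItems :: Read a => FilePath -> IO [(Integer, a)]
loadItems dir = do
  createDirectoryIfMissing True dir
  names <- listDirectory dir
  entries <- mapM load [(n, name) | name <- names, Just n <- [readMaybe name]]
  pure (sortOn fst [(n, x) | (n, Just x) <- entries])
  where
    load (n, name) = do
      contents <- readFile (dir ++ "/" ++ name)
      let parsed = readMaybe contents
      -- Force the parse so the lazily read file is consumed and closed.
      parsed `seq` pure (n, parsed)
```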
writePersistentQueue :: (ToCBOR a, MonadSTM m, MonadIO m) => PersistentQueue m a -> a -> m ()
Write a value to the queue, blocking if the queue is full.
peekPersistentQueue :: MonadSTM m => PersistentQueue m a -> m a
Get the next value from the queue without removing it, blocking if the queue is empty.
popPersistentQueue :: (MonadSTM m, MonadIO m, Eq a) => PersistentQueue m a -> a -> m ()
Remove an element from the queue if it matches the given item. Use peekPersistentQueue to wait for the next item before popping it.
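The blocking behavior of writePersistentQueue and peekPersistentQueue, and the conditional pop, map naturally onto STM `retry`. This in-memory sketch omits persistence and uses hypothetical names (`BoundedQueue`, `writeQueue`, `peekQueue`, `popQueue`).

```haskell
import GHC.Conc (STM, TVar, atomically, newTVarIO, orElse, readTVar, retry, writeTVar)
import Numeric.Natural (Natural)

-- In-memory bounded FIFO; the head of the list is the oldest item.
data BoundedQueue a = BoundedQueue
  { capacity :: Natural
  , items :: TVar [a]
  }

newBoundedQueue :: Natural -> IO (BoundedQueue a)
newBoundedQueue cap = BoundedQueue cap <$> newTVarIO []

-- Block (retry) while the queue is full.
writeQueue :: BoundedQueue a -> a -> STM ()
writeQueue q x = do
  xs <- readTVar (items q)
  if fromIntegral (length xs) >= capacity q
    then retry
    else writeTVar (items q) (xs ++ [x])

-- Block while empty; return the head without removing it.
peekQueue :: BoundedQueue a -> STM a
peekQueue q = do
  xs <- readTVar (items q)
  case xs of
    [] -> retry
    (x : _) -> pure x

-- Remove the head only if it equals the given item; otherwise do nothing.
popQueue :: Eq a => BoundedQueue a -> a -> STM ()
popQueue q x = do
  xs <- readTVar (items q)
  case xs of
    (y : rest) | y == x -> writeTVar (items q) rest
    _ -> pure ()
```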
Tracing
data EtcdLog
Constructors
Constructor | Fields |
---|---|
EtcdLog | etcd :: Value |
Reconnecting | |
BroadcastFailed | |
FailedToDecodeLog | |
FailedToDecodeValue | |
CreatedLease | |
LowLeaseTTL | |
NoKeepAliveResponse | |
MatchingProtocolVersion | |