hydra-node-0.21.0: The Hydra node
Safe Haskell: Safe-Inferred
Language: GHC2021

Hydra.Network.Etcd

Description

Implements a Hydra network component using etcd.

While implementing a basic broadcast protocol over a distributed key-value store is arguably overkill, etcd's Raft consensus provides our application with a crash-recovery, fault-tolerant "atomic broadcast" out of the box. As a nice side effect, the network layer becomes very introspectable, and it could also support features such as TLS or service discovery.

The component starts and configures an etcd instance and connects to it using a gRPC client. We can only read from and write to the cluster while connected to the majority cluster.

Broadcasting is implemented by a put to a well-known key, while message delivery uses a watch on the same msg prefix. We keep a last known revision, also stored on disk, so that the watch starts at that revision (+1) and only delivers messages that were not seen before. In case we are not connected to our etcd instance or not connected to enough peers (i.e. we are on a minority cluster), we retry sending, but we also store messages to broadcast in a PersistentQueue, which makes the node resilient against crashes while sending. TODO: Is this needed? Is it a performance limitation?
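The revision bookkeeping described above can be illustrated with a minimal pure sketch. It assumes watched entries arrive already decoded and tagged with their etcd revision; the names `Watched`, `deliverNew` and `watchFrom` are hypothetical, and decoding as well as writing the revision to disk are left out.

```haskell
-- | A watched message: the etcd revision at which it was written and its
-- already-decoded payload (a simplification for illustration).
data Watched msg = Watched {revision :: Integer, payload :: msg}

-- | Deliver only messages newer than the last known revision and return
-- the revision to persist afterwards.
deliverNew :: Integer -> [Watched msg] -> ([msg], Integer)
deliverNew lastKnown events = (map payload fresh, newLast)
 where
  fresh = filter ((> lastKnown) . revision) events
  newLast = maximum (lastKnown : map revision fresh)

-- | After a restart, resume the watch just past the persisted revision.
watchFrom :: Integer -> Integer
watchFrom lastKnown = lastKnown + 1
```

For example, `deliverNew 5 [Watched 4 "a", Watched 6 "b", Watched 7 "c"]` delivers `["b", "c"]` and yields 7 as the new last known revision, so a restarted node would watch from revision 8.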

Connectivity and compatibility with other nodes on the cluster are also tracked using the key-value service:

  • network connectivity is determined by being able to fetch the member list
  • peer connectivity is tracked (best effort, not authorized) using entries under 'alive-<advertise>' keys, each with its own lease that is renewed by repeated keep-alives
  • each node compare-and-swaps its version into a key of the same name to check compatibility (not updatable)
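Peer tracking via leased 'alive-<advertise>' keys can be sketched as comparing the set of alive entries between two consecutive polls. This is a pure illustration under assumptions: peers are identified by plain `String` addresses, and `ConnectivityEvent`, `aliveKey`, `alivePeers` and `diffPeers` are hypothetical names, not the module's API.

```haskell
import Data.List (stripPrefix, (\\))
import Data.Maybe (mapMaybe)

-- | Hypothetical connectivity events (names chosen for illustration).
data ConnectivityEvent
  = PeerConnected String
  | PeerDisconnected String
  deriving (Eq, Show)

-- | Key under which a node advertises itself ('alive-<advertise>').
aliveKey :: String -> String
aliveKey advertise = "alive-" <> advertise

-- | Extract peer addresses from keys returned by a prefix read,
-- ignoring unrelated keys and our own entry.
alivePeers :: String -> [String] -> [String]
alivePeers self keys =
  filter (/= self) (mapMaybe (stripPrefix "alive-") keys)

-- | Turn two consecutive polls into connectivity events. An entry vanishes
-- once its lease expires, so a crashed peer shows up as disconnected after
-- roughly one lease TTL.
diffPeers :: [String] -> [String] -> [ConnectivityEvent]
diffPeers before after =
  map PeerConnected (after \\ before) ++ map PeerDisconnected (before \\ after)
```

Because each entry is bound to a lease, no explicit "goodbye" write is needed: a peer that stops sending keep-alives simply disappears from the next poll.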

Note that the etcd cluster is configured to compact its history every 5 minutes, keeping only the latest 1000 revisions. This prevents unbounded growth of the key-value store, but also limits how long a node can be disconnected without missing messages. 1000 revisions should be more than enough for our use case, as the Hydra protocol will not advance unless all participants are present.
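The retention limit can be turned into a rough, conservative back-of-the-envelope check. In etcd every write (messages as well as other keys) consumes one revision, so a node that last delivered revision `lastSeen` can resume its watch at `lastSeen + 1` as long as it has not fallen more than the retained number of revisions behind. The function `canResumeWatch` is hypothetical, and because compaction only runs periodically, more history than this may actually remain available.

```haskell
-- | Conservative check whether a node that last delivered revision lastSeen
-- can still resume its watch when the cluster is at currentRev and
-- compaction keeps (at least) the latest retained revisions.
canResumeWatch :: Integer -> Integer -> Integer -> Bool
canResumeWatch retained currentRev lastSeen = currentRev - lastSeen <= retained
```

With a retention of 1000, a node that missed 800 writes can catch up (`canResumeWatch 1000 5000 4200`), whereas one that missed 2000 cannot rely on the history still being there.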


Documentation

withEtcdNetwork :: (ToCBOR msg, FromCBOR msg, Eq msg) => Tracer IO EtcdLog -> ProtocolVersion -> NetworkConfiguration -> NetworkComponent IO msg msg ()

Concrete network component that broadcasts messages to an etcd cluster and listens for incoming messages.

checkVersion :: Tracer IO EtcdLog -> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()

Check and write the version on the etcd cluster. This retries until we are on a majority cluster and the check succeeds. If the version does not match, a corresponding Connectivity message is sent via the NetworkCallback.
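The compare-and-swap semantics of the version key can be modelled as a pure function over the key's current contents. This sketch leaves out retries and the network; `Version`, `VersionCheck` and `casVersion` are hypothetical names. In etcd itself, such a create-if-absent write is commonly expressed as a transaction whose compare checks that the key's create revision is 0; whether this module does exactly that is an assumption.

```haskell
-- | Hypothetical stand-in for the protocol version stored on the cluster.
newtype Version = Version Int
  deriving (Eq, Show)

-- | Possible outcomes of the version check.
data VersionCheck
  = WroteOurVersion -- key was absent, our version is now stored
  | VersionMatches -- key already holds our version
  | VersionMismatch Version -- key holds another version; report it
  deriving (Eq, Show)

-- | Model of a create-if-absent compare-and-swap: the key is written only
-- when empty and never overwritten afterwards ("not updatable").
-- Returns the outcome and the resulting key contents.
casVersion :: Version -> Maybe Version -> (VersionCheck, Maybe Version)
casVersion ours stored = case stored of
  Nothing -> (WroteOurVersion, Just ours)
  Just theirs
    | theirs == ours -> (VersionMatches, stored)
    | otherwise -> (VersionMismatch theirs, stored)
```

Only the `VersionMismatch` outcome would lead to a Connectivity message via the NetworkCallback; the first writer's version stays in place for all later nodes.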

broadcastMessages :: (ToCBOR msg, Eq msg) => Tracer IO EtcdLog -> Connection -> Host -> PersistentQueue IO msg -> IO ()

The Host argument is used to identify the sender.

Broadcast messages from a queue to the etcd cluster.

Retries putMessage on failure, in case we are on a minority cluster. TODO: Is retrying on failure even needed?
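The send loop can be sketched as "peek, put with retries, then pop", which keeps a message queued until the cluster has accepted it. This is a simplified illustration: an `IORef` list stands in for the PersistentQueue, the `put` action is a hypothetical stand-in for putMessage, and failures are assumed to surface as `IOException`s. The `Eq msg` constraint in the real signature fits the peek-then-pop pattern that popPersistentQueue prescribes.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (IOException, try)
import Data.IORef

-- | Drain a queue in order: look at the oldest message, put it with
-- retries, and only remove it once the put succeeded, so a message is never
-- dropped while the cluster is unavailable. Assumes a single consumer.
drainWithRetry :: Int -> IORef [msg] -> (msg -> IO ()) -> IO ()
drainWithRetry delayMicros queue put = do
  pending <- readIORef queue
  case pending of
    [] -> pure ()
    (msg : _) -> do
      retryUntilOk (put msg)
      modifyIORef' queue (drop 1) -- pop only after a successful put
      drainWithRetry delayMicros queue put
 where
  -- Retry the action after a fixed delay until it stops throwing.
  retryUntilOk action = do
    result <- try action :: IO (Either IOException ())
    case result of
      Right () -> pure ()
      Left _ -> threadDelay delayMicros >> retryUntilOk action
```

Popping only after a successful put means a crash between the two steps re-sends the message rather than losing it; receivers then rely on revision tracking and the protocol to cope with duplicates.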

putMessage :: ToCBOR msg => Connection -> Host -> msg -> IO ()

The Host argument is used to identify the sender.

Broadcast a message to the etcd cluster.

waitMessages :: FromCBOR msg => Tracer IO EtcdLog -> Connection -> FilePath -> NetworkCallback msg IO -> IO ()

Fetch and wait for messages from the etcd cluster.

pollConnectivity :: Tracer IO EtcdLog -> Connection -> Host -> NetworkCallback msg IO -> IO ()

The Host argument is the local host.

Write a well-known key to indicate being alive, keep it alive using a lease, and poll other peers' entries to yield connectivity events. While doing so, overall network connectivity is determined from the ability to read from and write to the cluster.
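The lease handling can be illustrated by classifying the outcome of each keep-alive round trip into log events that mirror the NoKeepAliveResponse and LowLeaseTTL constructors of EtcdLog. This is a sketch under assumptions: the remaining TTL is given in seconds as a `Double`, and the threshold as well as the names `LeaseEvent` and `classifyKeepAlive` are hypothetical.

```haskell
-- | Hypothetical log events mirroring the lease-related EtcdLog constructors.
data LeaseEvent
  = NoKeepAliveResponse
  | LowLeaseTTL Double -- remaining TTL in seconds
  deriving (Eq, Show)

-- | Classify one keep-alive round trip. 'Nothing' as input means no
-- response arrived; a remaining TTL below the threshold means the alive
-- entry risks expiring, after which peers would see us as disconnected.
classifyKeepAlive :: Double -> Maybe Double -> Maybe LeaseEvent
classifyKeepAlive threshold response = case response of
  Nothing -> Just NoKeepAliveResponse
  Just ttl
    | ttl < threshold -> Just (LowLeaseTTL ttl)
    | otherwise -> Nothing
```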

withGrpcContext :: MonadCatch m => Text -> m a -> m a

Add context to the grpcErrorMessage of any GrpcException raised.
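The pattern of catching an exception, enriching its message and rethrowing it can be sketched in plain `IO` with `Control.Exception`. The `RpcError` type is a hypothetical stand-in for GrpcException, and prefixing the context with ": " is an assumption about the message format.

```haskell
import Control.Exception (Exception, catch, throwIO)

-- | Hypothetical stand-in for a gRPC exception carrying an optional message.
newtype RpcError = RpcError {rpcErrorMessage :: Maybe String}
  deriving (Eq, Show)

instance Exception RpcError

-- | Run an action and, if it throws an 'RpcError', rethrow it with the given
-- context prepended to its message. Other exceptions pass through untouched.
withRpcContext :: String -> IO a -> IO a
withRpcContext context action =
  action `catch` \(RpcError msg) ->
    throwIO . RpcError . Just $ case msg of
      Nothing -> context
      Just m -> context <> ": " <> m
```

Wrapping each call site, e.g. `withRpcContext "put message" (...)`, makes it clear from the error alone which cluster operation failed.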

withProcessInterrupt :: (MonadIO m, MonadThrow m) => ProcessConfig stdin stdout stderr -> (Process stdin stdout stderr -> m a) -> m a

Like withProcessTerm, but first sends SIGINT and only sends SIGTERM if the process has not stopped within 5 seconds.
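The SIGINT-then-SIGTERM escalation can be sketched with the `process` and `unix` packages instead of typed-process (a swapped-in API, not this module's implementation). It assumes a POSIX system and the threaded RTS (`-threaded`), so that `timeout` can interrupt the blocking `waitForProcess`; `stopWithInterrupt` and `withProcessInterruptSketch` are hypothetical names.

```haskell
import Control.Exception (bracket)
import System.Exit (ExitCode)
import System.Posix.Signals (sigINT, signalProcess)
import System.Process (CreateProcess, ProcessHandle, createProcess, getPid, proc, terminateProcess, waitForProcess)
import System.Timeout (timeout)

-- | Stop a process by sending SIGINT first and escalating to SIGTERM only if
-- it has not exited within the grace period (in microseconds).
stopWithInterrupt :: Int -> ProcessHandle -> IO ExitCode
stopWithInterrupt graceMicros ph = do
  -- getPid yields Nothing if the process has already exited and been reaped.
  getPid ph >>= mapM_ (signalProcess sigINT)
  result <- timeout graceMicros (waitForProcess ph)
  case result of
    Just code -> pure code
    Nothing -> do
      terminateProcess ph -- sends SIGTERM on POSIX
      waitForProcess ph

-- | Spawn a process for the duration of an action and stop it afterwards
-- (also on exceptions) with a 5 second grace period.
withProcessInterruptSketch :: CreateProcess -> (ProcessHandle -> IO a) -> IO a
withProcessInterruptSketch cp =
  bracket spawn (stopWithInterrupt 5000000)
 where
  spawn = do
    (_, _, _, ph) <- createProcess cp
    pure ph
```

For example, `withProcessInterruptSketch (proc "sleep" ["30"]) (\_ -> pure ())` returns as soon as `sleep` reacts to SIGINT. Sending SIGINT first gives a process like etcd the chance to shut down cleanly before it is terminated.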

Persistent queue

data PersistentQueue m a

Constructors

PersistentQueue 

newPersistentQueue :: (MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) => FilePath -> Natural -> m (PersistentQueue m a)

Create a new persistent queue at the given file path with the given capacity.

writePersistentQueue :: (ToCBOR a, MonadSTM m, MonadIO m) => PersistentQueue m a -> a -> m ()

Write a value to the queue, blocking if the queue is full.

peekPersistentQueue :: MonadSTM m => PersistentQueue m a -> m a

Get the next value from the queue without removing it, blocking if the queue is empty.

popPersistentQueue :: (MonadSTM m, MonadIO m, Eq a) => PersistentQueue m a -> a -> m ()

Remove an element from the queue if it matches the given item. Use peekPersistentQueue to wait for the next item before popping it.
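The idea of a crash-resilient queue with peek-then-pop semantics can be sketched with one file per item, named by a sequence number, plus an in-memory mirror. This is a simplified illustration, not the module's implementation: it uses `Show`/`Read` instead of CBOR, has no capacity limit, does not block on an empty queue (`peekFileQueue` returns `Maybe`), and does not fsync or write atomically. All names are hypothetical.

```haskell
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.List (sort)
import Data.Maybe (listToMaybe)
import System.Directory (createDirectoryIfMissing, listDirectory, removeFile)
import System.FilePath ((</>))
import Text.Read (readMaybe)

-- | Hypothetical, simplified persistent queue: each item is stored in its own
-- file named after a sequence number and mirrored in an in-memory list.
data FileQueue a = FileQueue
  { queueDir :: FilePath
  , queueItems :: IORef [(Int, a)] -- oldest first
  , queueNextSeq :: IORef Int
  }

-- | Open a queue in a directory, reloading items left over from a crash.
openFileQueue :: Read a => FilePath -> IO (FileQueue a)
openFileQueue dir = do
  createDirectoryIfMissing True dir
  names <- listDirectory dir
  let seqNrs = sort [n | Just n <- map readMaybe names] :: [Int]
  -- readIO parses (and thus fully reads) each file before we continue.
  loaded <- mapM (\n -> (,) n <$> (readFile (dir </> show n) >>= readIO)) seqNrs
  itemsRef <- newIORef loaded
  nextRef <- newIORef (if null seqNrs then 0 else last seqNrs + 1)
  pure (FileQueue dir itemsRef nextRef)

-- | Append an item: persist it to disk first, then make it visible in memory.
writeFileQueue :: Show a => FileQueue a -> a -> IO ()
writeFileQueue q x = do
  n <- atomicModifyIORef' (queueNextSeq q) (\s -> (s + 1, s))
  writeFile (queueDir q </> show n) (show x)
  atomicModifyIORef' (queueItems q) (\xs -> (xs ++ [(n, x)], ()))

-- | Look at the oldest item without removing it (non-blocking here).
peekFileQueue :: FileQueue a -> IO (Maybe a)
peekFileQueue q = fmap snd . listToMaybe <$> readIORef (queueItems q)

-- | Remove the oldest item only if it equals the given one, deleting its file.
popFileQueue :: Eq a => FileQueue a -> a -> IO ()
popFileQueue q x = do
  popped <- atomicModifyIORef' (queueItems q) $ \xs -> case xs of
    ((n, y) : rest) | y == x -> (rest, Just n)
    _ -> (xs, Nothing)
  mapM_ (\n -> removeFile (queueDir q </> show n)) popped
```

Reopening the same directory after a crash yields the unsent items in their original order, and popping with the peeked value avoids removing an item that was never sent.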

Tracing

data EtcdLog

Constructors

  • EtcdLog {etcd :: Value}
  • Reconnecting
  • BroadcastFailed {reason :: Text}
  • FailedToDecodeLog {log :: Text, reason :: Text}
  • FailedToDecodeValue {key :: Text, value :: Text, reason :: Text}
  • CreatedLease {leaseId :: Int64}
  • LowLeaseTTL {ttlRemaining :: DiffTime}
  • NoKeepAliveResponse
  • MatchingProtocolVersion {version :: ProtocolVersion}

Instances

  • Arbitrary EtcdLog
  • ToJSON EtcdLog
  • Generic EtcdLog
  • Show EtcdLog
  • Eq EtcdLog

All instances are defined in Hydra.Network.Etcd.