hydra-node-0.20.0: The Hydra node
Safe Haskell: Safe-Inferred
Language: GHC2021

Hydra.Network.Reliability

Description

A Network layer that provides resilience in the face of network connectivity loss and (most) crashes.

This network layer takes care of 2 aspects that together improve the reliability and operability of a Hydra cluster, making it more tolerant to transient failures and therefore alleviating the need to prematurely close heads:

  1. To safeguard against loss of connectivity, it keeps track of message indices and resends lost messages,
  2. To safeguard against crashes, it persists all sent messages and indices on disk.

Messages ordering & resending

This layer implements an algorithm based on vector clocks, loosely inspired by the reliable consistent broadcast algorithms with FIFO ordering presented in Introduction to Reliable and Secure Distributed Programming, ch. 3.9, by Cachin et al.

Each node maintains a vector of monotonically increasing integer indices, one per peer, where a peer is identified by a Party. Each entry denotes the index of the last message known (sent or received) for that peer. The vector is updated upon sending and receiving messages, and is sent with each message.

The basic algorithm is simple:

  • When a message is sent, the index of the current node's party is incremented,
  • When a message is received:

    • It is discarded if the index for the sender's party in the message is not exactly one more than the latest known index,
    • If our own party's index, as broadcast by the sender, is lower than our latest known index and the peer appears quiescent, we resend all the "missing" messages.
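The rules above can be sketched in a few pure functions. This is only an illustration under stated assumptions, not the module's actual code: plain lists stand in for Vector Int so the sketch needs nothing beyond base, and the names Acks, Decision, bumpAt, onSend, onReceive and missingFor are hypothetical.

```haskell
-- Hypothetical stand-in for the vector of known message indices,
-- one slot per party (the real layer uses Vector Int).
type Acks = [Int]

data Decision
  = Deliver Acks -- accept the message; carries the updated local view
  | Discard -- the sender's index is not exactly one more than the known one
  deriving (Eq, Show)

-- Increment the counter stored at slot i.
bumpAt :: Int -> Acks -> Acks
bumpAt i acks = [if j == i then n + 1 else n | (j, n) <- zip [0 ..] acks]

-- Sending a message increments our own party's index.
onSend :: Int -> Acks -> Acks
onSend ourIx = bumpAt ourIx

-- Decide what to do with a message from the party at slot theirIx.
-- Assumes both vectors have a slot for theirIx ((!!) is partial).
onReceive :: Int -> Acks -> Acks -> Decision
onReceive theirIx theirAcks local
  | theirAcks !! theirIx == local !! theirIx + 1 = Deliver (bumpAt theirIx local)
  | otherwise = Discard

-- 1-based indices of our own messages that a quiescent peer has not seen.
missingFor :: Int -> Bool -> Acks -> Acks -> [Int]
missingFor ourIx peerQuiescent theirAcks local
  | peerQuiescent = [theirAcks !! ourIx + 1 .. local !! ourIx]
  | otherwise = []
```

For instance, with three parties and ourselves at slot 0, a quiescent peer announcing [1,4,0] while our local view is [3,4,0] would be resent our messages 2 and 3.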

As shown by the signature of the withReliability function, this layer relies on an authentication layer providing Authenticated messages in order to securely identify senders, and also on Heartbeat messages in order to provide some "liveness".

Heartbeat messages are critical for signalling our current view of the world to peers, because we may have no network message to send, which would leave the head stalled. Ping messages in particular denote that the sender is quiescent, i.e. it is not able to make any observable progress. Those messages are treated specially, both when receiving and sending:

  • When sending a Ping, we don't increment our local message counter before broadcasting it,
  • Conversely, when receiving a Ping, we don't update the peer's message counter but only take into account their view of our counter in order to compute whether or not to resend previous messages.
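The special treatment of Ping can be sketched as follows. This is a simplified illustration, not the layer's implementation: the Heartbeat type here is a local, hypothetical reduction with only a payload or a Ping, lists replace Vector Int, and the function names are made up for the example.

```haskell
-- Hypothetical, simplified heartbeat wrapper for this sketch only.
data Heartbeat msg = Data msg | Ping
  deriving (Eq, Show)

type Acks = [Int]

-- Increment the counter stored at slot i.
bumpAt :: Int -> Acks -> Acks
bumpAt i acks = [if j == i then n + 1 else n | (j, n) <- zip [0 ..] acks]

-- Counter vector to attach when broadcasting: a Ping leaves it untouched.
acksOnBroadcast :: Int -> Heartbeat msg -> Acks -> Acks
acksOnBroadcast _ Ping acks = acks
acksOnBroadcast ourIx (Data _) acks = bumpAt ourIx acks

-- Local view after receiving from the party at slot theirIx:
-- a Ping never advances the peer's counter.
acksOnReceive :: Int -> Heartbeat msg -> Acks -> Acks -> Acks
acksOnReceive _ Ping _ local = local
acksOnReceive theirIx (Data _) theirAcks local
  | theirAcks !! theirIx == local !! theirIx + 1 = bumpAt theirIx local
  | otherwise = local

-- A Ping signals quiescence, so only then do we compare the peer's view
-- of our counter with our own to decide whether to resend.
shouldResend :: Int -> Heartbeat msg -> Acks -> Acks -> Bool
shouldResend ourIx Ping theirAcks local = theirAcks !! ourIx < local !! ourIx
shouldResend _ (Data _) _ _ = False
```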

Messages persistence

The MessagePersistence handle defines an interface that is used to load and persist both the sequence of messages broadcast and the vector clock representing our current view of all peers' knowledge about messages. Because it is hard to guarantee atomicity of IO operations on files, we only save the indices when we broadcast, and use a local cache in the NetworkCallback: the underlying layers can call us back concurrently, e.g. when each connection to peers is managed by a dedicated thread.
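To make the shape of such a handle concrete, here is a minimal in-memory sketch. Everything in it is an assumption made for illustration: the Store record and its field names are hypothetical and are not the fields of MessagePersistence, IORefs stand in for files on disk, and lists stand in for Vector Int.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)

-- Hypothetical handle covering the two responsibilities described above:
-- the vector of indices and the sequence of broadcast messages.
data Store m msg = Store
  { loadAcks :: m [Int]
  , saveAcks :: [Int] -> m ()
  , loadMessages :: m [msg]
  , appendMessage :: msg -> m ()
  }

-- In-memory store; starts with an all-zero vector, one slot per party.
newInMemoryStore :: Int -> IO (Store IO msg)
newInMemoryStore numberOfParties = do
  acks <- newIORef (replicate numberOfParties 0)
  msgs <- newIORef []
  pure
    Store
      { loadAcks = readIORef acks
      , saveAcks = writeIORef acks
      , -- messages are consed onto the list, so reverse to get send order
        loadMessages = reverse <$> readIORef msgs
      , appendMessage = \m -> modifyIORef' msgs (m :)
      }

-- On broadcast: record the message, then save the incremented indices.
broadcastAndPersist :: Store IO msg -> Int -> msg -> IO [Int]
broadcastAndPersist store ourIx msg = do
  acks <- loadAcks store
  let acks' = [if j == ourIx then n + 1 else n | (j, n) <- zip [0 ..] acks]
  appendMessage store msg
  saveAcks store acks'
  pure acks'
```

In this sketch the indices are written only on the broadcast path, which mirrors the choice described above of saving indices when broadcasting rather than on every callback.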

NOTE: There is (at least) one particular crash situation from which we do not recover: when withReliability "delivers" a message to the calling code, the message is usually wrapped into a NetworkEvent, put in a queue, and handled later on. Should the node crash at that particular point, the same message won't be resubmitted later, because the message indices could already have been updated and written to disk.

NOTE: Sent messages are currently kept indefinitely on disk because we don't set any bound on how far back in the past a peer might need messages to be resent. In the case of a very long-running head with a high frequency of transaction submission, this can lead to significant storage use. Should this become a problem, it can be mitigated by closing and reopening the head.

Documentation

data ReliableMsg msg Source #

Constructors

ReliableMsg 

Fields

  • knownMessageIds :: Vector Int

    Vector of the highest known counter for each known party. It serves as an announcement of which messages the sender of the ReliableMsg has seen. The individual counters are unrelated to the payload carried in the same ReliableMsg.

  • payload :: msg
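As a small illustration of how the two fields relate, the sketch below re-declares the record locally with a list in place of Vector Int (an assumption, to avoid depending on the vector package). The helpers wellFormed and ackFor are hypothetical and not part of the module.

```haskell
-- Local re-declaration for illustration; the real type uses Vector Int.
data ReliableMsg msg = ReliableMsg
  { knownMessageIds :: [Int]
  , payload :: msg
  }
  deriving (Eq, Show)

-- A vector is usable only if it has exactly one slot per party.
wellFormed :: Int -> ReliableMsg msg -> Bool
wellFormed numberOfParties m = length (knownMessageIds m) == numberOfParties

-- Highest counter the sender announces for the party at slot i, if any.
ackFor :: Int -> ReliableMsg msg -> Maybe Int
ackFor i m
  | i < 0 = Nothing
  | otherwise = case drop i (knownMessageIds m) of
      (n : _) -> Just n
      [] -> Nothing
```

Note that ackFor only inspects knownMessageIds: the counters describe what the sender has seen, whatever the payload happens to be.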
     

Instances

Instances details
ToJSON msg => ToJSON (ReliableMsg msg) Source # 
Instance details

Defined in Hydra.Network.Reliability

Methods

toJSON :: ReliableMsg msg -> Value

toEncoding :: ReliableMsg msg -> Encoding

toJSONList :: [ReliableMsg msg] -> Value

toEncodingList :: [ReliableMsg msg] -> Encoding

omitField :: ReliableMsg msg -> Bool

Generic (ReliableMsg msg) Source # 
Instance details

Defined in Hydra.Network.Reliability

Associated Types

type Rep (ReliableMsg msg) :: Type -> Type Source #

Methods

from :: ReliableMsg msg -> Rep (ReliableMsg msg) x Source #

to :: Rep (ReliableMsg msg) x -> ReliableMsg msg Source #

Show msg => Show (ReliableMsg msg) Source # 
Instance details

Defined in Hydra.Network.Reliability

FromCBOR msg => FromCBOR (ReliableMsg msg) Source # 
Instance details

Defined in Hydra.Network.Reliability

Methods

fromCBOR :: Decoder s (ReliableMsg msg)

label :: Proxy (ReliableMsg msg) -> Text

ToCBOR msg => ToCBOR (ReliableMsg msg) Source # 
Instance details

Defined in Hydra.Network.Reliability

Methods

toCBOR :: ReliableMsg msg -> Encoding

encodedSizeExpr :: (forall t. ToCBOR t => Proxy t -> Size) -> Proxy (ReliableMsg msg) -> Size

encodedListSizeExpr :: (forall t. ToCBOR t => Proxy t -> Size) -> Proxy [ReliableMsg msg] -> Size

ToCBOR msg => SignableRepresentation (ReliableMsg msg) Source # 
Instance details

Defined in Hydra.Network.Reliability

Eq msg => Eq (ReliableMsg msg) Source # 
Instance details

Defined in Hydra.Network.Reliability

Methods

(==) :: ReliableMsg msg -> ReliableMsg msg -> Bool Source #

(/=) :: ReliableMsg msg -> ReliableMsg msg -> Bool Source #

type Rep (ReliableMsg msg) Source # 
Instance details

Defined in Hydra.Network.Reliability

type Rep (ReliableMsg msg) = D1 ('MetaData "ReliableMsg" "Hydra.Network.Reliability" "hydra-node-0.20.0-36eQlJ1pRR653kUqePgM5r" 'False) (C1 ('MetaCons "ReliableMsg" 'PrefixI 'True) (S1 ('MetaSel ('Just "knownMessageIds") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Vector Int)) :*: S1 ('MetaSel ('Just "payload") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 msg)))

data ReliabilityLog Source #

Log entries specific to this network layer.

NOTE: Log items are documented in a YAML schema file which is not currently public, but should be.

Instances

Instances details
Arbitrary ReliabilityLog Source # 
Instance details

Defined in Hydra.Network.Reliability

ToJSON ReliabilityLog Source # 
Instance details

Defined in Hydra.Network.Reliability

Generic ReliabilityLog Source # 
Instance details

Defined in Hydra.Network.Reliability

Associated Types

type Rep ReliabilityLog :: Type -> Type Source #

Show ReliabilityLog Source # 
Instance details

Defined in Hydra.Network.Reliability

Eq ReliabilityLog Source # 
Instance details

Defined in Hydra.Network.Reliability

type Rep ReliabilityLog Source # 
Instance details

Defined in Hydra.Network.Reliability

type Rep ReliabilityLog = D1 ('MetaData "ReliabilityLog" "Hydra.Network.Reliability" "hydra-node-0.20.0-36eQlJ1pRR653kUqePgM5r" 'False) (((C1 ('MetaCons "Resending" 'PrefixI 'True) ((S1 ('MetaSel ('Just "missing") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Vector Int)) :*: S1 ('MetaSel ('Just "acknowledged") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Vector Int))) :*: (S1 ('MetaSel ('Just "localCounter") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Vector Int)) :*: S1 ('MetaSel ('Just "theirIndex") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int))) :+: C1 ('MetaCons "BroadcastCounter" 'PrefixI 'True) (S1 ('MetaSel ('Just "ourIndex") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int) :*: S1 ('MetaSel ('Just "localCounter") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Vector Int)))) :+: (C1 ('MetaCons "BroadcastPing" 'PrefixI 'True) (S1 ('MetaSel ('Just "ourIndex") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int) :*: S1 ('MetaSel ('Just "localCounter") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Vector Int))) :+: C1 ('MetaCons "Received" 'PrefixI 'True) ((S1 ('MetaSel ('Just "acknowledged") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Vector Int)) :*: S1 ('MetaSel ('Just "localCounter") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Vector Int))) :*: (S1 ('MetaSel ('Just "theirIndex") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int) :*: S1 ('MetaSel ('Just "ourIndex") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int))))) :+: ((C1 ('MetaCons "Ignored" 'PrefixI 'True) ((S1 ('MetaSel ('Just "acknowledged") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Vector Int)) :*: S1 ('MetaSel ('Just "localCounter") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Vector Int))) :*: (S1 ('MetaSel ('Just "theirIndex") 'NoSourceUnpackedness 
'NoSourceStrictness 'DecidedLazy) (Rec0 Int) :*: S1 ('MetaSel ('Just "ourIndex") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int))) :+: C1 ('MetaCons "ReliabilityFailedToFindMsg" 'PrefixI 'True) ((S1 ('MetaSel ('Just "missingMsgIndex") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int) :*: S1 ('MetaSel ('Just "sentMessagesLength") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int)) :*: (S1 ('MetaSel ('Just "knownAckForUs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int) :*: S1 ('MetaSel ('Just "messageAckForUs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int)))) :+: (C1 ('MetaCons "ReliabilityMissingPartyIndex" 'PrefixI 'True) (S1 ('MetaSel ('Just "missingParty") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Party)) :+: C1 ('MetaCons "ReceivedMalformedAcks" 'PrefixI 'True) (S1 ('MetaSel ('Just "fromParty") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Party) :*: (S1 ('MetaSel ('Just "partyAcks") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Vector Int)) :*: S1 ('MetaSel ('Just "numberOfParties") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int))))))

data MessagePersistence m msg Source #

Handle for all persistence operations in the Reliability network layer. This handle takes care of storing and retrieving the vector clock and all messages.

Constructors

MessagePersistence 

Fields

mkMessagePersistence :: (MonadThrow m, FromJSON msg, ToJSON msg) => Int -> PersistenceIncremental (Heartbeat msg) m -> Persistence (Vector Int) m -> MessagePersistence m msg Source #

Create a MessagePersistence out of PersistenceIncremental and Persistence handles. This handle loads and saves acks (vector clock data) and can load and append network messages. On start, we construct an empty ack vector from all parties in case nothing is stored on disk. NOTE: This handle is returned in the underlying context just for the sake of convenience.

withReliability Source #

Arguments

:: (MonadThrow (STM m), MonadThrow m, MonadAsync m) 
=> Tracer m ReliabilityLog

Tracer for logging messages.

-> MessagePersistence m outbound

Our persistence handle

-> Party

Our own party identifier.

-> [Party]

Other parties' identifiers.

-> NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a

Underlying network component providing consuming and sending channels.

-> NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a 

Middleware function to handle message counters tracking and resending logic.

NOTE: There is some "abstraction leak" here, because the withReliability layer is tied to a specific structure of other layers, e.g. it must sit between the withHeartbeat and withAuthenticate layers.

NOTE: Make better use of Vectors? We should perhaps use an MVector to be able to mutate in place, without needing zipWith.