Safe Haskell | Safe-Inferred
---|---
Language | GHC2021
A Network layer that provides resilience in the face of network connectivity loss and (most) crashes.
This network layer takes care of two aspects that together improve the reliability and operability of a Hydra cluster, making it more tolerant to transient failures and therefore alleviating the need to prematurely close heads:

- To safeguard against loss of connectivity, it keeps track of message indices and resends lost messages,
- To safeguard against crashes, it persists all sent messages and indices on disk.
Message ordering & resending
This layer implements an algorithm based on vector clocks, loosely inspired by reliable consistent broadcast algorithms with FIFO ordering as presented in Introduction to Reliable and Secure Distributed Programming, ch. 3.9, by Cachin et al.
Each node maintains a vector of monotonically increasing integer indices denoting the index of the last message known (sent or received) for each peer, where a peer is identified by a Party. This vector is updated upon sending and receiving messages, and is sent along with each message.
The basic algorithm is simple:

- When a message is sent, the index of the current node's party is incremented.
- When a message is received:
  - It is discarded if the index for the sender's party in the message is not exactly one more than the latest known index,
  - If our own party's index, as broadcast by the sender, is lower than our latest known index, and the peer appears quiescent, we resend all the "missing" messages.
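The receive-side rules above can be sketched as follows. This is a simplified illustration, not the actual Hydra code: parties are plain strings, the vector clock is an association list instead of a `Vector Int` indexed by party position, and `shouldDeliver` / `missingFor` are hypothetical helper names.

```haskell
-- Simplified sketch (not the real Hydra code): parties are strings and the
-- vector clock is an association list instead of a Data.Vector of Ints.
import Data.Maybe (fromMaybe)

type Party = String
type Clock = [(Party, Int)]

counterFor :: Party -> Clock -> Int
counterFor p = fromMaybe 0 . lookup p

-- | A message from 'sender' is delivered only if its index is exactly one
-- more than the latest index we know for that sender; otherwise discarded.
shouldDeliver :: Party -> Clock -> Clock -> Bool
shouldDeliver sender local theirs =
  counterFor sender theirs == counterFor sender local + 1

-- | Indices of our own messages the sender has not acknowledged yet; these
-- are the candidates for resending when the peer appears quiescent.
missingFor :: Party -> Clock -> Clock -> [Int]
missingFor us local theirs =
  [counterFor us theirs + 1 .. counterFor us local]

main :: IO ()
main = do
  let local = [("alice", 2), ("bob", 5)]
  print (shouldDeliver "bob" local [("alice", 2), ("bob", 6)]) -- next expected: deliver
  print (shouldDeliver "bob" local [("alice", 2), ("bob", 5)]) -- replay: discard
  print (missingFor "alice" local [("alice", 0), ("bob", 5)])  -- resend messages 1 and 2
```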
As shown by the signature of the withReliability function, this layer relies on an authentication layer providing Authenticated messages in order to securely identify senders, and also on Heartbeat messages in order to provide some "liveness".
Heartbeat messages are critical in order to signal peers our current view of the world, because it could be the case that we have no network message to send, which would lead to the head being stalled. Ping messages in particular are used to denote the fact that the sender is quiescent, i.e. it's not able to make any observable progress. Those messages are treated specially, both when receiving and sending:
- When sending a Ping, we don't increment our local message counter before broadcasting it,
- Conversely, when receiving a Ping, we don't update the peer's message counter but only take into account their view of our counter in order to compute whether or not to resend previous messages.
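The asymmetry between real messages and Pings on the sending side can be sketched like this; `Heartbeat`, `Clock`, `bump`, and `clockFor` are simplified stand-ins for the real Hydra types and functions, not their actual definitions:

```haskell
-- Simplified stand-in for Hydra's Heartbeat type.
data Heartbeat msg = Data msg | Ping
  deriving (Show)

type Clock = [(String, Int)]

-- | Increment one party's index in the clock.
bump :: String -> Clock -> Clock
bump p = map (\(q, n) -> if q == p then (q, n + 1) else (q, n))

-- | Clock attached to an outgoing message: real messages bump our own
-- index first, Pings are broadcast with the clock unchanged.
clockFor :: String -> Clock -> Heartbeat msg -> Clock
clockFor us local (Data _) = bump us local
clockFor _ local Ping = local

main :: IO ()
main = do
  let local = [("alice", 2), ("bob", 5)]
  print (clockFor "alice" local (Data "tx1"))           -- our index goes 2 -> 3
  print (clockFor "alice" local (Ping :: Heartbeat ())) -- clock unchanged
```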
Message persistence
The MessagePersistence handle defines an interface that's used to load and persist both the sequence of broadcast messages and the vector clock representing our current view of all peers' knowledge about messages. Because it's hard to guarantee atomicity of IO operations on files, we only save the indices when we broadcast, and use a local cache in the NetworkCallback: the underlying layers can call us back concurrently, e.g. when each connection to a peer is managed by a dedicated thread.
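A minimal sketch of the "cache in memory, persist only on broadcast" idea, assuming an IORef as the local cache and an injected `persist` action standing in for the handle's save operation (both names are illustrative, not the real API):

```haskell
import Data.IORef

type Clock = [(String, Int)]

-- | Acks received from peers only update the in-memory cache; nothing is
-- written to stable storage on this path.
cacheAcks :: IORef Clock -> Clock -> IO ()
cacheAcks = writeIORef

-- | On broadcast: bump our own index in the cache and persist the result.
-- 'persist' stands in for the save operation of the persistence handle.
onBroadcast :: IORef Clock -> (Clock -> IO ()) -> String -> IO ()
onBroadcast cache persist us = do
  clock <- readIORef cache
  let clock' = map (\(p, n) -> if p == us then (p, n + 1) else (p, n)) clock
  writeIORef cache clock'
  persist clock' -- indices reach "disk" only when broadcasting

main :: IO ()
main = do
  cache <- newIORef [("alice", 0), ("bob", 0)]
  disk <- newIORef [] -- stand-in for the on-disk copy
  cacheAcks cache [("alice", 0), ("bob", 1)] -- receiving touches only the cache
  readIORef disk >>= print                   -- still empty
  onBroadcast cache (writeIORef disk) "alice"
  readIORef disk >>= print                   -- clock persisted on broadcast
```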
NOTE: We do not recover from (at least) one particular crash situation: when withReliability "delivers" a message to the calling code, that message is usually put in a queue, wrapped in a NetworkEvent, and handled later on. Should the node crash at that particular point, the same message won't be resubmitted later, because the message indices could already have been updated and written to disk.
NOTE: Sent messages are currently kept indefinitely on disk because we don't set any bound on how far in the past a peer might need messages resent from. In the case of a very long-running head with a high frequency of transaction submission, this can lead to significant storage use. Should this become a problem, it can be mitigated by closing and reopening the head.
Synopsis
- data ReliableMsg msg = ReliableMsg { knownMessageIds :: Vector Int, payload :: msg }
- data ReliabilityLog
    = Resending { missing :: Vector Int, acknowledged :: Vector Int, localCounter :: Vector Int, theirIndex :: Int }
    | BroadcastCounter { ourIndex :: Int, localCounter :: Vector Int }
    | BroadcastPing { ourIndex :: Int, localCounter :: Vector Int }
    | Received { acknowledged :: Vector Int, localCounter :: Vector Int, theirIndex :: Int, ourIndex :: Int }
    | Ignored { acknowledged :: Vector Int, localCounter :: Vector Int, theirIndex :: Int, ourIndex :: Int }
    | ReliabilityFailedToFindMsg { }
    | ReliabilityMissingPartyIndex { missingParty :: Party }
    | ReceivedMalformedAcks { fromParty :: Party, partyAcks :: Vector Int, numberOfParties :: Int }
- data MessagePersistence m msg = MessagePersistence { loadAcks :: m (Vector Int), saveAcks :: Vector Int -> m (), loadMessages :: m [Heartbeat msg], appendMessage :: Heartbeat msg -> m () }
- mkMessagePersistence :: (MonadThrow m, FromJSON msg, ToJSON msg) => Int -> PersistenceIncremental (Heartbeat msg) m -> Persistence (Vector Int) m -> MessagePersistence m msg
- withReliability :: (MonadThrow (STM m), MonadThrow m, MonadAsync m) => Tracer m ReliabilityLog -> MessagePersistence m outbound -> Party -> [Party] -> NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a -> NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
Documentation
data ReliableMsg msg Source #
ReliableMsg
Instances
data ReliabilityLog Source #
Log entries specific to this network layer.
NOTE: Log items are documented in a YAML schema file which is not currently public, but should be.
Resending
BroadcastCounter
BroadcastPing
Received
Ignored
ReliabilityFailedToFindMsg
ReliabilityMissingPartyIndex
ReceivedMalformedAcks
Instances
data MessagePersistence m msg Source #
Handle for all persistence operations in the Reliability network layer. This handle takes care of storing and retrieving the vector clock and all messages.
MessagePersistence
mkMessagePersistence :: (MonadThrow m, FromJSON msg, ToJSON msg) => Int -> PersistenceIncremental (Heartbeat msg) m -> Persistence (Vector Int) m -> MessagePersistence m msg Source #
Create a MessagePersistence out of PersistenceIncremental and Persistence handles. This handle loads and saves acks (vector clock data) and can load and append network messages.
On start, we construct an empty ack vector from all parties in case nothing is stored on disk.
NOTE: This handle is returned in the underlying context just for the sake of convenience.
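For illustration, here is a hypothetical in-memory analogue of the MessagePersistence handle, using `[Int]` in place of `Vector Int`, a simplified stand-in Heartbeat type, and IORefs instead of the file-backed PersistenceIncremental and Persistence handles. It mirrors the documented behaviour of starting from an all-zero ack vector when nothing is stored:

```haskell
import Data.IORef

-- Simplified stand-in for Hydra's Heartbeat type.
data Heartbeat msg = Data msg | Ping
  deriving (Show)

-- Shape of the handle, with [Int] in place of Vector Int.
data MessagePersistence m msg = MessagePersistence
  { loadAcks :: m [Int]
  , saveAcks :: [Int] -> m ()
  , loadMessages :: m [Heartbeat msg]
  , appendMessage :: Heartbeat msg -> m ()
  }

-- | In-memory analogue of mkMessagePersistence: with nothing "on disk",
-- start from an all-zero ack vector sized by the number of parties.
mkInMemoryPersistence :: Int -> IO (MessagePersistence IO msg)
mkInMemoryPersistence numberOfParties = do
  acks <- newIORef (replicate numberOfParties 0)
  msgs <- newIORef []
  pure
    MessagePersistence
      { loadAcks = readIORef acks
      , saveAcks = writeIORef acks
      , loadMessages = reverse <$> readIORef msgs
      , appendMessage = \m -> modifyIORef' msgs (m :)
      }

main :: IO ()
main = do
  mp <- mkInMemoryPersistence 3
  loadAcks mp >>= print -- all-zero ack vector on start
  appendMessage mp (Data "tx1")
  appendMessage mp Ping
  saveAcks mp [1, 0, 0]
  loadAcks mp >>= print
  loadMessages mp >>= print
```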
withReliability Source #
:: (MonadThrow (STM m), MonadThrow m, MonadAsync m) | |
=> Tracer m ReliabilityLog | Tracer for logging messages. |
-> MessagePersistence m outbound | Our persistence handle |
-> Party | Our own party identifier. |
-> [Party] | Other parties' identifiers. |
-> NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a | Underlying network component providing consuming and sending channels. |
-> NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a |
Middleware function to handle message counter tracking and resending logic.
NOTE: There is some "abstraction leak" here, because the withReliability layer is tied to a specific structure of other layers, e.g. it must sit between the withHeartbeat and withAuthenticate layers.
NOTE: Make better use of Vectors? We could perhaps use an MVector to be able to mutate in place and not need zipWith.