{-# OPTIONS_GHC -Wno-orphans #-}

-- | A `Network` layer that provides resilience in the face of network
-- connectivity loss and (most) crashes.
--
-- This network layer takes care of 2 aspects that together improve the
-- reliability and operability of a Hydra cluster, making it more tolerant to
-- transient failures and therefore alleviating the need to prematurely close
-- heads:
--
-- 1. To safeguard against loss of connectivity, it keeps track of message
-- indices and resends lost messages,
-- 2. To safeguard against crashes, it persists all sent messages and indices on disk.
--
-- == Messages ordering & resending
--
-- This layer implements an algorithm based on /vector clocks/, loosely inspired from
-- /Reliable consistent broadcast/ algorithms with FIFO ordering as presented in
-- [Introduction to Reliable and Secure Distributed
-- Programming](https://www.distributedprogramming.net), ch. 3.9, by Cachin et al.
--
-- Each node maintains a vector of monotonically increasing integer
-- indices denoting the index of the last message known (sent or received) for
-- each peer, where a peer is identified by a `Party`. This vector is updated upon
-- sending and receiving messages, and is sent with each message.
--
-- The basic algorithm is simple:
--
--   * When a message is sent, the index of the current node's party is incremented,
--
--   * When a message is received:
--
--       * It is discarded if the index for the sender's party in the message is
--         not exactly one more than the latest known index,
--
--       * If our own party's index, as broadcasted by the sender, is lower than
--         our latest known index, and the peer appears /quiescent/ we resend
--         all the "missing" messages.
--
-- As shown by the signature of the `withReliability` function, this layer
-- relies on an authentication layer providing `Authenticated` messages in order
-- to securely identify senders, and also on `Heartbeat` messages in order to
-- provide some "liveness".
--
-- `Heartbeat` messages are critical in order to /signal/ peers our current view
-- of the world, because it could be the case that we don't have any network
-- message to send, which would lead to the head being stalled. `Ping` messages in
-- particular are used to denote the fact that the sender is /quiescent/, i.e. it's
-- not able to make any /observable/ progress. Those messages are treated
-- specially, both when receiving and sending:
--
--   * When sending a `Ping`, we /don't increment/ our local message counter
--     before broadcasting it,
--
--   * Conversely, when receiving a `Ping`, we don't update the peer's message
--     counter but only take into account their view of /our/ counter in order
--     to compute whether or not to resend previous messages.
--
-- == Messages persistence
--
-- The `MessagePersistence` handle defines an interface that's used to load and
-- persist both the sequence of messages `broadcast`ed, and the /vector clock/
-- representing our current view of all peers' knowledge about
-- messages. Because it's hard to guarantee atomicity of IO operations on files,
-- we only save the indices when we /broadcast/ and use a local cache in the
-- `NetworkCallback`: The underlying layers can call us back concurrently,
-- e.g. when each connection to peers is managed by a dedicated thread.
--
-- __NOTE__: We do not recover (at least) from one particular crash situation: When
-- `withReliability` "delivers" a message to the calling code, it's usually
-- put in a queue wrapped into a `NetworkEvent` and then handled later
-- on. Should the node crash at that particular point, the same message won't be
-- resubmitted to it later because the message indices could have been updated
-- and written to disk already.
--
-- __NOTE__: Messages sent are /currently/ kept indefinitely on disk because we
-- don't set any bound to how far from the past a peer would need to be resent
-- messages. In the case of a very long-running head with a high frequency of
-- transaction submission, this can lead to significant storage use. Should this
-- become a problem, this can be mitigated by closing and reopening a head.
module Hydra.Network.Reliability where

import Hydra.Prelude hiding (empty, fromList, length, replicate, zipWith)

import Cardano.Binary (serialize')
import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation))
import Control.Concurrent.Class.MonadSTM (
  MonadSTM (readTQueue, writeTQueue),
  modifyTVar',
  newTQueueIO,
  newTVarIO,
  readTVarIO,
  writeTVar,
 )
import Control.Tracer (Tracer)
import Data.IntMap qualified as IMap
import Data.Sequence.Strict ((|>))
import Data.Sequence.Strict qualified as Seq
import Data.Vector (
  Vector,
  elemIndex,
  fromList,
  generate,
  length,
  replicate,
  zipWith,
  (!?),
 )
import Hydra.Logging (traceWith)
import Hydra.Network (Network (..), NetworkComponent)
import Hydra.Network.Authenticate (Authenticated (..))
import Hydra.Network.Heartbeat (Heartbeat (..), isPing)
import Hydra.Party (Party)
import Hydra.Persistence (Persistence (..), PersistenceIncremental (..))
import Test.QuickCheck (getPositive, listOf)

-- | Envelope used by the reliability layer: a message together with the
-- sender's vector of known message counters.
data ReliableMsg msg = ReliableMsg
  { knownMessageIds :: Vector Int
  -- ^ Vector of highest known counter for each known party. Serves as announcement of
  -- which messages the sender of `ReliableMsg` has seen. The individual counters have
  -- nothing to do with the `payload` also included in this.
  , payload :: msg
  -- ^ The wrapped message itself.
  }
  deriving stock (Eq, Show, Generic)
  deriving anyclass (ToJSON, FromJSON)

-- | CBOR encoding: the counter vector immediately followed by the payload.
-- No enclosing list or tag is emitted, so the decoder must read the two parts
-- back in the same order.
instance ToCBOR msg => ToCBOR (ReliableMsg msg) where
  toCBOR ReliableMsg{knownMessageIds, payload} =
    toCBOR knownMessageIds <> toCBOR payload

-- | CBOR decoding: reads the counter vector first, then the payload, and
-- rebuilds the `ReliableMsg` from them in that order.
instance FromCBOR msg => FromCBOR (ReliableMsg msg) where
  fromCBOR = ReliableMsg <$> fromCBOR <*> fromCBOR

-- | The bytes that get signed are the strict CBOR serialisation of the whole
-- `ReliableMsg` (counter vector and payload).
instance ToCBOR msg => SignableRepresentation (ReliableMsg msg) where
  getSignableRepresentation = serialize'

-- | Log entries specific to this network layer.
--
-- __NOTE__: Log items are documented in a YAML schema file which is not
-- currently public, but should be.
data ReliabilityLog
  = Resending
      { missing :: Vector Int
      , acknowledged :: Vector Int
      , localCounter :: Vector Int
      , theirIndex :: Int
      }
  | -- | Traced when a `Data` message is broadcast, carrying our index and the
    -- counter vector that is sent along with the message.
    BroadcastCounter {ourIndex :: Int, localCounter :: Vector Int}
  | -- | Traced when a `Ping` is broadcast, carrying our index and the counter
    -- vector that is sent along with the ping.
    BroadcastPing {ourIndex :: Int, localCounter :: Vector Int}
  | Received
      { acknowledged :: Vector Int
      , localCounter :: Vector Int
      , theirIndex :: Int
      , ourIndex :: Int
      }
  | Ignored
      { acknowledged :: Vector Int
      , localCounter :: Vector Int
      , theirIndex :: Int
      , ourIndex :: Int
      }
  | ReliabilityFailedToFindMsg
      { missingMsgIndex :: Int
      , sentMessagesLength :: Int
      , knownAckForUs :: Int
      , messageAckForUs :: Int
      }
  | ReliabilityMissingPartyIndex {missingParty :: Party}
  | -- | Traced when a received counter vector does not have exactly one entry
    -- per known party.
    ReceivedMalformedAcks
      { fromParty :: Party
      , partyAcks :: Vector Int
      , numberOfParties :: Int
      }
  deriving stock (Show, Eq, Generic)
  deriving anyclass (ToJSON, FromJSON)

-- | Orphan instance used for generating counter vectors in tests.
--
-- Generated vectors contain only strictly positive entries. Shrinking goes
-- through the plain @[Int]@ shrinker, so shrunk candidates are not constrained
-- by `getPositive`.
instance Arbitrary (Vector Int) where
  arbitrary = fromList <$> listOf (getPositive <$> arbitrary)
  shrink v = fromList <$> shrink (toList v)

-- | Generator and shrinker derived generically from the shape of
-- `ReliabilityLog`.
instance Arbitrary ReliabilityLog where
  arbitrary = genericArbitrary
  shrink = genericShrink

-- | Handle for all persistence operations in the Reliability network layer.
-- This handle takes care of storing and retrieving the vector clock and all
-- messages.
data MessagePersistence m msg = MessagePersistence
  { loadAcks :: m (Vector Int)
  -- ^ Load the stored vector of message counters.
  , saveAcks :: Vector Int -> m ()
  -- ^ Store the given vector of message counters.
  , loadMessages :: m [Heartbeat msg]
  -- ^ Load all stored messages.
  , appendMessage :: Heartbeat msg -> m ()
  -- ^ Add one message to the stored messages.
  }

-- | Create 'MessagePersistence' out of 'PersistenceIncremental' and
-- 'Persistence' handles. This handle loads and saves acks (vector clock data)
-- and can load and append network messages.
--
-- The first argument is the number of parties. When nothing is stored on disk,
-- 'loadAcks' returns a vector of that many zeroes; otherwise it returns the
-- stored vector as-is.
--
-- NOTE: This handle is returned in the underlying context just for the sake of
-- convenience.
mkMessagePersistence ::
  (MonadThrow m, FromJSON msg, ToJSON msg) =>
  Int ->
  PersistenceIncremental (Heartbeat msg) m ->
  Persistence (Vector Int) m ->
  MessagePersistence m msg
mkMessagePersistence numberOfParties msgPersistence ackPersistence =
  MessagePersistence
    { -- Fall back to an all-zero vector when no acks have been saved yet.
      loadAcks = fromMaybe (replicate numberOfParties 0) <$> load ackPersistence
    , saveAcks = \acks -> save ackPersistence acks
    , loadMessages = loadAll msgPersistence
    , appendMessage = \msg -> append msgPersistence msg
    }

-- | Middleware function to handle message counters tracking and resending logic.
--
-- __NOTE__: There is some "abstraction leak" here, because the `withReliability`
-- layer is tied to a specific structure of other layers, e.g. it is expected to
-- sit between the `withHeartbeat` and `withAuthenticate` layers.
--
-- NOTE: Make better use of Vectors? We should perhaps use a `MVector` to be able to
-- mutate in-place and not need `zipWith`
withReliability ::
  (MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
  -- | Tracer for logging messages.
  Tracer m ReliabilityLog ->
  -- | Our persistence handle
  MessagePersistence m msg ->
  -- | Our own party identifier.
  Party ->
  -- | Other parties' identifiers.
  [Party] ->
  -- | Underlying network component providing consuming and sending channels.
  NetworkComponent m (Authenticated (ReliableMsg (Heartbeat msg))) (ReliableMsg (Heartbeat msg)) a ->
  NetworkComponent m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability :: forall (m :: * -> *) msg a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m msg
-> Party
-> [Party]
-> NetworkComponent
     m
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     a
-> NetworkComponent
     m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability Tracer m ReliabilityLog
tracer MessagePersistence{Vector Int -> m ()
$sel:saveAcks:MessagePersistence :: forall (m :: * -> *) msg.
MessagePersistence m msg -> Vector Int -> m ()
saveAcks :: Vector Int -> m ()
saveAcks, m (Vector Int)
$sel:loadAcks:MessagePersistence :: forall (m :: * -> *) msg.
MessagePersistence m msg -> m (Vector Int)
loadAcks :: m (Vector Int)
loadAcks, Heartbeat msg -> m ()
$sel:appendMessage:MessagePersistence :: forall (m :: * -> *) msg.
MessagePersistence m msg -> Heartbeat msg -> m ()
appendMessage :: Heartbeat msg -> m ()
appendMessage, m [Heartbeat msg]
$sel:loadMessages:MessagePersistence :: forall (m :: * -> *) msg.
MessagePersistence m msg -> m [Heartbeat msg]
loadMessages :: m [Heartbeat msg]
loadMessages} Party
me [Party]
otherParties NetworkComponent
  m
  (Authenticated (ReliableMsg (Heartbeat msg)))
  (ReliableMsg (Heartbeat msg))
  a
withRawNetwork NetworkCallback (Authenticated (Heartbeat msg)) m
callback Network m (Heartbeat msg) -> m a
action = do
  TVar m (Vector Int)
acksCache <- m (Vector Int)
loadAcks m (Vector Int)
-> (Vector Int -> m (TVar m (Vector Int)))
-> m (TVar m (Vector Int))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Vector Int -> m (TVar m (Vector Int))
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO
  TVar m (StrictSeq (Heartbeat msg))
sentMessages <- m [Heartbeat msg]
loadMessages m [Heartbeat msg]
-> ([Heartbeat msg] -> m (TVar m (StrictSeq (Heartbeat msg))))
-> m (TVar m (StrictSeq (Heartbeat msg)))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= StrictSeq (Heartbeat msg) -> m (TVar m (StrictSeq (Heartbeat msg)))
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO (StrictSeq (Heartbeat msg)
 -> m (TVar m (StrictSeq (Heartbeat msg))))
-> ([Heartbeat msg] -> StrictSeq (Heartbeat msg))
-> [Heartbeat msg]
-> m (TVar m (StrictSeq (Heartbeat msg)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Heartbeat msg] -> StrictSeq (Heartbeat msg)
forall a. [a] -> StrictSeq a
Seq.fromList
  TQueue m (ReliableMsg (Heartbeat msg))
resendQ <- m (TQueue m (ReliableMsg (Heartbeat msg)))
forall a. m (TQueue m a)
forall (m :: * -> *) a. MonadSTM m => m (TQueue m a)
newTQueueIO
  let ourIndex :: Int
ourIndex = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe (Text -> Int
forall a t. (HasCallStack, IsText t) => t -> a
error Text
"This cannot happen because we constructed the list with our party inside.") (Party -> Maybe Int
findPartyIndex Party
me)
  let resend :: ReliableMsg (Heartbeat msg) -> STM m ()
resend = TQueue m (ReliableMsg (Heartbeat msg))
-> ReliableMsg (Heartbeat msg) -> STM m ()
forall a. TQueue m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> a -> STM m ()
writeTQueue TQueue m (ReliableMsg (Heartbeat msg))
resendQ
  NetworkComponent
  m
  (Authenticated (ReliableMsg (Heartbeat msg)))
  (ReliableMsg (Heartbeat msg))
  a
withRawNetwork (TVar m (Vector Int)
-> TVar m (StrictSeq (Heartbeat msg))
-> (ReliableMsg (Heartbeat msg) -> STM m ())
-> Int
-> Authenticated (ReliableMsg (Heartbeat msg))
-> m ()
reliableCallback TVar m (Vector Int)
acksCache TVar m (StrictSeq (Heartbeat msg))
sentMessages ReliableMsg (Heartbeat msg) -> STM m ()
resend Int
ourIndex) ((Network m (ReliableMsg (Heartbeat msg)) -> m a) -> m a)
-> (Network m (ReliableMsg (Heartbeat msg)) -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \network :: Network m (ReliableMsg (Heartbeat msg))
network@Network{ReliableMsg (Heartbeat msg) -> m ()
broadcast :: forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast :: ReliableMsg (Heartbeat msg) -> m ()
broadcast} -> do
    m Any -> (Async m Any -> m a) -> m a
forall a b. m a -> (Async m a -> m b) -> m b
forall (m :: * -> *) a b.
MonadAsync m =>
m a -> (Async m a -> m b) -> m b
withAsync (m () -> m Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m Any) -> m () -> m Any
forall a b. (a -> b) -> a -> b
$ STM m (ReliableMsg (Heartbeat msg))
-> m (ReliableMsg (Heartbeat msg))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (TQueue m (ReliableMsg (Heartbeat msg))
-> STM m (ReliableMsg (Heartbeat msg))
forall a. TQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> STM m a
readTQueue TQueue m (ReliableMsg (Heartbeat msg))
resendQ) m (ReliableMsg (Heartbeat msg))
-> (ReliableMsg (Heartbeat msg) -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ReliableMsg (Heartbeat msg) -> m ()
broadcast) ((Async m Any -> m a) -> m a) -> (Async m Any -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Async m Any
_ ->
      TVar m (StrictSeq (Heartbeat msg))
-> Int
-> TVar m (Vector Int)
-> Network m (ReliableMsg (Heartbeat msg))
-> m a
reliableBroadcast TVar m (StrictSeq (Heartbeat msg))
sentMessages Int
ourIndex TVar m (Vector Int)
acksCache Network m (ReliableMsg (Heartbeat msg))
network
 where
  allParties :: Vector Party
allParties = [Party] -> Vector Party
forall a. [a] -> Vector a
fromList ([Party] -> Vector Party) -> [Party] -> Vector Party
forall a b. (a -> b) -> a -> b
$ [Party] -> [Party]
forall a. Ord a => [a] -> [a]
sort ([Party] -> [Party]) -> [Party] -> [Party]
forall a b. (a -> b) -> a -> b
$ Party
me Party -> [Party] -> [Party]
forall a. a -> [a] -> [a]
: [Party]
otherParties
  reliableBroadcast :: TVar m (StrictSeq (Heartbeat msg))
-> Int
-> TVar m (Vector Int)
-> Network m (ReliableMsg (Heartbeat msg))
-> m a
reliableBroadcast TVar m (StrictSeq (Heartbeat msg))
sentMessages Int
ourIndex TVar m (Vector Int)
acksCache Network{ReliableMsg (Heartbeat msg) -> m ()
broadcast :: forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast :: ReliableMsg (Heartbeat msg) -> m ()
broadcast} =
    Network m (Heartbeat msg) -> m a
action (Network m (Heartbeat msg) -> m a)
-> Network m (Heartbeat msg) -> m a
forall a b. (a -> b) -> a -> b
$
      Network
        { broadcast :: Heartbeat msg -> m ()
broadcast = \Heartbeat msg
msg ->
            case Heartbeat msg
msg of
              Data{} -> do
                Vector Int
localCounter <- STM m (Vector Int) -> m (Vector Int)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Vector Int) -> m (Vector Int))
-> STM m (Vector Int) -> m (Vector Int)
forall a b. (a -> b) -> a -> b
$ Heartbeat msg -> STM m ()
cacheMessage Heartbeat msg
msg STM m () -> STM m (Vector Int) -> STM m (Vector Int)
forall a b. STM m a -> STM m b -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> STM m (Vector Int)
incrementAckCounter
                Vector Int -> m ()
saveAcks Vector Int
localCounter
                Heartbeat msg -> m ()
appendMessage Heartbeat msg
msg
                Tracer m ReliabilityLog -> ReliabilityLog -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m ReliabilityLog
tracer BroadcastCounter{Int
$sel:ourIndex:Resending :: Int
ourIndex :: Int
ourIndex, Vector Int
$sel:localCounter:Resending :: Vector Int
localCounter :: Vector Int
localCounter}
                ReliableMsg (Heartbeat msg) -> m ()
broadcast (ReliableMsg (Heartbeat msg) -> m ())
-> ReliableMsg (Heartbeat msg) -> m ()
forall a b. (a -> b) -> a -> b
$ Vector Int -> Heartbeat msg -> ReliableMsg (Heartbeat msg)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg Vector Int
localCounter Heartbeat msg
msg
              Ping{} -> do
                Vector Int
localCounter <- TVar m (Vector Int) -> m (Vector Int)
forall a. TVar m a -> m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar m (Vector Int)
acksCache
                Vector Int -> m ()
saveAcks Vector Int
localCounter
                Tracer m ReliabilityLog -> ReliabilityLog -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m ReliabilityLog
tracer BroadcastPing{Int
$sel:ourIndex:Resending :: Int
ourIndex :: Int
ourIndex, Vector Int
$sel:localCounter:Resending :: Vector Int
localCounter :: Vector Int
localCounter}
                ReliableMsg (Heartbeat msg) -> m ()
broadcast (ReliableMsg (Heartbeat msg) -> m ())
-> ReliableMsg (Heartbeat msg) -> m ()
forall a b. (a -> b) -> a -> b
$ Vector Int -> Heartbeat msg -> ReliableMsg (Heartbeat msg)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg Vector Int
localCounter Heartbeat msg
msg
        }
   where
    incrementAckCounter :: STM m (Vector Int)
incrementAckCounter = do
      Vector Int
acks <- TVar m (Vector Int) -> STM m (Vector Int)
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m (Vector Int)
acksCache
      let newAcks :: Vector Int
newAcks = Vector Int -> Int -> Vector Int
constructAcks Vector Int
acks Int
ourIndex
      TVar m (Vector Int) -> Vector Int -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m (Vector Int)
acksCache Vector Int
newAcks
      Vector Int -> STM m (Vector Int)
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Vector Int
newAcks

    cacheMessage :: Heartbeat msg -> STM m ()
cacheMessage Heartbeat msg
msg =
      TVar m (StrictSeq (Heartbeat msg))
-> (StrictSeq (Heartbeat msg) -> StrictSeq (Heartbeat msg))
-> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m (StrictSeq (Heartbeat msg))
sentMessages (StrictSeq (Heartbeat msg)
-> Heartbeat msg -> StrictSeq (Heartbeat msg)
forall a. StrictSeq a -> a -> StrictSeq a
|> Heartbeat msg
msg)

  -- Handle one authenticated message received from the underlying network.
  --
  --   * A message whose acks vector does not have one entry per known party
  --     is only traced as 'ReceivedMalformedAcks' and otherwise ignored.
  --
  --   * Otherwise a single STM transaction decides whether the message is
  --     passed on to 'callback', possibly updating 'acksCache' on the way.
  --
  --   * Whenever the payload is a ping (and the sender could be resolved), we
  --     additionally check whether the sender lags behind our own messages.
  reliableCallback acksCache sentMessages resend ourIndex (Authenticated (ReliableMsg acknowledged payload) party)
    | length acknowledged /= expectedSize =
        traceWith
          tracer
          ReceivedMalformedAcks
            { fromParty = party
            , partyAcks = acknowledged
            , numberOfParties = expectedSize
            }
    | otherwise = do
        -- The transaction yields 'Nothing' when the sender is not one of
        -- 'allParties' or when one of the vector lookups fails.
        verdict <- atomically $ runMaybeT $ do
          loadedAcks <- lift (readTVar acksCache)
          partyIndex <- hoistMaybe (findPartyIndex party)
          messageAckForParty <- hoistMaybe (acknowledged !? partyIndex)
          knownAckForParty <- hoistMaybe (loadedAcks !? partyIndex)
          if
            | isPing payload ->
                -- Pings never update indices, but they are always propagated.
                pure (True, partyIndex, loadedAcks)
            | messageAckForParty == knownAckForParty + 1 -> do
                -- Next-in-line message: record it in the cache and propagate.
                let newAcks = constructAcks loadedAcks partyIndex
                lift (writeTVar acksCache newAcks)
                pure (True, partyIndex, newAcks)
            | otherwise ->
                -- Any other message is dropped.
                pure (False, partyIndex, loadedAcks)

        case verdict of
          Nothing ->
            pure ()
          Just (deliverable, theirIndex, localCounter) -> do
            if deliverable
              then do
                callback Authenticated{payload, party}
                traceWith tracer Received{acknowledged, localCounter, theirIndex, ourIndex}
              else traceWith tracer Ignored{acknowledged, localCounter, theirIndex, ourIndex}

            when (isPing payload) $
              resendMessagesIfLagging sentMessages resend theirIndex localCounter acknowledged ourIndex
   where
    -- Number of entries a well-formed acks vector must have.
    expectedSize = length allParties

  -- Return a copy of the given acks vector in which the entry at position
  -- 'wantedIndex' is incremented by one; every other entry is left untouched.
  -- Positions are obtained by zipping with 'partyIndexes', so the result is no
  -- longer than either vector.
  constructAcks :: Vector Int -> Int -> Vector Int
  constructAcks acks wantedIndex =
    zipWith bumpIfWanted acks partyIndexes
   where
    bumpIfWanted ack position
      | position == wantedIndex = ack + 1
      | otherwise = ack

  -- The positions @0 .. n - 1@ of all parties, where @n@ is the number of
  -- entries in 'allParties' (empty when there are no parties).
  partyIndexes :: Vector Int
  partyIndexes = fromList [0 .. length allParties - 1]

  -- Resend our own messages that a peer has not seen yet.
  --
  -- 'acknowledged' is the acks vector received from the peer and 'knownAcks'
  -- is our local one. If the peer's entry for us (at 'myIndex') is strictly
  -- lower than our own, every message index in between is looked up in
  -- 'sentMessages' (numbered from 1) and handed to 'resend'. Indices that
  -- cannot be found are traced as 'ReliabilityFailedToFindMsg'. Nothing
  -- happens when either vector has no entry at 'myIndex'.
  resendMessagesIfLagging sentMessages resend theirIndex knownAcks acknowledged myIndex =
    case (acknowledged !? myIndex, knownAcks !? myIndex) of
      (Just messageAckForUs, Just knownAckForUs)
        -- The peer told us it is behind the latest message we sent.
        | messageAckForUs < knownAckForUs -> do
            let missing = fromList [messageAckForUs + 1 .. knownAckForUs]
            storedMessages <- readTVarIO sentMessages
            -- Index the stored messages by their 1-based position.
            let messages = IMap.fromList (zip [1 ..] (toList storedMessages))
            forM_ missing $ \idx ->
              case messages IMap.!? idx of
                Just missingMsg -> do
                  -- Same as our known acks, except that our own entry is set
                  -- to the index of the message being resent.
                  let localCounter =
                        zipWith
                          (\position ack -> if position == myIndex then idx else ack)
                          partyIndexes
                          knownAcks
                  traceWith tracer Resending{missing, acknowledged, localCounter, theirIndex}
                  atomically (resend (ReliableMsg localCounter missingMsg))
                Nothing ->
                  traceWith tracer $
                    ReliabilityFailedToFindMsg
                      { missingMsgIndex = idx
                      , sentMessagesLength = IMap.size messages
                      , knownAckForUs = knownAckForUs
                      , messageAckForUs = messageAckForUs
                      }
      _ -> pure ()

  -- Look up the position of a party within 'allParties'.
  -- NOTE: This should never fail; 'Nothing' is returned if the party is not
  -- an element of 'allParties'.
  findPartyIndex :: Party -> Maybe Int
  findPartyIndex = (`elemIndex` allParties)