{-# OPTIONS_GHC -Wno-orphans #-}
module Hydra.Network.Reliability where
import Hydra.Prelude hiding (empty, fromList, length, replicate, zipWith)
import Cardano.Binary (serialize')
import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation))
import Control.Concurrent.Class.MonadSTM (
MonadSTM (readTQueue, writeTQueue),
modifyTVar',
newTQueueIO,
newTVarIO,
readTVarIO,
writeTVar,
)
import Control.Tracer (Tracer)
import Data.IntMap qualified as IMap
import Data.Sequence.Strict ((|>))
import Data.Sequence.Strict qualified as Seq
import Data.Vector (
Vector,
elemIndex,
fromList,
generate,
length,
replicate,
zipWith,
(!?),
)
import Hydra.Logging (traceWith)
import Hydra.Network (Network (..), NetworkComponent)
import Hydra.Network.Authenticate (Authenticated (..))
import Hydra.Network.Heartbeat (Heartbeat (..), isPing)
import Hydra.Party (Party)
import Hydra.Persistence (Persistence (..), PersistenceIncremental (..))
import Test.QuickCheck (getPositive, listOf)
-- | Envelope that pairs a payload with the sender's vector of per-party
-- message counters.
data ReliableMsg msg = ReliableMsg
  { knownMessageIds :: Vector Int
  -- ^ Counter vector attached by the sender, one entry per party. In
  -- 'withReliability' an incoming message whose vector length differs from
  -- the number of parties is only traced as 'ReceivedMalformedAcks' and is
  -- not passed on to the callback.
  , payload :: msg
  -- ^ The wrapped message.
  }
  deriving stock (Eq, Show, Generic)
  deriving anyclass (ToJSON, FromJSON)
-- | CBOR encoding of a 'ReliableMsg': the encoding of 'knownMessageIds'
-- immediately followed by the encoding of 'payload'. No enclosing list or
-- length header is written by this instance itself.
instance ToCBOR msg => ToCBOR (ReliableMsg msg) where
  toCBOR ReliableMsg{knownMessageIds, payload} =
    toCBOR knownMessageIds <> toCBOR payload
-- | CBOR decoding of a 'ReliableMsg'. Fields are read in constructor order:
-- first the 'knownMessageIds' vector, then the 'payload'.
instance FromCBOR msg => FromCBOR (ReliableMsg msg) where
  fromCBOR = ReliableMsg <$> fromCBOR <*> fromCBOR
-- | The bytes that get signed for a 'ReliableMsg' are its strict CBOR
-- serialisation, as produced by 'serialize''.
instance ToCBOR msg => SignableRepresentation (ReliableMsg msg) where
  getSignableRepresentation = serialize'
-- | Trace events emitted by the reliability network layer.
--
-- Several constructors share field names (e.g. @localCounter@, @ourIndex@),
-- so the generated selectors are partial: only apply a selector to a
-- constructor that actually carries that field.
data ReliabilityLog
  = -- | NOTE(review): presumably traced when missing messages are re-sent to a
    -- lagging peer; the emitting code should be checked in 'withReliability'.
    Resending
      { missing :: Vector Int
      , acknowledged :: Vector Int
      , localCounter :: Vector Int
      , theirIndex :: Int
      }
  | -- | Traced when a @Data@ message is broadcast. @localCounter@ is the ack
    -- vector after our own entry (at @ourIndex@) has been incremented.
    BroadcastCounter
      { ourIndex :: Int
      , localCounter :: Vector Int
      }
  | -- | Traced when a @Ping@ is broadcast. @localCounter@ is the current ack
    -- vector, which is left unchanged for pings.
    BroadcastPing
      { ourIndex :: Int
      , localCounter :: Vector Int
      }
  | -- | Traced after an incoming message was handed to the callback.
    -- @acknowledged@ is the vector carried by the message, @localCounter@ our
    -- own ack vector, @theirIndex@ the sender's party index.
    Received
      { acknowledged :: Vector Int
      , localCounter :: Vector Int
      , theirIndex :: Int
      , ourIndex :: Int
      }
  | -- | Traced when an incoming message was /not/ handed to the callback
    -- because it is neither a ping nor the next expected message from its
    -- sender. Fields as in 'Received'.
    Ignored
      { acknowledged :: Vector Int
      , localCounter :: Vector Int
      , theirIndex :: Int
      , ourIndex :: Int
      }
  | -- | NOTE(review): presumably traced when a message that should be re-sent
    -- cannot be found among the stored sent messages; confirm against
    -- 'withReliability'.
    ReliabilityFailedToFindMsg
      { missingMsgIndex :: Int
      , sentMessagesLength :: Int
      , knownAckForUs :: Int
      , messageAckForUs :: Int
      }
  | -- | NOTE(review): presumably traced when a party has no index among the
    -- known parties; confirm against 'withReliability'.
    ReliabilityMissingPartyIndex
      { missingParty :: Party
      }
  | -- | Traced when the ack vector of an incoming message does not have
    -- exactly one entry per party; such a message is not processed further.
    ReceivedMalformedAcks
      { fromParty :: Party
      -- ^ Sender of the offending message.
      , partyAcks :: Vector Int
      -- ^ The ack vector as received.
      , numberOfParties :: Int
      -- ^ The expected vector length.
      }
  deriving stock (Show, Eq, Generic)
  deriving anyclass (ToJSON, FromJSON)
-- | Orphan instance (this module disables the orphan warning for it).
--
-- Generates vectors of arbitrary length whose elements are strictly positive.
-- Shrinking round-trips through the list instance for @[Int]@.
instance Arbitrary (Vector Int) where
  arbitrary = fromList <$> listOf (getPositive <$> arbitrary)
  shrink v = fromList <$> shrink (toList v)
-- | Generator and shrinker derived generically from the constructors of
-- 'ReliabilityLog'.
instance Arbitrary ReliabilityLog where
  arbitrary = genericArbitrary
  shrink = genericShrink
-- | Handle bundling the persistence operations used by the reliability layer:
-- one store for the ack vector and one append-only store for sent messages.
data MessagePersistence m msg = MessagePersistence
  { loadAcks :: m (Vector Int)
  -- ^ Load the stored ack vector.
  , saveAcks :: Vector Int -> m ()
  -- ^ Store the given ack vector.
  , loadMessages :: m [Heartbeat msg]
  -- ^ Load all previously stored messages.
  , appendMessage :: Heartbeat msg -> m ()
  -- ^ Append a single message to the stored messages.
  }
-- | Build a 'MessagePersistence' handle on top of an incremental store for
-- messages and a plain store for the ack vector.
--
-- The first argument is the number of parties. It is only used when no ack
-- vector has been stored yet, in which case 'loadAcks' yields a vector of
-- that many zeros. A stored vector is returned as is, without checking its
-- length against the number of parties.
mkMessagePersistence ::
  (MonadThrow m, FromJSON msg, ToJSON msg) =>
  Int ->
  PersistenceIncremental (Heartbeat msg) m ->
  Persistence (Vector Int) m ->
  MessagePersistence m msg
mkMessagePersistence numberOfParties msgPersistence ackPersistence =
  MessagePersistence
    { loadAcks =
        -- Nothing stored yet: start every party's counter at zero.
        fromMaybe (replicate numberOfParties 0) <$> load ackPersistence
    , saveAcks = \acks -> save ackPersistence acks
    , loadMessages = loadAll msgPersistence
    , appendMessage = \msg -> append msgPersistence msg
    }
withReliability ::
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog ->
MessagePersistence m msg ->
Party ->
[Party] ->
NetworkComponent m (Authenticated (ReliableMsg (Heartbeat msg))) (ReliableMsg (Heartbeat msg)) a ->
NetworkComponent m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability :: forall (m :: * -> *) msg a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m msg
-> Party
-> [Party]
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat msg)))
(ReliableMsg (Heartbeat msg))
a
-> NetworkComponent
m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability Tracer m ReliabilityLog
tracer MessagePersistence{Vector Int -> m ()
$sel:saveAcks:MessagePersistence :: forall (m :: * -> *) msg.
MessagePersistence m msg -> Vector Int -> m ()
saveAcks :: Vector Int -> m ()
saveAcks, m (Vector Int)
$sel:loadAcks:MessagePersistence :: forall (m :: * -> *) msg.
MessagePersistence m msg -> m (Vector Int)
loadAcks :: m (Vector Int)
loadAcks, Heartbeat msg -> m ()
$sel:appendMessage:MessagePersistence :: forall (m :: * -> *) msg.
MessagePersistence m msg -> Heartbeat msg -> m ()
appendMessage :: Heartbeat msg -> m ()
appendMessage, m [Heartbeat msg]
$sel:loadMessages:MessagePersistence :: forall (m :: * -> *) msg.
MessagePersistence m msg -> m [Heartbeat msg]
loadMessages :: m [Heartbeat msg]
loadMessages} Party
me [Party]
otherParties NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat msg)))
(ReliableMsg (Heartbeat msg))
a
withRawNetwork NetworkCallback (Authenticated (Heartbeat msg)) m
callback Network m (Heartbeat msg) -> m a
action = do
TVar m (Vector Int)
acksCache <- m (Vector Int)
loadAcks m (Vector Int)
-> (Vector Int -> m (TVar m (Vector Int)))
-> m (TVar m (Vector Int))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Vector Int -> m (TVar m (Vector Int))
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO
TVar m (StrictSeq (Heartbeat msg))
sentMessages <- m [Heartbeat msg]
loadMessages m [Heartbeat msg]
-> ([Heartbeat msg] -> m (TVar m (StrictSeq (Heartbeat msg))))
-> m (TVar m (StrictSeq (Heartbeat msg)))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= StrictSeq (Heartbeat msg) -> m (TVar m (StrictSeq (Heartbeat msg)))
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO (StrictSeq (Heartbeat msg)
-> m (TVar m (StrictSeq (Heartbeat msg))))
-> ([Heartbeat msg] -> StrictSeq (Heartbeat msg))
-> [Heartbeat msg]
-> m (TVar m (StrictSeq (Heartbeat msg)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Heartbeat msg] -> StrictSeq (Heartbeat msg)
forall a. [a] -> StrictSeq a
Seq.fromList
TQueue m (ReliableMsg (Heartbeat msg))
resendQ <- m (TQueue m (ReliableMsg (Heartbeat msg)))
forall a. m (TQueue m a)
forall (m :: * -> *) a. MonadSTM m => m (TQueue m a)
newTQueueIO
let ourIndex :: Int
ourIndex = Int -> Maybe Int -> Int
forall a. a -> Maybe a -> a
fromMaybe (Text -> Int
forall a t. (HasCallStack, IsText t) => t -> a
error Text
"This cannot happen because we constructed the list with our party inside.") (Party -> Maybe Int
findPartyIndex Party
me)
let resend :: ReliableMsg (Heartbeat msg) -> STM m ()
resend = TQueue m (ReliableMsg (Heartbeat msg))
-> ReliableMsg (Heartbeat msg) -> STM m ()
forall a. TQueue m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> a -> STM m ()
writeTQueue TQueue m (ReliableMsg (Heartbeat msg))
resendQ
NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat msg)))
(ReliableMsg (Heartbeat msg))
a
withRawNetwork (TVar m (Vector Int)
-> TVar m (StrictSeq (Heartbeat msg))
-> (ReliableMsg (Heartbeat msg) -> STM m ())
-> Int
-> Authenticated (ReliableMsg (Heartbeat msg))
-> m ()
reliableCallback TVar m (Vector Int)
acksCache TVar m (StrictSeq (Heartbeat msg))
sentMessages ReliableMsg (Heartbeat msg) -> STM m ()
resend Int
ourIndex) ((Network m (ReliableMsg (Heartbeat msg)) -> m a) -> m a)
-> (Network m (ReliableMsg (Heartbeat msg)) -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \network :: Network m (ReliableMsg (Heartbeat msg))
network@Network{ReliableMsg (Heartbeat msg) -> m ()
broadcast :: forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast :: ReliableMsg (Heartbeat msg) -> m ()
broadcast} -> do
m Any -> (Async m Any -> m a) -> m a
forall a b. m a -> (Async m a -> m b) -> m b
forall (m :: * -> *) a b.
MonadAsync m =>
m a -> (Async m a -> m b) -> m b
withAsync (m () -> m Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m Any) -> m () -> m Any
forall a b. (a -> b) -> a -> b
$ STM m (ReliableMsg (Heartbeat msg))
-> m (ReliableMsg (Heartbeat msg))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (TQueue m (ReliableMsg (Heartbeat msg))
-> STM m (ReliableMsg (Heartbeat msg))
forall a. TQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> STM m a
readTQueue TQueue m (ReliableMsg (Heartbeat msg))
resendQ) m (ReliableMsg (Heartbeat msg))
-> (ReliableMsg (Heartbeat msg) -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ReliableMsg (Heartbeat msg) -> m ()
broadcast) ((Async m Any -> m a) -> m a) -> (Async m Any -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Async m Any
_ ->
TVar m (StrictSeq (Heartbeat msg))
-> Int
-> TVar m (Vector Int)
-> Network m (ReliableMsg (Heartbeat msg))
-> m a
reliableBroadcast TVar m (StrictSeq (Heartbeat msg))
sentMessages Int
ourIndex TVar m (Vector Int)
acksCache Network m (ReliableMsg (Heartbeat msg))
network
where
allParties :: Vector Party
allParties = [Party] -> Vector Party
forall a. [a] -> Vector a
fromList ([Party] -> Vector Party) -> [Party] -> Vector Party
forall a b. (a -> b) -> a -> b
$ [Party] -> [Party]
forall a. Ord a => [a] -> [a]
sort ([Party] -> [Party]) -> [Party] -> [Party]
forall a b. (a -> b) -> a -> b
$ Party
me Party -> [Party] -> [Party]
forall a. a -> [a] -> [a]
: [Party]
otherParties
reliableBroadcast :: TVar m (StrictSeq (Heartbeat msg))
-> Int
-> TVar m (Vector Int)
-> Network m (ReliableMsg (Heartbeat msg))
-> m a
reliableBroadcast TVar m (StrictSeq (Heartbeat msg))
sentMessages Int
ourIndex TVar m (Vector Int)
acksCache Network{ReliableMsg (Heartbeat msg) -> m ()
broadcast :: forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast :: ReliableMsg (Heartbeat msg) -> m ()
broadcast} =
Network m (Heartbeat msg) -> m a
action (Network m (Heartbeat msg) -> m a)
-> Network m (Heartbeat msg) -> m a
forall a b. (a -> b) -> a -> b
$
Network
{ broadcast :: Heartbeat msg -> m ()
broadcast = \Heartbeat msg
msg ->
case Heartbeat msg
msg of
Data{} -> do
Vector Int
localCounter <- STM m (Vector Int) -> m (Vector Int)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Vector Int) -> m (Vector Int))
-> STM m (Vector Int) -> m (Vector Int)
forall a b. (a -> b) -> a -> b
$ Heartbeat msg -> STM m ()
cacheMessage Heartbeat msg
msg STM m () -> STM m (Vector Int) -> STM m (Vector Int)
forall a b. STM m a -> STM m b -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> STM m (Vector Int)
incrementAckCounter
Vector Int -> m ()
saveAcks Vector Int
localCounter
Heartbeat msg -> m ()
appendMessage Heartbeat msg
msg
Tracer m ReliabilityLog -> ReliabilityLog -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m ReliabilityLog
tracer BroadcastCounter{Int
$sel:ourIndex:Resending :: Int
ourIndex :: Int
ourIndex, Vector Int
$sel:localCounter:Resending :: Vector Int
localCounter :: Vector Int
localCounter}
ReliableMsg (Heartbeat msg) -> m ()
broadcast (ReliableMsg (Heartbeat msg) -> m ())
-> ReliableMsg (Heartbeat msg) -> m ()
forall a b. (a -> b) -> a -> b
$ Vector Int -> Heartbeat msg -> ReliableMsg (Heartbeat msg)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg Vector Int
localCounter Heartbeat msg
msg
Ping{} -> do
Vector Int
localCounter <- TVar m (Vector Int) -> m (Vector Int)
forall a. TVar m a -> m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar m (Vector Int)
acksCache
Vector Int -> m ()
saveAcks Vector Int
localCounter
Tracer m ReliabilityLog -> ReliabilityLog -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m ReliabilityLog
tracer BroadcastPing{Int
$sel:ourIndex:Resending :: Int
ourIndex :: Int
ourIndex, Vector Int
$sel:localCounter:Resending :: Vector Int
localCounter :: Vector Int
localCounter}
ReliableMsg (Heartbeat msg) -> m ()
broadcast (ReliableMsg (Heartbeat msg) -> m ())
-> ReliableMsg (Heartbeat msg) -> m ()
forall a b. (a -> b) -> a -> b
$ Vector Int -> Heartbeat msg -> ReliableMsg (Heartbeat msg)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg Vector Int
localCounter Heartbeat msg
msg
}
where
incrementAckCounter :: STM m (Vector Int)
incrementAckCounter = do
Vector Int
acks <- TVar m (Vector Int) -> STM m (Vector Int)
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m (Vector Int)
acksCache
let newAcks :: Vector Int
newAcks = Vector Int -> Int -> Vector Int
constructAcks Vector Int
acks Int
ourIndex
TVar m (Vector Int) -> Vector Int -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m (Vector Int)
acksCache Vector Int
newAcks
Vector Int -> STM m (Vector Int)
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Vector Int
newAcks
cacheMessage :: Heartbeat msg -> STM m ()
cacheMessage Heartbeat msg
msg =
TVar m (StrictSeq (Heartbeat msg))
-> (StrictSeq (Heartbeat msg) -> StrictSeq (Heartbeat msg))
-> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m (StrictSeq (Heartbeat msg))
sentMessages (StrictSeq (Heartbeat msg)
-> Heartbeat msg -> StrictSeq (Heartbeat msg)
forall a. StrictSeq a -> a -> StrictSeq a
|> Heartbeat msg
msg)
reliableCallback :: TVar m (Vector Int)
-> TVar m (StrictSeq (Heartbeat msg))
-> (ReliableMsg (Heartbeat msg) -> STM m ())
-> Int
-> Authenticated (ReliableMsg (Heartbeat msg))
-> m ()
reliableCallback TVar m (Vector Int)
acksCache TVar m (StrictSeq (Heartbeat msg))
sentMessages ReliableMsg (Heartbeat msg) -> STM m ()
resend Int
ourIndex (Authenticated (ReliableMsg Vector Int
acknowledged Heartbeat msg
payload) Party
party) = do
if Vector Int -> Int
forall a. Vector a -> Int
length Vector Int
acknowledged Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Vector Party -> Int
forall a. Vector a -> Int
length Vector Party
allParties
then
Tracer m ReliabilityLog -> ReliabilityLog -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith
Tracer m ReliabilityLog
tracer
ReceivedMalformedAcks
{ $sel:fromParty:Resending :: Party
fromParty = Party
party
, $sel:partyAcks:Resending :: Vector Int
partyAcks = Vector Int
acknowledged
, $sel:numberOfParties:Resending :: Int
numberOfParties = Vector Party -> Int
forall a. Vector a -> Int
length Vector Party
allParties
}
else do
Maybe (Bool, Int, Vector Int)
eShouldCallbackWithKnownAcks <- STM m (Maybe (Bool, Int, Vector Int))
-> m (Maybe (Bool, Int, Vector Int))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Maybe (Bool, Int, Vector Int))
-> m (Maybe (Bool, Int, Vector Int)))
-> STM m (Maybe (Bool, Int, Vector Int))
-> m (Maybe (Bool, Int, Vector Int))
forall a b. (a -> b) -> a -> b
$ MaybeT (STM m) (Bool, Int, Vector Int)
-> STM m (Maybe (Bool, Int, Vector Int))
forall (m :: * -> *) a. MaybeT m a -> m (Maybe a)
runMaybeT (MaybeT (STM m) (Bool, Int, Vector Int)
-> STM m (Maybe (Bool, Int, Vector Int)))
-> MaybeT (STM m) (Bool, Int, Vector Int)
-> STM m (Maybe (Bool, Int, Vector Int))
forall a b. (a -> b) -> a -> b
$ do
Vector Int
loadedAcks <- STM m (Vector Int) -> MaybeT (STM m) (Vector Int)
forall (m :: * -> *) a. Monad m => m a -> MaybeT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (STM m (Vector Int) -> MaybeT (STM m) (Vector Int))
-> STM m (Vector Int) -> MaybeT (STM m) (Vector Int)
forall a b. (a -> b) -> a -> b
$ TVar m (Vector Int) -> STM m (Vector Int)
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m (Vector Int)
acksCache
Int
partyIndex <- Maybe Int -> MaybeT (STM m) Int
forall (m :: * -> *) a. Applicative m => Maybe a -> MaybeT m a
hoistMaybe (Maybe Int -> MaybeT (STM m) Int)
-> Maybe Int -> MaybeT (STM m) Int
forall a b. (a -> b) -> a -> b
$ Party -> Maybe Int
findPartyIndex Party
party
Int
messageAckForParty <- Maybe Int -> MaybeT (STM m) Int
forall (m :: * -> *) a. Applicative m => Maybe a -> MaybeT m a
hoistMaybe (Vector Int
acknowledged Vector Int -> Int -> Maybe Int
forall a. Vector a -> Int -> Maybe a
!? Int
partyIndex)
Int
knownAckForParty <- Maybe Int -> MaybeT (STM m) Int
forall (m :: * -> *) a. Applicative m => Maybe a -> MaybeT m a
hoistMaybe (Maybe Int -> MaybeT (STM m) Int)
-> Maybe Int -> MaybeT (STM m) Int
forall a b. (a -> b) -> a -> b
$ Vector Int
loadedAcks Vector Int -> Int -> Maybe Int
forall a. Vector a -> Int -> Maybe a
!? Int
partyIndex
if
| Heartbeat msg -> Bool
forall msg. Heartbeat msg -> Bool
isPing Heartbeat msg
payload ->
(Bool, Int, Vector Int) -> MaybeT (STM m) (Bool, Int, Vector Int)
forall a. a -> MaybeT (STM m) a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
True, Int
partyIndex, Vector Int
loadedAcks)
| Int
messageAckForParty Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
knownAckForParty Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 -> do
let newAcks :: Vector Int
newAcks = Vector Int -> Int -> Vector Int
constructAcks Vector Int
loadedAcks Int
partyIndex
STM m () -> MaybeT (STM m) ()
forall (m :: * -> *) a. Monad m => m a -> MaybeT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (STM m () -> MaybeT (STM m) ()) -> STM m () -> MaybeT (STM m) ()
forall a b. (a -> b) -> a -> b
$ TVar m (Vector Int) -> Vector Int -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m (Vector Int)
acksCache Vector Int
newAcks
(Bool, Int, Vector Int) -> MaybeT (STM m) (Bool, Int, Vector Int)
forall a. a -> MaybeT (STM m) a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
True, Int
partyIndex, Vector Int
newAcks)
| Bool
otherwise ->
(Bool, Int, Vector Int) -> MaybeT (STM m) (Bool, Int, Vector Int)
forall a. a -> MaybeT (STM m) a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
False, Int
partyIndex, Vector Int
loadedAcks)
case Maybe (Bool, Int, Vector Int)
eShouldCallbackWithKnownAcks of
Just (Bool
shouldCallback, Int
theirIndex, Vector Int
localCounter) -> do
if Bool
shouldCallback
then do
NetworkCallback (Authenticated (Heartbeat msg)) m
callback Authenticated{Heartbeat msg
payload :: Heartbeat msg
$sel:payload:Authenticated :: Heartbeat msg
payload, Party
party :: Party
$sel:party:Authenticated :: Party
party}
Tracer m ReliabilityLog -> ReliabilityLog -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m ReliabilityLog
tracer Received{Vector Int
$sel:acknowledged:Resending :: Vector Int
acknowledged :: Vector Int
acknowledged, Vector Int
$sel:localCounter:Resending :: Vector Int
localCounter :: Vector Int
localCounter, Int
$sel:theirIndex:Resending :: Int
theirIndex :: Int
theirIndex, Int
$sel:ourIndex:Resending :: Int
ourIndex :: Int
ourIndex}
else Tracer m ReliabilityLog -> ReliabilityLog -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m ReliabilityLog
tracer Ignored{Vector Int
$sel:acknowledged:Resending :: Vector Int
acknowledged :: Vector Int
acknowledged, Vector Int
$sel:localCounter:Resending :: Vector Int
localCounter :: Vector Int
localCounter, Int
$sel:theirIndex:Resending :: Int
theirIndex :: Int
theirIndex, Int
$sel:ourIndex:Resending :: Int
ourIndex :: Int
ourIndex}
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Heartbeat msg -> Bool
forall msg. Heartbeat msg -> Bool
isPing Heartbeat msg
payload) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
TVar m (StrictSeq (Heartbeat msg))
-> (ReliableMsg (Heartbeat msg) -> STM m ())
-> Int
-> Vector Int
-> Vector Int
-> Int
-> m ()
resendMessagesIfLagging TVar m (StrictSeq (Heartbeat msg))
sentMessages ReliableMsg (Heartbeat msg) -> STM m ()
resend Int
theirIndex Vector Int
localCounter Vector Int
acknowledged Int
ourIndex
Maybe (Bool, Int, Vector Int)
Nothing -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
constructAcks :: Vector Int -> Int -> Vector Int
constructAcks Vector Int
acks Int
wantedIndex =
(Int -> Int -> Int) -> Vector Int -> Vector Int -> Vector Int
forall a b c. (a -> b -> c) -> Vector a -> Vector b -> Vector c
zipWith (\Int
ack Int
i -> if Int
i Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
wantedIndex then Int
ack Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 else Int
ack) Vector Int
acks Vector Int
partyIndexes
partyIndexes :: Vector Int
partyIndexes = Int -> (Int -> Int) -> Vector Int
forall a. Int -> (Int -> a) -> Vector a
generate (Vector Party -> Int
forall a. Vector a -> Int
length Vector Party
allParties) Int -> Int
forall a. a -> a
id
-- | Re-enqueue previously sent messages when the entry at @myIndex@ in
-- @acknowledged@ is smaller than the entry at @myIndex@ in @knownAcks@.
--
-- Inferred type (kept as a comment because it mentions the monad @m@ and the
-- message type @msg@ of the enclosing definition, which also supplies the
-- constraints needed by 'readTVarIO' and 'atomically'):
--
-- > TVar m (StrictSeq (Heartbeat msg))
-- >   -> (ReliableMsg (Heartbeat msg) -> STM m ())
-- >   -> Int -> Vector Int -> Vector Int -> Int -> m ()
--
-- For every message id in the gap @(acknowledged[myIndex], knownAcks[myIndex]]@
-- the message is looked up among the stored messages, which are numbered from
-- 1 in storage order:
--
-- * if it is found, a 'Resending' trace is emitted and the message is handed
--   to @resend@, wrapped in a 'ReliableMsg' whose id vector is @knownAcks@
--   with the entry at @myIndex@ replaced by that message id;
-- * if it is not found, a 'ReliabilityFailedToFindMsg' trace is emitted and
--   nothing is resent for that id.
--
-- Nothing happens when @myIndex@ is out of bounds for either vector or when
-- there is no gap.
--
-- NOTE(review): presumably @acknowledged@ is the counter vector reported by
-- the peer at @theirIndex@ and @knownAcks@ is our local one; confirm against
-- the caller.
resendMessagesIfLagging sentMessages resend theirIndex knownAcks acknowledged myIndex =
  case (acknowledged !? myIndex, knownAcks !? myIndex) of
    (Just peerAckForUs, Just ourAckForUs)
      | peerAckForUs < ourAckForUs -> do
          storedMessages <- readTVarIO sentMessages
          let missingIds = fromList [peerAckForUs + 1 .. ourAckForUs]
              -- Message ids start at 1, in the order messages were stored.
              numbered = IMap.fromList (zip [1 ..] (toList storedMessages))
          forM_ missingIds $ \msgId ->
            case IMap.lookup msgId numbered of
              Just storedMsg -> do
                let resendCounter =
                      zipWith (ownEntryAs msgId) knownAcks partyIndexes
                traceWith
                  tracer
                  Resending
                    { missing = missingIds
                    , acknowledged = acknowledged
                    , localCounter = resendCounter
                    , theirIndex = theirIndex
                    }
                atomically (resend (ReliableMsg resendCounter storedMsg))
              Nothing ->
                traceWith
                  tracer
                  ReliabilityFailedToFindMsg
                    { missingMsgIndex = msgId
                    , sentMessagesLength = IMap.size numbered
                    , knownAckForUs = ourAckForUs
                    , messageAckForUs = peerAckForUs
                    }
    -- Either an index was out of range or the peer is not behind.
    _ -> pure ()
 where
  -- Replace the counter at our own position with the id being resent.
  ownEntryAs msgId counter position
    | position == myIndex = msgId
    | otherwise = counter
-- | Position of the first occurrence of @party@ in @allParties@, or 'Nothing'
-- when the party is not part of that vector.
findPartyIndex :: Party -> Maybe Int
findPartyIndex party = party `elemIndex` allParties