{-# OPTIONS_GHC -Wno-orphans #-}
module Hydra.Network.Reliability where
import Hydra.Prelude hiding (empty, fromList, length, replicate, zipWith)
import Cardano.Binary (serialize')
import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation))
import Control.Concurrent.Class.MonadSTM (
MonadSTM (readTQueue, writeTQueue),
modifyTVar',
newTQueueIO,
newTVarIO,
readTVarIO,
writeTVar,
)
import Control.Tracer (Tracer)
import Data.IntMap qualified as IMap
import Data.Sequence.Strict ((|>))
import Data.Sequence.Strict qualified as Seq
import Data.Vector (
Vector,
elemIndex,
fromList,
generate,
length,
replicate,
zipWith,
(!?),
)
import Hydra.Logging (traceWith)
import Hydra.Network (Network (..), NetworkCallback (..), NetworkComponent)
import Hydra.Network.Authenticate (Authenticated (..))
import Hydra.Network.Heartbeat (Heartbeat (..), isPing)
import Hydra.Persistence (Persistence (..), PersistenceIncremental (..))
import Hydra.Tx (Party)
import Test.QuickCheck.Instances.Vector ()
-- | A message of type @msg@ bundled with a vector of message counters.
--
-- 'withReliability' wraps every outgoing message in this envelope and unwraps
-- every incoming one.
data ReliableMsg msg = ReliableMsg
  { knownMessageIds :: Vector Int
  -- ^ Counter vector attached by the sender. 'withReliability' fills it with
  -- the sender's counters at send time and, on receipt, only accepts vectors
  -- that have exactly one entry per party.
  , payload :: msg
  -- ^ The wrapped message.
  }
  deriving stock (Eq, Show, Generic)
  deriving anyclass (ToJSON)
-- | Encodes the counter vector followed directly by the payload; the two
-- encodings are simply concatenated, in that order.
instance ToCBOR msg => ToCBOR (ReliableMsg msg) where
  toCBOR ReliableMsg{knownMessageIds, payload} =
    toCBOR knownMessageIds <> toCBOR payload
-- | Decodes the counter vector first and the payload second, mirroring the
-- field order written by the 'ToCBOR' instance.
instance FromCBOR msg => FromCBOR (ReliableMsg msg) where
  fromCBOR = ReliableMsg <$> fromCBOR <*> fromCBOR
-- | The signable representation is the serialised CBOR encoding of the whole
-- envelope (counters and payload).
instance ToCBOR msg => SignableRepresentation (ReliableMsg msg) where
  getSignableRepresentation = serialize'
-- | Log entries emitted by the reliability layer ('withReliability').
data ReliabilityLog
  = -- | NOTE(review): not emitted in the part of the module reviewed here;
    -- presumably logged when missing messages are re-sent -- confirm.
    Resending {missing :: Vector Int, acknowledged :: Vector Int, localCounter :: Vector Int, theirIndex :: Int}
  | -- | Traced right before a data message is broadcast, with our index and
    -- the counter vector attached to that message.
    BroadcastCounter {ourIndex :: Int, localCounter :: Vector Int}
  | -- | Traced right before a ping is broadcast, with our index and the
    -- counter vector attached to that ping.
    BroadcastPing {ourIndex :: Int, localCounter :: Vector Int}
  | -- | Traced after an incoming message has been delivered to the callback.
    Received {acknowledged :: Vector Int, localCounter :: Vector Int, theirIndex :: Int, ourIndex :: Int}
  | -- | Traced when an incoming message is dropped instead of delivered.
    Ignored {acknowledged :: Vector Int, localCounter :: Vector Int, theirIndex :: Int, ourIndex :: Int}
  | -- | NOTE(review): not emitted in the part of the module reviewed here;
    -- the field names suggest a failed lookup of a message to resend --
    -- confirm.
    ReliabilityFailedToFindMsg
      { missingMsgIndex :: Int
      , sentMessagesLength :: Int
      , knownAckForUs :: Int
      , messageAckForUs :: Int
      }
  | -- | NOTE(review): not emitted in the part of the module reviewed here.
    ReliabilityMissingPartyIndex {missingParty :: Party}
  | -- | Traced when an incoming message carries a counter vector whose length
    -- differs from the number of parties; such a message is not processed
    -- any further.
    ReceivedMalformedAcks
      { fromParty :: Party
      , partyAcks :: Vector Int
      , numberOfParties :: Int
      }
  deriving stock (Show, Eq, Generic)
  deriving anyclass (ToJSON)
-- | Generator and shrinker are both derived generically from the structure
-- of 'ReliabilityLog'.
instance Arbitrary ReliabilityLog where
  arbitrary = genericArbitrary
  shrink = genericShrink
-- | Handle bundling the storage operations needed by 'withReliability': one
-- slot for the counter vector and an append-only collection of messages.
data MessagePersistence m msg = MessagePersistence
  { loadAcks :: m (Vector Int)
  -- ^ Load the stored counter vector.
  , saveAcks :: Vector Int -> m ()
  -- ^ Store the given counter vector.
  , loadMessages :: m [Heartbeat msg]
  -- ^ Load all stored messages.
  , appendMessage :: Heartbeat msg -> m ()
  -- ^ Add one message to the stored messages.
  }
-- | Build a 'MessagePersistence' handle on top of two persistence handles.
--
-- * @numberOfParties@: length of the all-zero counter vector returned by
--   'loadAcks' when nothing has been stored yet.
-- * @msgPersistence@: incremental persistence backing 'loadMessages' and
--   'appendMessage'.
-- * @ackPersistence@: persistence backing 'loadAcks' and 'saveAcks'.
mkMessagePersistence ::
  (MonadThrow m, FromJSON msg, ToJSON msg) =>
  Int ->
  PersistenceIncremental (Heartbeat msg) m ->
  Persistence (Vector Int) m ->
  MessagePersistence m msg
mkMessagePersistence numberOfParties msgPersistence ackPersistence =
  MessagePersistence
    { -- Fall back to a zeroed vector when no counters were stored.
      loadAcks = fromMaybe (replicate numberOfParties 0) <$> load ackPersistence
    , saveAcks = \acks -> save ackPersistence acks
    , loadMessages = loadAll msgPersistence
    , appendMessage = \msg -> append msgPersistence msg
    }
-- | Turn a raw network component exchanging 'ReliableMsg'-wrapped heartbeats
-- into one exchanging plain heartbeats, tracking per-party message counters.
--
-- Parties are indexed by their position in the sorted list of all parties
-- (ourselves plus @otherParties@); the counter vector has one entry per party.
--
-- Sending:
--
-- * A 'Data' message is appended to the in-memory list of sent messages and
--   our own counter is incremented, both in one STM transaction. The new
--   counters and the message are then persisted, and the message is broadcast
--   wrapped with the new counters.
-- * A 'Ping' is broadcast wrapped with the current counters, which are also
--   persisted; no counter is changed.
--
-- Receiving:
--
-- * A message whose counter vector does not have one entry per party is
--   logged as 'ReceivedMalformedAcks' and otherwise ignored.
-- * A 'Ping' is always delivered and never changes our counters.
-- * Any other message is delivered only if the sender's own entry in the
--   received vector is exactly one more than the entry we hold for the
--   sender; our entry for the sender is then incremented. Otherwise the
--   message is logged as 'Ignored' and dropped.
-- * After handling a 'Ping', @resendMessagesIfLagging@ is invoked with the
--   received and local counters.
--
-- Messages handed to the @resend@ function are queued and broadcast by a
-- background thread that lives for as long as the wrapped action runs.
--
-- Arguments, in order: tracer for 'ReliabilityLog' entries, persistence
-- handle, our own party, the other parties, and the raw network component.
withReliability ::
  (MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
  Tracer m ReliabilityLog ->
  MessagePersistence m outbound ->
  Party ->
  [Party] ->
  NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a ->
  NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do
  -- Restore state from persistence before touching the network.
  acksCache <- loadAcks >>= newTVarIO
  sentMessages <- loadMessages >>= newTVarIO . Seq.fromList
  resendQ <- newTQueueIO
  let ourIndex = fromMaybe (error "This cannot happen because we constructed the list with our party inside.") (findPartyIndex me)
  let resend = writeTQueue resendQ
  withRawNetwork (reliableCallback acksCache sentMessages resend ourIndex) $ \network@Network{broadcast} -> do
    -- Drain the resend queue onto the raw network in the background.
    withAsync (forever $ atomically (readTQueue resendQ) >>= broadcast) $ \_ ->
      reliableBroadcast sentMessages ourIndex acksCache network
 where
  NetworkCallback{deliver} = callback

  -- Sorted so that a party's index depends only on the set of parties.
  allParties = fromList $ sort $ me : otherParties

  -- Run the wrapped action with a 'Network' that stamps every outgoing
  -- message with our counter vector.
  reliableBroadcast sentMessages ourIndex acksCache Network{broadcast} =
    action $
      Network
        { broadcast = \msg ->
            case msg of
              Data{} -> do
                -- Cache the message and bump our counter atomically.
                localCounter <- atomically $ cacheMessage msg >> incrementAckCounter
                saveAcks localCounter
                appendMessage msg
                traceWith tracer BroadcastCounter{ourIndex, localCounter}
                broadcast $ ReliableMsg localCounter msg
              Ping{} -> do
                -- Pings carry the current counters but do not change them.
                localCounter <- readTVarIO acksCache
                saveAcks localCounter
                traceWith tracer BroadcastPing{ourIndex, localCounter}
                broadcast $ ReliableMsg localCounter msg
        }
   where
    incrementAckCounter = do
      acks <- readTVar acksCache
      let newAcks = constructAcks acks ourIndex
      writeTVar acksCache newAcks
      pure newAcks

    cacheMessage msg =
      modifyTVar' sentMessages (|> msg)

  -- Callback given to the raw network: validates the counter vector, decides
  -- whether to deliver the payload and updates our counters accordingly.
  reliableCallback acksCache sentMessages resend ourIndex =
    NetworkCallback $ \(Authenticated (ReliableMsg acknowledged payload) party) -> do
      if length acknowledged /= length allParties
        then
          traceWith
            tracer
            ReceivedMalformedAcks
              { fromParty = party
              , partyAcks = acknowledged
              , numberOfParties = length allParties
              }
        else do
          -- 'Nothing' means the sender's index or one of the vector lookups
          -- could not be resolved; the message is then silently skipped.
          eShouldCallbackWithKnownAcks <- atomically $ runMaybeT $ do
            loadedAcks <- lift $ readTVar acksCache
            partyIndex <- hoistMaybe $ findPartyIndex party
            messageAckForParty <- hoistMaybe (acknowledged !? partyIndex)
            knownAckForParty <- hoistMaybe $ loadedAcks !? partyIndex
            if
              | isPing payload ->
                  -- Pings are propagated without updating any counter.
                  return (True, partyIndex, loadedAcks)
              | messageAckForParty == knownAckForParty + 1 -> do
                  -- Next expected message from this party: record and propagate.
                  let newAcks = constructAcks loadedAcks partyIndex
                  lift $ writeTVar acksCache newAcks
                  return (True, partyIndex, newAcks)
              | otherwise ->
                  -- Anything else is dropped.
                  return (False, partyIndex, loadedAcks)

          case eShouldCallbackWithKnownAcks of
            Just (shouldCallback, theirIndex, localCounter) -> do
              if shouldCallback
                then do
                  deliver Authenticated{payload, party}
                  traceWith tracer Received{acknowledged, localCounter, theirIndex, ourIndex}
                else traceWith tracer Ignored{acknowledged, localCounter, theirIndex, ourIndex}

              when (isPing payload) $
                resendMessagesIfLagging sentMessages resend theirIndex localCounter acknowledged ourIndex
            Nothing -> pure ()

  -- Increment the entry at @wantedIndex@, leaving all other entries untouched.
  constructAcks acks wantedIndex =
    zipWith (\ack i -> if i == wantedIndex then ack + 1 else ack) acks partyIndexes

  -- The indices @0 .. n - 1@, where @n@ is the number of parties.
  partyIndexes = generate (length allParties) id
-- Resend our own messages that a peer reports it has not acknowledged yet.
--
-- Type as used at the call site (kept as a comment because it mentions the
-- enclosing definition's type variables):
--
--   TVar m (StrictSeq (Heartbeat outbound))
--   -> (ReliableMsg (Heartbeat outbound) -> STM m ())
--   -> Int -> Vector Int -> Vector Int -> Int -> m ()
--
-- Arguments, in order:
--
--   * @sentMessages@ - variable holding the messages sent so far.
--   * @resend@       - STM action used to send one message again.
--   * @theirIndex@   - index of the peer; only used in the trace output.
--   * @knownAcks@    - our own acknowledgement vector.
--   * @acknowledged@ - acknowledgement vector received from the peer.
--   * @myIndex@      - our own position in both vectors.
--
-- Nothing happens when @myIndex@ is out of bounds for either vector, or when
-- the peer's counter for us is not strictly below our own.
resendMessagesIfLagging sentMessages resend theirIndex knownAcks acknowledged myIndex =
  case (acknowledged !? myIndex, knownAcks !? myIndex) of
    (Just peerAck, Just ownAck)
      -- The peer has seen fewer of our messages than we have recorded.
      | peerAck < ownAck -> do
          storedMessages <- readTVarIO sentMessages
          let missing = fromList [peerAck + 1 .. ownAck]
              -- Stored messages are numbered starting from 1, in stored order.
              messages = IMap.fromList $ zip [1 ..] (toList storedMessages)
              resendOne idx =
                case IMap.lookup idx messages of
                  Just missingMsg -> do
                    -- Our own slot carries the index of the resent message,
                    -- every other slot keeps the currently known value.
                    let ownSlotAt position current
                          | position == myIndex = idx
                          | otherwise = current
                        localCounter = zipWith ownSlotAt partyIndexes knownAcks
                    traceWith
                      tracer
                      Resending
                        { missing = missing
                        , acknowledged = acknowledged
                        , localCounter = localCounter
                        , theirIndex = theirIndex
                        }
                    atomically . resend $ ReliableMsg localCounter missingMsg
                  Nothing ->
                    -- The requested message is not among the stored ones;
                    -- report it instead of failing.
                    traceWith
                      tracer
                      ReliabilityFailedToFindMsg
                        { missingMsgIndex = idx
                        , sentMessagesLength = IMap.size messages
                        , knownAckForUs = ownAck
                        , messageAckForUs = peerAck
                        }
          forM_ missing resendOne
    _ -> pure ()
-- Look up the position of a party within 'allParties'.
--
-- Yields @Nothing@ when the party is not part of 'allParties'.
findPartyIndex :: Party -> Maybe Int
findPartyIndex = (`elemIndex` allParties)