module Hydra.Network.ReliabilitySpec where
import Hydra.Prelude hiding (empty, fromList, head, replicate, unlines)
import Test.Hydra.Prelude
import Control.Concurrent.Class.MonadSTM (
MonadSTM (readTQueue, readTVarIO, writeTQueue),
check,
modifyTVar',
newTQueueIO,
newTVarIO,
writeTVar,
)
import Control.Monad.IOSim (runSimOrThrow)
import Control.Tracer (Tracer (..), nullTracer)
import Data.Sequence.Strict ((|>))
import Data.Vector (Vector, empty, fromList, head, replicate, snoc)
import Data.Vector qualified as Vector
import Hydra.Network (Network (..), NetworkCallback (..))
import Hydra.Network.Authenticate (Authenticated (..))
import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat)
import Hydra.Network.Message (Connectivity)
import Hydra.Network.Reliability (MessagePersistence (..), ReliabilityLog (..), ReliableMsg (..), withReliability)
import Hydra.Node.Network (withFlipHeartbeats)
import Hydra.Persistence (
Persistence (..),
PersistenceIncremental (..),
createPersistence,
createPersistenceIncremental,
)
import System.Directory (doesFileExist)
import System.FilePath ((</>))
import System.Random (mkStdGen, uniformR)
import Test.Hydra.Tx.Fixture (alice, bob, carol)
import Test.QuickCheck (
Positive (Positive),
collect,
counterexample,
generate,
tabulate,
within,
(===),
)
import Prelude (unlines)
spec :: Spec
spec :: Spec
spec = Spec -> Spec
forall a. SpecWith a -> SpecWith a
parallel (Spec -> Spec) -> Spec -> Spec
forall a b. (a -> b) -> a -> b
$ do
let captureOutgoing :: TVar m (Vector msg) -> p -> (Network m msg -> b) -> b
captureOutgoing TVar m (Vector msg)
msgqueue p
_callback Network m msg -> b
action =
Network m msg -> b
action (Network m msg -> b) -> Network m msg -> b
forall a b. (a -> b) -> a -> b
$ Network{$sel:broadcast:Network :: msg -> m ()
broadcast = \msg
msg -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar m (Vector msg) -> (Vector msg -> Vector msg) -> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m (Vector msg)
msgqueue (Vector msg -> msg -> Vector msg
forall a. Vector a -> a -> Vector a
`snoc` msg
msg)}
let msg' :: Int
msg' = Int
42 :: Int
Heartbeat String
msg <- NodeId -> String -> Heartbeat String
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1" (String -> Heartbeat String)
-> SpecM () String -> SpecM () (Heartbeat String)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO String -> SpecM () String
forall r a. IO r -> SpecM a r
runIO (forall a. Gen a -> IO a
generate @String Gen String
forall a. Arbitrary a => Gen a
arbitrary)
String -> Spec -> Spec
forall a. HasCallStack => String -> SpecWith a -> SpecWith a
describe String
"receiving messages" (Spec -> Spec) -> Spec -> Spec
forall a b. (a -> b) -> a -> b
$ do
String -> Expectation -> SpecWith (Arg Expectation)
forall a.
(HasCallStack, Example a) =>
String -> a -> SpecWith (Arg a)
it String
"forward received messages" (Expectation -> SpecWith (Arg Expectation))
-> Expectation -> SpecWith (Arg Expectation)
forall a b. (a -> b) -> a -> b
$ do
let propagatedMessages :: [Authenticated (Heartbeat (Heartbeat String))]
propagatedMessages =
[Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))]
-> [Authenticated (Heartbeat (Heartbeat String))]
forall msg.
[Authenticated (ReliableMsg (Heartbeat msg))]
-> [Authenticated (Heartbeat msg)]
aliceReceivesMessages
[ReliableMsg (Heartbeat (Heartbeat String))
-> Party
-> Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int
-> Heartbeat (Heartbeat String)
-> ReliableMsg (Heartbeat (Heartbeat String))
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1, Int
1, Int
1]) (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Heartbeat String
msg)) Party
bob]
[Authenticated (Heartbeat (Heartbeat String))]
propagatedMessages [Authenticated (Heartbeat (Heartbeat String))]
-> [Authenticated (Heartbeat (Heartbeat String))] -> Expectation
forall a. (HasCallStack, Show a, Eq a) => a -> a -> Expectation
`shouldBe` [Heartbeat (Heartbeat String)
-> Party -> Authenticated (Heartbeat (Heartbeat String))
forall msg. msg -> Party -> Authenticated msg
Authenticated (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Heartbeat String
msg) Party
bob]
String -> Expectation -> SpecWith (Arg Expectation)
forall a.
(HasCallStack, Example a) =>
String -> a -> SpecWith (Arg a)
it String
"do not drop messages with same ids from different peers" (Expectation -> SpecWith (Arg Expectation))
-> Expectation -> SpecWith (Arg Expectation)
forall a b. (a -> b) -> a -> b
$ do
let propagatedMessages :: [Authenticated (Heartbeat Int)]
propagatedMessages =
[Authenticated (ReliableMsg (Heartbeat Int))]
-> [Authenticated (Heartbeat Int)]
forall msg.
[Authenticated (ReliableMsg (Heartbeat msg))]
-> [Authenticated (Heartbeat msg)]
aliceReceivesMessages
[ ReliableMsg (Heartbeat Int)
-> Party -> Authenticated (ReliableMsg (Heartbeat Int))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int -> Heartbeat Int -> ReliableMsg (Heartbeat Int)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
0, Int
1, Int
0]) (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Int
msg')) Party
bob
, ReliableMsg (Heartbeat Int)
-> Party -> Authenticated (ReliableMsg (Heartbeat Int))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int -> Heartbeat Int -> ReliableMsg (Heartbeat Int)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
0, Int
0, Int
1]) (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-3" Int
msg')) Party
carol
]
[Authenticated (Heartbeat Int)]
propagatedMessages [Authenticated (Heartbeat Int)]
-> [Authenticated (Heartbeat Int)] -> Expectation
forall a. (HasCallStack, Show a, Eq a) => a -> a -> Expectation
`shouldBe` [Heartbeat Int -> Party -> Authenticated (Heartbeat Int)
forall msg. msg -> Party -> Authenticated msg
Authenticated (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Int
msg') Party
bob, Heartbeat Int -> Party -> Authenticated (Heartbeat Int)
forall msg. msg -> Party -> Authenticated msg
Authenticated (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-3" Int
msg') Party
carol]
String -> Expectation -> SpecWith (Arg Expectation)
forall a.
(HasCallStack, Example a) =>
String -> a -> SpecWith (Arg a)
it String
"Ignores messages with malformed acks" (Expectation -> SpecWith (Arg Expectation))
-> Expectation -> SpecWith (Arg Expectation)
forall a b. (a -> b) -> a -> b
$ do
let malFormedAck :: Vector Int
malFormedAck = [Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1, Int
0]
wellFormedAck :: Vector Int
wellFormedAck = [Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1, Int
0, Int
1]
propagatedMessages :: [Authenticated (Heartbeat Int)]
propagatedMessages =
[Authenticated (ReliableMsg (Heartbeat Int))]
-> [Authenticated (Heartbeat Int)]
forall msg.
[Authenticated (ReliableMsg (Heartbeat msg))]
-> [Authenticated (Heartbeat msg)]
aliceReceivesMessages
[ ReliableMsg (Heartbeat Int)
-> Party -> Authenticated (ReliableMsg (Heartbeat Int))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int -> Heartbeat Int -> ReliableMsg (Heartbeat Int)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg Vector Int
malFormedAck (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Int
msg')) Party
bob
, ReliableMsg (Heartbeat Int)
-> Party -> Authenticated (ReliableMsg (Heartbeat Int))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int -> Heartbeat Int -> ReliableMsg (Heartbeat Int)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg Vector Int
wellFormedAck (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-3" Int
msg')) Party
carol
]
[Authenticated (Heartbeat Int)]
propagatedMessages [Authenticated (Heartbeat Int)]
-> [Authenticated (Heartbeat Int)] -> Expectation
forall a. (HasCallStack, Show a, Eq a) => a -> a -> Expectation
`shouldBe` [Heartbeat Int -> Party -> Authenticated (Heartbeat Int)
forall msg. msg -> Party -> Authenticated msg
Authenticated (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-3" Int
msg') Party
carol]
String -> ([Positive Int] -> Property) -> Spec
forall prop.
(HasCallStack, Testable prop) =>
String -> prop -> Spec
prop String
"drops already received messages" (([Positive Int] -> Property) -> Spec)
-> ([Positive Int] -> Property) -> Spec
forall a b. (a -> b) -> a -> b
$ \([Positive Int]
messages :: [Positive Int]) ->
let
messagesToSend :: [Authenticated (ReliableMsg (Heartbeat Int))]
messagesToSend =
(\(Positive Int
m) -> ReliableMsg (Heartbeat Int)
-> Party -> Authenticated (ReliableMsg (Heartbeat Int))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int -> Heartbeat Int -> ReliableMsg (Heartbeat Int)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
0, Int
m, Int
0]) (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Int
m)) Party
bob)
(Positive Int -> Authenticated (ReliableMsg (Heartbeat Int)))
-> [Positive Int] -> [Authenticated (ReliableMsg (Heartbeat Int))]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Positive Int]
messages
propagatedMessages :: [Authenticated (Heartbeat Int)]
propagatedMessages = [Authenticated (ReliableMsg (Heartbeat Int))]
-> [Authenticated (Heartbeat Int)]
forall msg.
[Authenticated (ReliableMsg (Heartbeat msg))]
-> [Authenticated (Heartbeat msg)]
aliceReceivesMessages [Authenticated (ReliableMsg (Heartbeat Int))]
messagesToSend
receivedMessagesInOrder :: t (Authenticated (Heartbeat a)) -> Bool
receivedMessagesInOrder t (Authenticated (Heartbeat a))
messageReceived =
let refMessages :: [Heartbeat a]
refMessages = NodeId -> a -> Heartbeat a
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" (a -> Heartbeat a) -> [a] -> [Heartbeat a]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [a
1 ..]
isInMessage :: Authenticated (Heartbeat a) -> Bool
isInMessage Authenticated{Heartbeat a
payload :: Heartbeat a
$sel:payload:Authenticated :: forall msg. Authenticated msg -> msg
payload} = Heartbeat a
payload Heartbeat a -> [Heartbeat a] -> Bool
forall (f :: * -> *) a.
(Foldable f, DisallowElem f, Eq a) =>
a -> f a -> Bool
`elem` [Heartbeat a]
refMessages
in (Authenticated (Heartbeat a) -> Bool)
-> t (Authenticated (Heartbeat a)) -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
all Authenticated (Heartbeat a) -> Bool
isInMessage t (Authenticated (Heartbeat a))
messageReceived
in
[Authenticated (Heartbeat Int)] -> Bool
forall {a} {t :: * -> *}.
(Eq a, Foldable t, Num a, Enum a) =>
t (Authenticated (Heartbeat a)) -> Bool
receivedMessagesInOrder [Authenticated (Heartbeat Int)]
propagatedMessages
Bool -> (Bool -> Property) -> Property
forall a b. a -> (a -> b) -> b
& String -> Bool -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample ([Authenticated (Heartbeat Int)] -> String
forall b a. (Show a, IsString b) => a -> b
show [Authenticated (Heartbeat Int)]
propagatedMessages)
Property -> (Property -> Property) -> Property
forall a b. a -> (a -> b) -> b
& Int -> Property -> Property
forall a prop. (Show a, Testable prop) => a -> prop -> Property
collect ([Authenticated (Heartbeat Int)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Authenticated (Heartbeat Int)]
propagatedMessages)
String -> Spec -> Spec
forall a. HasCallStack => String -> SpecWith a -> SpecWith a
describe String
"sending messages" (Spec -> Spec) -> Spec -> Spec
forall a b. (a -> b) -> a -> b
$ do
String -> ([String] -> Expectation) -> Spec
forall prop.
(HasCallStack, Testable prop) =>
String -> prop -> Spec
prop String
"broadcast messages to the network assigning a sequential id" (([String] -> Expectation) -> Spec)
-> ([String] -> Expectation) -> Spec
forall a b. (a -> b) -> a -> b
$ \([String]
messages :: [String]) ->
let sentMsgs :: Vector (ReliableMsg (Heartbeat String))
sentMsgs = (forall s. IOSim s (Vector (ReliableMsg (Heartbeat String))))
-> Vector (ReliableMsg (Heartbeat String))
forall a. (forall s. IOSim s a) -> a
runSimOrThrow ((forall s. IOSim s (Vector (ReliableMsg (Heartbeat String))))
-> Vector (ReliableMsg (Heartbeat String)))
-> (forall s. IOSim s (Vector (ReliableMsg (Heartbeat String))))
-> Vector (ReliableMsg (Heartbeat String))
forall a b. (a -> b) -> a -> b
$ do
TVar s (Vector (ReliableMsg (Heartbeat String)))
sentMessages <- Vector (ReliableMsg (Heartbeat String))
-> IOSim
s (TVar (IOSim s) (Vector (ReliableMsg (Heartbeat String))))
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Vector (ReliableMsg (Heartbeat String))
forall a. Vector a
empty
MessagePersistence (IOSim s) String
persistence <- Int
-> MonadSTM (IOSim s) =>
IOSim s (MessagePersistence (IOSim s) String)
forall (m :: * -> *) msg.
Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence Int
1
Tracer (IOSim s) ReliabilityLog
-> MessagePersistence (IOSim s) String
-> Party
-> [Party]
-> NetworkComponent
(IOSim s)
(Authenticated (ReliableMsg (Heartbeat Any)))
(ReliableMsg (Heartbeat String))
()
-> NetworkComponent
(IOSim s) (Authenticated (Heartbeat Any)) (Heartbeat String) ()
forall (m :: * -> *) outbound inbound a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m outbound
-> Party
-> [Party]
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
-> NetworkComponent
m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
withReliability Tracer (IOSim s) ReliabilityLog
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer MessagePersistence (IOSim s) String
persistence Party
alice [] (TVar (IOSim s) (Vector (ReliableMsg (Heartbeat String)))
-> NetworkComponent
(IOSim s)
(Authenticated (ReliableMsg (Heartbeat Any)))
(ReliableMsg (Heartbeat String))
()
forall {m :: * -> *} {msg} {p} {b}.
MonadSTM m =>
TVar m (Vector msg) -> p -> (Network m msg -> b) -> b
captureOutgoing TVar (IOSim s) (Vector (ReliableMsg (Heartbeat String)))
TVar s (Vector (ReliableMsg (Heartbeat String)))
sentMessages) NetworkCallback (Authenticated (Heartbeat Any)) (IOSim s)
forall (m :: * -> *) b. Monad m => NetworkCallback b m
noop ((Network (IOSim s) (Heartbeat String) -> IOSim s ())
-> IOSim s ())
-> (Network (IOSim s) (Heartbeat String) -> IOSim s ())
-> IOSim s ()
forall a b. (a -> b) -> a -> b
$ \Network{Heartbeat String -> IOSim s ()
$sel:broadcast:Network :: forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast :: Heartbeat String -> IOSim s ()
broadcast} -> do
(String -> IOSim s ()) -> [String] -> IOSim s ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Heartbeat String -> IOSim s ()
broadcast (Heartbeat String -> IOSim s ())
-> (String -> Heartbeat String) -> String -> IOSim s ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeId -> String -> Heartbeat String
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1") [String]
messages
[ReliableMsg (Heartbeat String)]
-> Vector (ReliableMsg (Heartbeat String))
forall a. [a] -> Vector a
fromList ([ReliableMsg (Heartbeat String)]
-> Vector (ReliableMsg (Heartbeat String)))
-> (Vector (ReliableMsg (Heartbeat String))
-> [ReliableMsg (Heartbeat String)])
-> Vector (ReliableMsg (Heartbeat String))
-> Vector (ReliableMsg (Heartbeat String))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Vector (ReliableMsg (Heartbeat String))
-> [ReliableMsg (Heartbeat String)]
forall a. Vector a -> [a]
Vector.toList (Vector (ReliableMsg (Heartbeat String))
-> Vector (ReliableMsg (Heartbeat String)))
-> IOSim s (Vector (ReliableMsg (Heartbeat String)))
-> IOSim s (Vector (ReliableMsg (Heartbeat String)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (IOSim s) (Vector (ReliableMsg (Heartbeat String)))
-> IOSim s (Vector (ReliableMsg (Heartbeat String)))
forall a. TVar (IOSim s) a -> IOSim s a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar (IOSim s) (Vector (ReliableMsg (Heartbeat String)))
TVar s (Vector (ReliableMsg (Heartbeat String)))
sentMessages
in Vector Int -> Int
forall a. Vector a -> a
head (Vector Int -> Int)
-> (ReliableMsg (Heartbeat String) -> Vector Int)
-> ReliableMsg (Heartbeat String)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ReliableMsg (Heartbeat String) -> Vector Int
forall msg. ReliableMsg msg -> Vector Int
knownMessageIds (ReliableMsg (Heartbeat String) -> Int)
-> Vector (ReliableMsg (Heartbeat String)) -> Vector Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Vector (ReliableMsg (Heartbeat String))
sentMsgs Vector Int -> Vector Int -> Expectation
forall a. (HasCallStack, Show a, Eq a) => a -> a -> Expectation
`shouldBe` [Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1 .. ([String] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [String]
messages)]
(Int -> Int) -> Spec -> Spec
forall a. (Int -> Int) -> SpecWith a -> SpecWith a
modifyMaxSuccess (Int -> Int -> Int
forall a b. a -> b -> a
const Int
5000) (Spec -> Spec) -> Spec -> Spec
forall a b. (a -> b) -> a -> b
$
String -> ([Int] -> [Int] -> Int -> Property) -> Spec
forall prop.
(HasCallStack, Testable prop) =>
String -> prop -> Spec
prop String
"stress test networking layer" (([Int] -> [Int] -> Int -> Property) -> Spec)
-> ([Int] -> [Int] -> Int -> Property) -> Spec
forall a b. (a -> b) -> a -> b
$ \([Int]
aliceToBobMessages :: [Int]) ([Int]
bobToAliceMessages :: [Int]) Int
seed ->
let
([Int]
msgReceivedByAlice, [Int]
msgReceivedByBob, [ReliabilityLog]
traces) = (forall s. IOSim s ([Int], [Int], [ReliabilityLog]))
-> ([Int], [Int], [ReliabilityLog])
forall a. (forall s. IOSim s a) -> a
runSimOrThrow ((forall s. IOSim s ([Int], [Int], [ReliabilityLog]))
-> ([Int], [Int], [ReliabilityLog]))
-> (forall s. IOSim s ([Int], [Int], [ReliabilityLog]))
-> ([Int], [Int], [ReliabilityLog])
forall a b. (a -> b) -> a -> b
$ do
TVar s (Vector Int)
messagesReceivedByBob <- Vector Int -> IOSim s (TVar (IOSim s) (Vector Int))
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Vector Int
forall a. Vector a
empty
TVar s (Vector Int)
messagesReceivedByAlice <- Vector Int -> IOSim s (TVar (IOSim s) (Vector Int))
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Vector Int
forall a. Vector a
empty
TVar s [ReliabilityLog]
emittedTraces <- [ReliabilityLog] -> IOSim s (TVar (IOSim s) [ReliabilityLog])
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO []
TVar s StdGen
randomSeed <- StdGen -> IOSim s (TVar (IOSim s) StdGen)
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO (StdGen -> IOSim s (TVar (IOSim s) StdGen))
-> StdGen -> IOSim s (TVar (IOSim s) StdGen)
forall a b. (a -> b) -> a -> b
$ Int -> StdGen
mkStdGen Int
seed
TQueueDefault
(IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
aliceToBob <- IOSim
s
(TQueue
(IOSim s)
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))))
IOSim
s
(TQueueDefault
(IOSim s)
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))))
forall a. IOSim s (TQueue (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => m (TQueue m a)
newTQueueIO
TQueueDefault
(IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
bobToAlice <- IOSim
s
(TQueue
(IOSim s)
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))))
IOSim
s
(TQueueDefault
(IOSim s)
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))))
forall a. IOSim s (TQueue (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => m (TQueue m a)
newTQueueIO
MessagePersistence (IOSim s) (Heartbeat Int)
alicePersistence <- Int
-> MonadSTM (IOSim s) =>
IOSim s (MessagePersistence (IOSim s) (Heartbeat Int))
forall (m :: * -> *) msg.
Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence Int
2
MessagePersistence (IOSim s) (Heartbeat Int)
bobPersistence <- Int
-> MonadSTM (IOSim s) =>
IOSim s (MessagePersistence (IOSim s) (Heartbeat Int))
forall (m :: * -> *) msg.
Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence Int
2
let
aliceFailingNetwork :: NetworkCallback
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))) (IOSim s)
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
-> IOSim s ())
-> IOSim s ()
aliceFailingNetwork = TVar (IOSim s) StdGen
-> Party
-> (TQueue
(IOSim s)
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))),
TQueue
(IOSim s)
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))))
-> NetworkCallback
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))) (IOSim s)
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
-> IOSim s ())
-> IOSim s ()
forall {m :: * -> *} {m :: * -> *} {a} {msg} {msg} {b}.
(MonadAsync m, MonadSTM m, RandomGen a) =>
TVar m a
-> Party
-> (TQueue m msg, TQueue m (Authenticated msg))
-> NetworkCallback msg m
-> (Network m msg -> m b)
-> m b
failingNetwork TVar (IOSim s) StdGen
TVar s StdGen
randomSeed Party
alice (TQueue
(IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
TQueueDefault
(IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
bobToAlice, TQueue
(IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
TQueueDefault
(IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
aliceToBob)
bobFailingNetwork :: NetworkCallback
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))) (IOSim s)
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
-> IOSim s ())
-> IOSim s ()
bobFailingNetwork = TVar (IOSim s) StdGen
-> Party
-> (TQueue
(IOSim s)
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))),
TQueue
(IOSim s)
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))))
-> NetworkCallback
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))) (IOSim s)
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
-> IOSim s ())
-> IOSim s ()
forall {m :: * -> *} {m :: * -> *} {a} {msg} {msg} {b}.
(MonadAsync m, MonadSTM m, RandomGen a) =>
TVar m a
-> Party
-> (TQueue m msg, TQueue m (Authenticated msg))
-> NetworkCallback msg m
-> (Network m msg -> m b)
-> m b
failingNetwork TVar (IOSim s) StdGen
TVar s StdGen
randomSeed Party
bob (TQueue
(IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
TQueueDefault
(IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
aliceToBob, TQueue
(IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
TQueueDefault
(IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
bobToAlice)
bobReliabilityStack :: NetworkComponent
(IOSim s)
(Either Connectivity (Authenticated (Heartbeat Int)))
(Heartbeat Int)
()
bobReliabilityStack = MessagePersistence (IOSim s) (Heartbeat Int)
-> (NetworkCallback
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))) (IOSim s)
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
-> IOSim s ())
-> IOSim s ())
-> Tracer (IOSim s) ReliabilityLog
-> NodeId
-> Party
-> [Party]
-> NetworkComponent
(IOSim s)
(Either Connectivity (Authenticated (Heartbeat Int)))
(Heartbeat Int)
()
forall {m :: * -> *} {outbound} {inbound} {a}.
(MonadAsync m, MonadDelay m, MonadThrow m, MonadThrow (STM m)) =>
MessagePersistence m outbound
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
-> Tracer m ReliabilityLog
-> NodeId
-> Party
-> [Party]
-> NetworkComponent
m (Either Connectivity (Authenticated inbound)) outbound a
reliabilityStack MessagePersistence (IOSim s) (Heartbeat Int)
bobPersistence NetworkCallback
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))) (IOSim s)
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
-> IOSim s ())
-> IOSim s ()
bobFailingNetwork (TVar (IOSim s) [ReliabilityLog] -> Tracer (IOSim s) ReliabilityLog
forall (m :: * -> *).
MonadSTM m =>
TVar m [ReliabilityLog] -> Tracer m ReliabilityLog
captureTraces TVar (IOSim s) [ReliabilityLog]
TVar s [ReliabilityLog]
emittedTraces) NodeId
"bob" Party
bob [Party
alice]
aliceReliabilityStack :: NetworkComponent
(IOSim s)
(Either Connectivity (Authenticated (Heartbeat Int)))
(Heartbeat Int)
()
aliceReliabilityStack = MessagePersistence (IOSim s) (Heartbeat Int)
-> (NetworkCallback
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))) (IOSim s)
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
-> IOSim s ())
-> IOSim s ())
-> Tracer (IOSim s) ReliabilityLog
-> NodeId
-> Party
-> [Party]
-> NetworkComponent
(IOSim s)
(Either Connectivity (Authenticated (Heartbeat Int)))
(Heartbeat Int)
()
forall {m :: * -> *} {outbound} {inbound} {a}.
(MonadAsync m, MonadDelay m, MonadThrow m, MonadThrow (STM m)) =>
MessagePersistence m outbound
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
-> Tracer m ReliabilityLog
-> NodeId
-> Party
-> [Party]
-> NetworkComponent
m (Either Connectivity (Authenticated inbound)) outbound a
reliabilityStack MessagePersistence (IOSim s) (Heartbeat Int)
alicePersistence NetworkCallback
(Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))) (IOSim s)
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
-> IOSim s ())
-> IOSim s ()
aliceFailingNetwork (TVar (IOSim s) [ReliabilityLog] -> Tracer (IOSim s) ReliabilityLog
forall (m :: * -> *).
MonadSTM m =>
TVar m [ReliabilityLog] -> Tracer m ReliabilityLog
captureTraces TVar (IOSim s) [ReliabilityLog]
TVar s [ReliabilityLog]
emittedTraces) NodeId
"alice" Party
alice [Party
bob]
runAlice :: IOSim s ()
runAlice = NetworkComponent
(IOSim s)
(Either Connectivity (Authenticated (Heartbeat Int)))
(Heartbeat Int)
()
-> NodeId
-> TVar (IOSim s) (Vector Int)
-> TVar (IOSim s) (Vector Int)
-> [Int]
-> [Int]
-> IOSim s ()
forall {m :: * -> *} {m :: * -> *} {msg} {msg} {b}.
(TVar m ~ TVar m, MonadDelay m, MonadAsync m, MonadSTM m) =>
(NetworkCallback
(Either Connectivity (Authenticated (Heartbeat msg))) m
-> (Network m (Heartbeat msg) -> m ()) -> b)
-> NodeId
-> TVar m (Vector msg)
-> TVar m (Vector msg)
-> [msg]
-> [msg]
-> b
runPeer NetworkComponent
(IOSim s)
(Either Connectivity (Authenticated (Heartbeat Int)))
(Heartbeat Int)
()
aliceReliabilityStack NodeId
"alice" TVar (IOSim s) (Vector Int)
TVar s (Vector Int)
messagesReceivedByAlice TVar (IOSim s) (Vector Int)
TVar s (Vector Int)
messagesReceivedByBob [Int]
aliceToBobMessages [Int]
bobToAliceMessages
runBob :: IOSim s ()
runBob = NetworkComponent
(IOSim s)
(Either Connectivity (Authenticated (Heartbeat Int)))
(Heartbeat Int)
()
-> NodeId
-> TVar (IOSim s) (Vector Int)
-> TVar (IOSim s) (Vector Int)
-> [Int]
-> [Int]
-> IOSim s ()
forall {m :: * -> *} {m :: * -> *} {msg} {msg} {b}.
(TVar m ~ TVar m, MonadDelay m, MonadAsync m, MonadSTM m) =>
(NetworkCallback
(Either Connectivity (Authenticated (Heartbeat msg))) m
-> (Network m (Heartbeat msg) -> m ()) -> b)
-> NodeId
-> TVar m (Vector msg)
-> TVar m (Vector msg)
-> [msg]
-> [msg]
-> b
runPeer NetworkComponent
(IOSim s)
(Either Connectivity (Authenticated (Heartbeat Int)))
(Heartbeat Int)
()
bobReliabilityStack NodeId
"bob" TVar (IOSim s) (Vector Int)
TVar s (Vector Int)
messagesReceivedByBob TVar (IOSim s) (Vector Int)
TVar s (Vector Int)
messagesReceivedByAlice [Int]
bobToAliceMessages [Int]
aliceToBobMessages
IOSim s () -> IOSim s () -> IOSim s ()
forall a b. IOSim s a -> IOSim s b -> IOSim s ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
concurrently_ IOSim s ()
runAlice IOSim s ()
runBob
[ReliabilityLog]
logs <- TVar (IOSim s) [ReliabilityLog] -> IOSim s [ReliabilityLog]
forall a. TVar (IOSim s) a -> IOSim s a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar (IOSim s) [ReliabilityLog]
TVar s [ReliabilityLog]
emittedTraces
[Int]
aliceReceived <- Vector Int -> [Int]
forall a. Vector a -> [a]
Vector.toList (Vector Int -> [Int]) -> IOSim s (Vector Int) -> IOSim s [Int]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (IOSim s) (Vector Int) -> IOSim s (Vector Int)
forall a. TVar (IOSim s) a -> IOSim s a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar (IOSim s) (Vector Int)
TVar s (Vector Int)
messagesReceivedByAlice
[Int]
bobReceived <- Vector Int -> [Int]
forall a. Vector a -> [a]
Vector.toList (Vector Int -> [Int]) -> IOSim s (Vector Int) -> IOSim s [Int]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (IOSim s) (Vector Int) -> IOSim s (Vector Int)
forall a. TVar (IOSim s) a -> IOSim s a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar (IOSim s) (Vector Int)
TVar s (Vector Int)
messagesReceivedByBob
([Int], [Int], [ReliabilityLog])
-> IOSim s ([Int], [Int], [ReliabilityLog])
forall a. a -> IOSim s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([Int]
aliceReceived, [Int]
bobReceived, [ReliabilityLog]
logs)
in
Int -> Property -> Property
forall prop. Testable prop => Int -> prop -> Property
within Int
1000000 (Property -> Property) -> Property -> Property
forall a b. (a -> b) -> a -> b
$
[Int]
msgReceivedByBob
[Int] -> [Int] -> Property
forall a. (Eq a, Show a) => a -> a -> Property
=== [Int]
aliceToBobMessages
Property -> (Property -> Property) -> Property
forall a b. a -> (a -> b) -> b
& String -> Property -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample ([String] -> String
unlines ([String] -> String) -> [String] -> String
forall a b. (a -> b) -> a -> b
$ ReliabilityLog -> String
forall b a. (Show a, IsString b) => a -> b
show (ReliabilityLog -> String) -> [ReliabilityLog] -> [String]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [ReliabilityLog] -> [ReliabilityLog]
forall a. [a] -> [a]
reverse [ReliabilityLog]
traces)
Property -> (Property -> Property) -> Property
forall a b. a -> (a -> b) -> b
& String -> [String] -> Property -> Property
forall prop.
Testable prop =>
String -> [String] -> prop -> Property
tabulate String
"Messages from Alice to Bob" [String
"< " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall b a. (Show a, IsString b) => a -> b
show (([Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int]
msgReceivedByBob Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
10 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
10)]
Property -> (Property -> Property) -> Property
forall a b. a -> (a -> b) -> b
& String -> [String] -> Property -> Property
forall prop.
Testable prop =>
String -> [String] -> prop -> Property
tabulate String
"Messages from Bob to Alice" [String
"< " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall b a. (Show a, IsString b) => a -> b
show (([Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int]
msgReceivedByAlice Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
10 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
10)]
String -> Expectation -> SpecWith (Arg Expectation)
forall a.
(HasCallStack, Example a) =>
String -> a -> SpecWith (Arg a)
it String
"broadcast updates counter from peers" (Expectation -> SpecWith (Arg Expectation))
-> Expectation -> SpecWith (Arg Expectation)
forall a b. (a -> b) -> a -> b
$ do
let receivedMsgs :: [ReliableMsg (Heartbeat (Heartbeat String))]
receivedMsgs = (forall s. IOSim s [ReliableMsg (Heartbeat (Heartbeat String))])
-> [ReliableMsg (Heartbeat (Heartbeat String))]
forall a. (forall s. IOSim s a) -> a
runSimOrThrow ((forall s. IOSim s [ReliableMsg (Heartbeat (Heartbeat String))])
-> [ReliableMsg (Heartbeat (Heartbeat String))])
-> (forall s. IOSim s [ReliableMsg (Heartbeat (Heartbeat String))])
-> [ReliableMsg (Heartbeat (Heartbeat String))]
forall a b. (a -> b) -> a -> b
$ do
TVar s (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
sentMessages <- Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> IOSim
s
(TVar
(IOSim s) (Vector (ReliableMsg (Heartbeat (Heartbeat String)))))
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Vector (ReliableMsg (Heartbeat (Heartbeat String)))
forall a. Vector a
empty
MessagePersistence (IOSim s) (Heartbeat String)
alicePersistence <- Int
-> MonadSTM (IOSim s) =>
IOSim s (MessagePersistence (IOSim s) (Heartbeat String))
forall (m :: * -> *) msg.
Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence Int
2
Tracer (IOSim s) ReliabilityLog
-> MessagePersistence (IOSim s) (Heartbeat String)
-> Party
-> [Party]
-> NetworkComponent
(IOSim s)
(Authenticated (ReliableMsg (Heartbeat (Heartbeat String))))
(ReliableMsg (Heartbeat (Heartbeat String)))
()
-> NetworkComponent
(IOSim s)
(Authenticated (Heartbeat (Heartbeat String)))
(Heartbeat (Heartbeat String))
()
forall (m :: * -> *) outbound inbound a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m outbound
-> Party
-> [Party]
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
-> NetworkComponent
m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
withReliability
Tracer (IOSim s) ReliabilityLog
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
MessagePersistence (IOSim s) (Heartbeat String)
alicePersistence
Party
alice
[Party
bob]
( \NetworkCallback{Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))
-> IOSim s ()
deliver :: Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))
-> IOSim s ()
$sel:deliver:NetworkCallback :: forall msg (m :: * -> *). NetworkCallback msg m -> msg -> m ()
deliver} Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat String)))
-> IOSim s ()
action -> do
IOSim s () -> IOSim s () -> IOSim s ()
forall a b. IOSim s a -> IOSim s b -> IOSim s ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
concurrently_
(Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat String)))
-> IOSim s ()
action (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat String)))
-> IOSim s ())
-> Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat String)))
-> IOSim s ()
forall a b. (a -> b) -> a -> b
$ Network{$sel:broadcast:Network :: ReliableMsg (Heartbeat (Heartbeat String)) -> IOSim s ()
broadcast = \ReliableMsg (Heartbeat (Heartbeat String))
m -> STM (IOSim s) () -> IOSim s ()
forall a. HasCallStack => STM (IOSim s) a -> IOSim s a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM (IOSim s) () -> IOSim s ()) -> STM (IOSim s) () -> IOSim s ()
forall a b. (a -> b) -> a -> b
$ TVar
(IOSim s) (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> (Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> STM (IOSim s) ()
forall a. TVar (IOSim s) a -> (a -> a) -> STM (IOSim s) ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar
(IOSim s) (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
TVar s (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
sentMessages (Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> ReliableMsg (Heartbeat (Heartbeat String))
-> Vector (ReliableMsg (Heartbeat (Heartbeat String)))
forall a. Vector a -> a -> Vector a
`snoc` ReliableMsg (Heartbeat (Heartbeat String))
m)})
(Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))
-> IOSim s ()
deliver (ReliableMsg (Heartbeat (Heartbeat String))
-> Party
-> Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int
-> Heartbeat (Heartbeat String)
-> ReliableMsg (Heartbeat (Heartbeat String))
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
0, Int
1]) (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Heartbeat String
msg)) Party
bob))
)
NetworkCallback
(Authenticated (Heartbeat (Heartbeat String))) (IOSim s)
forall (m :: * -> *) b. Monad m => NetworkCallback b m
noop
((Network (IOSim s) (Heartbeat (Heartbeat String)) -> IOSim s ())
-> IOSim s ())
-> (Network (IOSim s) (Heartbeat (Heartbeat String)) -> IOSim s ())
-> IOSim s ()
forall a b. (a -> b) -> a -> b
$ \Network{Heartbeat (Heartbeat String) -> IOSim s ()
$sel:broadcast:Network :: forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast :: Heartbeat (Heartbeat String) -> IOSim s ()
broadcast} -> do
DiffTime -> IOSim s ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
Heartbeat (Heartbeat String) -> IOSim s ()
broadcast (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1" Heartbeat String
msg)
Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> [ReliableMsg (Heartbeat (Heartbeat String))]
forall a. Vector a -> [a]
Vector.toList (Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> [ReliableMsg (Heartbeat (Heartbeat String))])
-> IOSim s (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> IOSim s [ReliableMsg (Heartbeat (Heartbeat String))]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar
(IOSim s) (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> IOSim s (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
forall a. TVar (IOSim s) a -> IOSim s a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar
(IOSim s) (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
TVar s (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
sentMessages
[ReliableMsg (Heartbeat (Heartbeat String))]
receivedMsgs [ReliableMsg (Heartbeat (Heartbeat String))]
-> [ReliableMsg (Heartbeat (Heartbeat String))] -> Expectation
forall a. (HasCallStack, Show a, Eq a) => a -> a -> Expectation
`shouldBe` [Vector Int
-> Heartbeat (Heartbeat String)
-> ReliableMsg (Heartbeat (Heartbeat String))
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1, Int
1]) (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1" Heartbeat String
msg)]
String -> Expectation -> SpecWith (Arg Expectation)
forall a.
(HasCallStack, Example a) =>
String -> a -> SpecWith (Arg a)
it String
"appends messages to disk and can load them back" (Expectation -> SpecWith (Arg Expectation))
-> Expectation -> SpecWith (Arg Expectation)
forall a b. (a -> b) -> a -> b
$ do
String -> (String -> Expectation) -> Expectation
forall (m :: * -> *) r.
MonadIO m =>
String -> (String -> m r) -> m r
withTempDir String
"network-messages-persistence" ((String -> Expectation) -> Expectation)
-> (String -> Expectation) -> Expectation
forall a b. (a -> b) -> a -> b
$ \String
tmpDir -> do
let networkMessagesFile :: String
networkMessagesFile = String
tmpDir String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"/network-messages"
Persistence{FromJSON (Vector Int) => IO (Maybe (Vector Int))
load :: FromJSON (Vector Int) => IO (Maybe (Vector Int))
$sel:load:Persistence :: forall a (m :: * -> *).
Persistence a m -> FromJSON a => m (Maybe a)
load, ToJSON (Vector Int) => Vector Int -> Expectation
save :: ToJSON (Vector Int) => Vector Int -> Expectation
$sel:save:Persistence :: forall a (m :: * -> *). Persistence a m -> ToJSON a => a -> m ()
save} <- String -> IO (Persistence (Vector Int) IO)
forall (m :: * -> *) a.
(MonadIO m, MonadThrow m) =>
String -> m (Persistence a m)
createPersistence (String -> IO (Persistence (Vector Int) IO))
-> String -> IO (Persistence (Vector Int) IO)
forall a b. (a -> b) -> a -> b
$ String
tmpDir String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"/acks"
PersistenceIncremental{FromJSON (Heartbeat (Heartbeat String)) =>
IO [Heartbeat (Heartbeat String)]
loadAll :: FromJSON (Heartbeat (Heartbeat String)) =>
IO [Heartbeat (Heartbeat String)]
$sel:loadAll:PersistenceIncremental :: forall a (m :: * -> *).
PersistenceIncremental a m -> FromJSON a => m [a]
loadAll, ToJSON (Heartbeat (Heartbeat String)) =>
Heartbeat (Heartbeat String) -> Expectation
append :: ToJSON (Heartbeat (Heartbeat String)) =>
Heartbeat (Heartbeat String) -> Expectation
$sel:append:PersistenceIncremental :: forall a (m :: * -> *).
PersistenceIncremental a m -> ToJSON a => a -> m ()
append} <- String
-> IO (PersistenceIncremental (Heartbeat (Heartbeat String)) IO)
forall a (m :: * -> *).
(MonadIO m, MonadThrow m, MonadSTM m, MonadThread m,
MonadThrow (STM m)) =>
String -> m (PersistenceIncremental a m)
createPersistenceIncremental String
networkMessagesFile
let messagePersistence :: MessagePersistence IO (Heartbeat String)
messagePersistence =
MessagePersistence
{ $sel:loadAcks:MessagePersistence :: IO (Vector Int)
loadAcks = do
Maybe (Vector Int)
mloaded <- IO (Maybe (Vector Int))
FromJSON (Vector Int) => IO (Maybe (Vector Int))
load
case Maybe (Vector Int)
mloaded of
Maybe (Vector Int)
Nothing -> Vector Int -> IO (Vector Int)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Vector Int -> IO (Vector Int)) -> Vector Int -> IO (Vector Int)
forall a b. (a -> b) -> a -> b
$ Int -> Int -> Vector Int
forall a. Int -> a -> Vector a
replicate ([Party] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Party
alice, Party
bob]) Int
0
Just Vector Int
acks -> Vector Int -> IO (Vector Int)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Vector Int
acks
, $sel:saveAcks:MessagePersistence :: Vector Int -> Expectation
saveAcks = ToJSON (Vector Int) => Vector Int -> Expectation
Vector Int -> Expectation
save
, $sel:loadMessages:MessagePersistence :: IO [Heartbeat (Heartbeat String)]
loadMessages = IO [Heartbeat (Heartbeat String)]
FromJSON (Heartbeat (Heartbeat String)) =>
IO [Heartbeat (Heartbeat String)]
loadAll
, $sel:appendMessage:MessagePersistence :: Heartbeat (Heartbeat String) -> Expectation
appendMessage = ToJSON (Heartbeat (Heartbeat String)) =>
Heartbeat (Heartbeat String) -> Expectation
Heartbeat (Heartbeat String) -> Expectation
append
}
[ReliableMsg (Heartbeat (Heartbeat String))]
receivedMsgs <- do
TVar (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
sentMessages <- Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> IO
(TVar IO (Vector (ReliableMsg (Heartbeat (Heartbeat String)))))
forall a. a -> IO (TVar IO a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Vector (ReliableMsg (Heartbeat (Heartbeat String)))
forall a. Vector a
empty
Tracer IO ReliabilityLog
-> MessagePersistence IO (Heartbeat String)
-> Party
-> [Party]
-> NetworkComponent
IO
(Authenticated (ReliableMsg (Heartbeat (Heartbeat String))))
(ReliableMsg (Heartbeat (Heartbeat String)))
()
-> NetworkComponent
IO
(Authenticated (Heartbeat (Heartbeat String)))
(Heartbeat (Heartbeat String))
()
forall (m :: * -> *) outbound inbound a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m outbound
-> Party
-> [Party]
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
-> NetworkComponent
m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
withReliability
Tracer IO ReliabilityLog
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
MessagePersistence IO (Heartbeat String)
messagePersistence
Party
alice
[Party
bob]
( \NetworkCallback{Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))
-> Expectation
$sel:deliver:NetworkCallback :: forall msg (m :: * -> *). NetworkCallback msg m -> msg -> m ()
deliver :: Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))
-> Expectation
deliver} Network IO (ReliableMsg (Heartbeat (Heartbeat String)))
-> Expectation
action -> do
Expectation -> Expectation -> Expectation
forall a b. IO a -> IO b -> Expectation
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
concurrently_
(Network IO (ReliableMsg (Heartbeat (Heartbeat String)))
-> Expectation
action (Network IO (ReliableMsg (Heartbeat (Heartbeat String)))
-> Expectation)
-> Network IO (ReliableMsg (Heartbeat (Heartbeat String)))
-> Expectation
forall a b. (a -> b) -> a -> b
$ Network{$sel:broadcast:Network :: ReliableMsg (Heartbeat (Heartbeat String)) -> Expectation
broadcast = \ReliableMsg (Heartbeat (Heartbeat String))
m -> STM IO () -> Expectation
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO () -> Expectation) -> STM IO () -> Expectation
forall a b. (a -> b) -> a -> b
$ TVar IO (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> (Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> STM IO ()
forall a. TVar IO a -> (a -> a) -> STM IO ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
TVar IO (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
sentMessages (Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> ReliableMsg (Heartbeat (Heartbeat String))
-> Vector (ReliableMsg (Heartbeat (Heartbeat String)))
forall a. Vector a -> a -> Vector a
`snoc` ReliableMsg (Heartbeat (Heartbeat String))
m)})
(Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))
-> Expectation
deliver (ReliableMsg (Heartbeat (Heartbeat String))
-> Party
-> Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int
-> Heartbeat (Heartbeat String)
-> ReliableMsg (Heartbeat (Heartbeat String))
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
0, Int
1]) (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Heartbeat String
msg)) Party
bob))
)
NetworkCallback (Authenticated (Heartbeat (Heartbeat String))) IO
forall (m :: * -> *) b. Monad m => NetworkCallback b m
noop
((Network IO (Heartbeat (Heartbeat String)) -> Expectation)
-> Expectation)
-> (Network IO (Heartbeat (Heartbeat String)) -> Expectation)
-> Expectation
forall a b. (a -> b) -> a -> b
$ \Network{Heartbeat (Heartbeat String) -> Expectation
$sel:broadcast:Network :: forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast :: Heartbeat (Heartbeat String) -> Expectation
broadcast} -> do
DiffTime -> Expectation
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
Heartbeat (Heartbeat String) -> Expectation
broadcast (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1" Heartbeat String
msg)
Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> [ReliableMsg (Heartbeat (Heartbeat String))]
forall a. Vector a -> [a]
Vector.toList (Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> [ReliableMsg (Heartbeat (Heartbeat String))])
-> IO (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> IO [ReliableMsg (Heartbeat (Heartbeat String))]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar IO (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> IO (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
forall a. TVar IO a -> IO a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
TVar IO (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
sentMessages
[ReliableMsg (Heartbeat (Heartbeat String))]
receivedMsgs [ReliableMsg (Heartbeat (Heartbeat String))]
-> [ReliableMsg (Heartbeat (Heartbeat String))] -> Expectation
forall a. (HasCallStack, Show a, Eq a) => a -> a -> Expectation
`shouldBe` [Vector Int
-> Heartbeat (Heartbeat String)
-> ReliableMsg (Heartbeat (Heartbeat String))
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1, Int
1]) (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1" Heartbeat String
msg)]
String -> IO Bool
doesFileExist String
networkMessagesFile IO Bool -> Bool -> Expectation
forall a. (HasCallStack, Show a, Eq a) => IO a -> a -> Expectation
`shouldReturn` Bool
True
String -> IO [Heartbeat (Heartbeat String)]
reloadAll String
networkMessagesFile IO [Heartbeat (Heartbeat String)]
-> [Heartbeat (Heartbeat String)] -> Expectation
forall a. (HasCallStack, Show a, Eq a) => IO a -> a -> Expectation
`shouldReturn` [NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1" Heartbeat String
msg]
String -> IO Bool
doesFileExist (String
tmpDir String -> String -> String
</> String
"acks") IO Bool -> Bool -> Expectation
forall a. (HasCallStack, Show a, Eq a) => IO a -> a -> Expectation
`shouldReturn` Bool
True
IO (Maybe (Vector Int))
FromJSON (Vector Int) => IO (Maybe (Vector Int))
load IO (Maybe (Vector Int)) -> Maybe (Vector Int) -> Expectation
forall a. (HasCallStack, Show a, Eq a) => IO a -> a -> Expectation
`shouldReturn` Vector Int -> Maybe (Vector Int)
forall a. a -> Maybe a
Just ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1, Int
1])
where
runPeer :: (NetworkCallback
(Either Connectivity (Authenticated (Heartbeat msg))) m
-> (Network m (Heartbeat msg) -> m ()) -> b)
-> NodeId
-> TVar m (Vector msg)
-> TVar m (Vector msg)
-> [msg]
-> [msg]
-> b
runPeer NetworkCallback
(Either Connectivity (Authenticated (Heartbeat msg))) m
-> (Network m (Heartbeat msg) -> m ()) -> b
reliability NodeId
partyName TVar m (Vector msg)
receivedMessageContainer TVar m (Vector msg)
sentMessageContainer [msg]
messagesToSend [msg]
expectedMessages =
NetworkCallback
(Either Connectivity (Authenticated (Heartbeat msg))) m
-> (Network m (Heartbeat msg) -> m ()) -> b
reliability (TVar m (Vector msg)
-> NetworkCallback
(Either Connectivity (Authenticated (Heartbeat msg))) m
forall (m :: * -> *) msg.
MonadSTM m =>
TVar m (Vector msg)
-> NetworkCallback
(Either Connectivity (Authenticated (Heartbeat msg))) m
capturePayload TVar m (Vector msg)
receivedMessageContainer) ((Network m (Heartbeat msg) -> m ()) -> b)
-> (Network m (Heartbeat msg) -> m ()) -> b
forall a b. (a -> b) -> a -> b
$ \Network{Heartbeat msg -> m ()
$sel:broadcast:Network :: forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast :: Heartbeat msg -> m ()
broadcast} -> do
[msg] -> (msg -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [msg]
messagesToSend ((msg -> m ()) -> m ()) -> (msg -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \msg
m -> do
Heartbeat msg -> m ()
broadcast (NodeId -> msg -> Heartbeat msg
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
partyName msg
m)
DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
0.1
m () -> m () -> m ()
forall a b. m a -> m b -> m ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
concurrently_
([msg] -> TVar m (Vector msg) -> m ()
forall (m :: * -> *) msg.
MonadSTM m =>
[msg] -> TVar m (Vector msg) -> m ()
waitForAllMessages [msg]
expectedMessages TVar m (Vector msg)
TVar m (Vector msg)
receivedMessageContainer)
([msg] -> TVar m (Vector msg) -> m ()
forall (m :: * -> *) msg.
MonadSTM m =>
[msg] -> TVar m (Vector msg) -> m ()
waitForAllMessages [msg]
messagesToSend TVar m (Vector msg)
sentMessageContainer)
reliabilityStack :: MessagePersistence m outbound
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
-> Tracer m ReliabilityLog
-> NodeId
-> Party
-> [Party]
-> NetworkComponent
m (Either Connectivity (Authenticated inbound)) outbound a
reliabilityStack MessagePersistence m outbound
persistence NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
underlyingNetwork Tracer m ReliabilityLog
tracer NodeId
nodeId Party
party [Party]
peers =
NodeId
-> NetworkComponent
m (Heartbeat (Authenticated inbound)) (Heartbeat outbound) a
-> NetworkComponent
m (Either Connectivity (Authenticated inbound)) outbound a
forall (m :: * -> *) inbound outbound a.
(MonadAsync m, MonadDelay m) =>
NodeId
-> NetworkComponent m (Heartbeat inbound) (Heartbeat outbound) a
-> NetworkComponent m (Either Connectivity inbound) outbound a
withHeartbeat NodeId
nodeId (NetworkComponent
m (Heartbeat (Authenticated inbound)) (Heartbeat outbound) a
-> NetworkComponent
m (Either Connectivity (Authenticated inbound)) outbound a)
-> NetworkComponent
m (Heartbeat (Authenticated inbound)) (Heartbeat outbound) a
-> NetworkComponent
m (Either Connectivity (Authenticated inbound)) outbound a
forall a b. (a -> b) -> a -> b
$
NetworkComponent
m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
-> NetworkComponent
m (Heartbeat (Authenticated inbound)) (Heartbeat outbound) a
forall (m :: * -> *) inbound outbound a.
NetworkComponent m (Authenticated (Heartbeat inbound)) outbound a
-> NetworkComponent
m (Heartbeat (Authenticated inbound)) outbound a
withFlipHeartbeats (NetworkComponent
m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
-> NetworkComponent
m (Heartbeat (Authenticated inbound)) (Heartbeat outbound) a)
-> NetworkComponent
m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
-> NetworkComponent
m (Heartbeat (Authenticated inbound)) (Heartbeat outbound) a
forall a b. (a -> b) -> a -> b
$
Tracer m ReliabilityLog
-> MessagePersistence m outbound
-> Party
-> [Party]
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
-> NetworkComponent
m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
forall (m :: * -> *) outbound inbound a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m outbound
-> Party
-> [Party]
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
-> NetworkComponent
m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
withReliability Tracer m ReliabilityLog
tracer MessagePersistence m outbound
persistence Party
party [Party]
peers NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
underlyingNetwork
failingNetwork :: TVar m a
-> Party
-> (TQueue m msg, TQueue m (Authenticated msg))
-> NetworkCallback msg m
-> (Network m msg -> m b)
-> m b
failingNetwork TVar m a
seed Party
peer (TQueue m msg
readQueue, TQueue m (Authenticated msg)
writeQueue) NetworkCallback{msg -> m ()
$sel:deliver:NetworkCallback :: forall msg (m :: * -> *). NetworkCallback msg m -> msg -> m ()
deliver :: msg -> m ()
deliver} Network m msg -> m b
action =
m Any -> (Async m Any -> m b) -> m b
forall a b. m a -> (Async m a -> m b) -> m b
forall (m :: * -> *) a b.
MonadAsync m =>
m a -> (Async m a -> m b) -> m b
withAsync
( m () -> m Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m Any) -> m () -> m Any
forall a b. (a -> b) -> a -> b
$ do
msg
newMsg <- STM m msg -> m msg
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m msg -> m msg) -> STM m msg -> m msg
forall a b. (a -> b) -> a -> b
$ TQueue m msg -> STM m msg
forall a. TQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> STM m a
readTQueue TQueue m msg
readQueue
msg -> m ()
deliver msg
newMsg
)
((Async m Any -> m b) -> m b) -> (Async m Any -> m b) -> m b
forall a b. (a -> b) -> a -> b
$ \Async m Any
_ ->
Network m msg -> m b
action (Network m msg -> m b) -> Network m msg -> m b
forall a b. (a -> b) -> a -> b
$
Network
{ $sel:broadcast:Network :: msg -> m ()
broadcast = \msg
m -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Double
r <- TVar m a -> STM m Double
forall {m :: * -> *} {a}.
(MonadSTM m, RandomGen a) =>
TVar m a -> STM m Double
randomNumber TVar m a
seed
Bool -> STM m () -> STM m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Double
r Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
< Double
0.02) (STM m () -> STM m ()) -> STM m () -> STM m ()
forall a b. (a -> b) -> a -> b
$ TQueue m (Authenticated msg) -> Authenticated msg -> STM m ()
forall a. TQueue m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> a -> STM m ()
writeTQueue TQueue m (Authenticated msg)
writeQueue (msg -> Party -> Authenticated msg
forall msg. msg -> Party -> Authenticated msg
Authenticated msg
m Party
peer)
}
randomNumber :: TVar m a -> STM m Double
randomNumber TVar m a
seed' = do
a
genSeed <- TVar m a -> STM m a
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m a
seed'
let (Double
res, a
newGenSeed) = (Double, Double) -> a -> (Double, a)
forall g a. (RandomGen g, UniformRange a) => (a, a) -> g -> (a, g)
uniformR (Double
0 :: Double, Double
1) a
genSeed
TVar m a -> a -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m a
seed' a
newGenSeed
Double -> STM m Double
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Double
res
reloadAll :: FilePath -> IO [Heartbeat (Heartbeat String)]
reloadAll :: String -> IO [Heartbeat (Heartbeat String)]
reloadAll String
fileName =
String
-> IO (PersistenceIncremental (Heartbeat (Heartbeat String)) IO)
forall a (m :: * -> *).
(MonadIO m, MonadThrow m, MonadSTM m, MonadThread m,
MonadThrow (STM m)) =>
String -> m (PersistenceIncremental a m)
createPersistenceIncremental String
fileName
IO (PersistenceIncremental (Heartbeat (Heartbeat String)) IO)
-> (PersistenceIncremental (Heartbeat (Heartbeat String)) IO
-> IO [Heartbeat (Heartbeat String)])
-> IO [Heartbeat (Heartbeat String)]
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \PersistenceIncremental{FromJSON (Heartbeat (Heartbeat String)) =>
IO [Heartbeat (Heartbeat String)]
$sel:loadAll:PersistenceIncremental :: forall a (m :: * -> *).
PersistenceIncremental a m -> FromJSON a => m [a]
loadAll :: FromJSON (Heartbeat (Heartbeat String)) =>
IO [Heartbeat (Heartbeat String)]
loadAll} -> IO [Heartbeat (Heartbeat String)]
FromJSON (Heartbeat (Heartbeat String)) =>
IO [Heartbeat (Heartbeat String)]
loadAll
noop :: Monad m => NetworkCallback b m
noop :: forall (m :: * -> *) b. Monad m => NetworkCallback b m
noop = NetworkCallback{$sel:deliver:NetworkCallback :: b -> m ()
deliver = m () -> b -> m ()
forall a b. a -> b -> a
const (m () -> b -> m ()) -> m () -> b -> m ()
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()}
aliceReceivesMessages :: [Authenticated (ReliableMsg (Heartbeat msg))] -> [Authenticated (Heartbeat msg)]
aliceReceivesMessages :: forall msg.
[Authenticated (ReliableMsg (Heartbeat msg))]
-> [Authenticated (Heartbeat msg)]
aliceReceivesMessages [Authenticated (ReliableMsg (Heartbeat msg))]
messages = (forall s. IOSim s [Authenticated (Heartbeat msg)])
-> [Authenticated (Heartbeat msg)]
forall a. (forall s. IOSim s a) -> a
runSimOrThrow ((forall s. IOSim s [Authenticated (Heartbeat msg)])
-> [Authenticated (Heartbeat msg)])
-> (forall s. IOSim s [Authenticated (Heartbeat msg)])
-> [Authenticated (Heartbeat msg)]
forall a b. (a -> b) -> a -> b
$ do
TVar s (Vector (Authenticated (Heartbeat msg)))
receivedMessages <- Vector (Authenticated (Heartbeat msg))
-> IOSim
s (TVar (IOSim s) (Vector (Authenticated (Heartbeat msg))))
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Vector (Authenticated (Heartbeat msg))
forall a. Vector a
empty
MessagePersistence (IOSim s) Any
alicePersistence <- Int
-> MonadSTM (IOSim s) => IOSim s (MessagePersistence (IOSim s) Any)
forall (m :: * -> *) msg.
Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence Int
3
let baseNetwork :: NetworkCallback
(Authenticated (ReliableMsg (Heartbeat msg))) (IOSim s)
-> (Network (IOSim s) (ReliableMsg (Heartbeat Any))
-> IOSim s [()])
-> IOSim s [()]
baseNetwork NetworkCallback{Authenticated (ReliableMsg (Heartbeat msg)) -> IOSim s ()
$sel:deliver:NetworkCallback :: forall msg (m :: * -> *). NetworkCallback msg m -> msg -> m ()
deliver :: Authenticated (ReliableMsg (Heartbeat msg)) -> IOSim s ()
deliver} Network (IOSim s) (ReliableMsg (Heartbeat Any)) -> IOSim s [()]
_ = (Authenticated (ReliableMsg (Heartbeat msg)) -> IOSim s ())
-> [Authenticated (ReliableMsg (Heartbeat msg))] -> IOSim s [()]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM Authenticated (ReliableMsg (Heartbeat msg)) -> IOSim s ()
deliver [Authenticated (ReliableMsg (Heartbeat msg))]
messages
aliceReliabilityStack :: NetworkComponent
(IOSim s) (Authenticated (Heartbeat msg)) (Heartbeat Any) [()]
aliceReliabilityStack =
Tracer (IOSim s) ReliabilityLog
-> MessagePersistence (IOSim s) Any
-> Party
-> [Party]
-> (NetworkCallback
(Authenticated (ReliableMsg (Heartbeat msg))) (IOSim s)
-> (Network (IOSim s) (ReliableMsg (Heartbeat Any))
-> IOSim s [()])
-> IOSim s [()])
-> NetworkComponent
(IOSim s) (Authenticated (Heartbeat msg)) (Heartbeat Any) [()]
forall (m :: * -> *) outbound inbound a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m outbound
-> Party
-> [Party]
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
-> NetworkComponent
m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
withReliability
Tracer (IOSim s) ReliabilityLog
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
MessagePersistence (IOSim s) Any
alicePersistence
Party
alice
[Party
bob, Party
carol]
NetworkCallback
(Authenticated (ReliableMsg (Heartbeat msg))) (IOSim s)
-> (Network (IOSim s) (ReliableMsg (Heartbeat Any))
-> IOSim s [()])
-> IOSim s [()]
baseNetwork
IOSim s [()] -> IOSim s ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IOSim s [()] -> IOSim s ()) -> IOSim s [()] -> IOSim s ()
forall a b. (a -> b) -> a -> b
$ NetworkComponent
(IOSim s) (Authenticated (Heartbeat msg)) (Heartbeat Any) [()]
aliceReliabilityStack (TVar (IOSim s) (Vector (Authenticated (Heartbeat msg)))
-> NetworkCallback (Authenticated (Heartbeat msg)) (IOSim s)
forall (m :: * -> *) p.
MonadSTM m =>
TVar m (Vector p) -> NetworkCallback p m
captureIncoming TVar (IOSim s) (Vector (Authenticated (Heartbeat msg)))
TVar s (Vector (Authenticated (Heartbeat msg)))
receivedMessages) ((Network (IOSim s) (Heartbeat Any) -> IOSim s [()])
-> IOSim s [()])
-> (Network (IOSim s) (Heartbeat Any) -> IOSim s [()])
-> IOSim s [()]
forall a b. (a -> b) -> a -> b
$ \Network (IOSim s) (Heartbeat Any)
_action ->
[()] -> IOSim s [()]
forall a. a -> IOSim s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [()]
Vector (Authenticated (Heartbeat msg))
-> [Authenticated (Heartbeat msg)]
forall a. Vector a -> [a]
Vector.toList (Vector (Authenticated (Heartbeat msg))
-> [Authenticated (Heartbeat msg)])
-> IOSim s (Vector (Authenticated (Heartbeat msg)))
-> IOSim s [Authenticated (Heartbeat msg)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (IOSim s) (Vector (Authenticated (Heartbeat msg)))
-> IOSim s (Vector (Authenticated (Heartbeat msg)))
forall a. TVar (IOSim s) a -> IOSim s a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar (IOSim s) (Vector (Authenticated (Heartbeat msg)))
TVar s (Vector (Authenticated (Heartbeat msg)))
receivedMessages
captureIncoming :: MonadSTM m => TVar m (Vector p) -> NetworkCallback p m
captureIncoming :: forall (m :: * -> *) p.
MonadSTM m =>
TVar m (Vector p) -> NetworkCallback p m
captureIncoming TVar m (Vector p)
receivedMessages =
NetworkCallback
{ $sel:deliver:NetworkCallback :: p -> m ()
deliver = \p
msg ->
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar m (Vector p) -> (Vector p -> Vector p) -> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m (Vector p)
receivedMessages (Vector p -> p -> Vector p
forall a. Vector a -> a -> Vector a
`snoc` p
msg)
}
capturePayload :: MonadSTM m => TVar m (Vector msg) -> NetworkCallback (Either Connectivity (Authenticated (Heartbeat msg))) m
capturePayload :: forall (m :: * -> *) msg.
MonadSTM m =>
TVar m (Vector msg)
-> NetworkCallback
(Either Connectivity (Authenticated (Heartbeat msg))) m
capturePayload TVar m (Vector msg)
receivedMessages =
NetworkCallback
{ $sel:deliver:NetworkCallback :: Either Connectivity (Authenticated (Heartbeat msg)) -> m ()
deliver = \case
Right Authenticated{$sel:payload:Authenticated :: forall msg. Authenticated msg -> msg
payload = Data NodeId
_ msg
msg} ->
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar m (Vector msg) -> (Vector msg -> Vector msg) -> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m (Vector msg)
receivedMessages (Vector msg -> msg -> Vector msg
forall a. Vector a -> a -> Vector a
`snoc` msg
msg)
Either Connectivity (Authenticated (Heartbeat msg))
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
}
waitForAllMessages :: MonadSTM m => [msg] -> TVar m (Vector msg) -> m ()
waitForAllMessages :: forall (m :: * -> *) msg.
MonadSTM m =>
[msg] -> TVar m (Vector msg) -> m ()
waitForAllMessages [msg]
expectedMessages TVar m (Vector msg)
capturedMessages = STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Vector msg
msgs <- TVar m (Vector msg) -> STM m (Vector msg)
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m (Vector msg)
capturedMessages
Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> STM m ()) -> Bool -> STM m ()
forall a b. (a -> b) -> a -> b
$ Vector msg -> Int
forall a. Vector a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length Vector msg
msgs Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== [msg] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [msg]
expectedMessages
captureTraces ::
MonadSTM m =>
TVar m [ReliabilityLog] ->
Tracer m ReliabilityLog
captureTraces :: forall (m :: * -> *).
MonadSTM m =>
TVar m [ReliabilityLog] -> Tracer m ReliabilityLog
captureTraces TVar m [ReliabilityLog]
tvar = (ReliabilityLog -> m ()) -> Tracer m ReliabilityLog
forall (m :: * -> *) a. (a -> m ()) -> Tracer m a
Tracer ((ReliabilityLog -> m ()) -> Tracer m ReliabilityLog)
-> (ReliabilityLog -> m ()) -> Tracer m ReliabilityLog
forall a b. (a -> b) -> a -> b
$ \ReliabilityLog
msg -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar m [ReliabilityLog]
-> ([ReliabilityLog] -> [ReliabilityLog]) -> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m [ReliabilityLog]
tvar (ReliabilityLog
msg :)
mockMessagePersistence :: Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence :: forall (m :: * -> *) msg.
Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence Int
numberOfParties = do
TVar m (Vector Int)
acks <- Vector Int -> m (TVar m (Vector Int))
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO (Vector Int -> m (TVar m (Vector Int)))
-> Vector Int -> m (TVar m (Vector Int))
forall a b. (a -> b) -> a -> b
$ Int -> Int -> Vector Int
forall a. Int -> a -> Vector a
replicate Int
numberOfParties Int
0
TVar m (StrictSeq (Heartbeat msg))
messages <- StrictSeq (Heartbeat msg) -> m (TVar m (StrictSeq (Heartbeat msg)))
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO StrictSeq (Heartbeat msg)
forall a. Monoid a => a
mempty
MessagePersistence m msg -> m (MessagePersistence m msg)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MessagePersistence m msg -> m (MessagePersistence m msg))
-> MessagePersistence m msg -> m (MessagePersistence m msg)
forall a b. (a -> b) -> a -> b
$
MessagePersistence
{ $sel:loadAcks:MessagePersistence :: m (Vector Int)
loadAcks = TVar m (Vector Int) -> m (Vector Int)
forall a. TVar m a -> m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar m (Vector Int)
acks
, $sel:saveAcks:MessagePersistence :: Vector Int -> m ()
saveAcks = STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ())
-> (Vector Int -> STM m ()) -> Vector Int -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TVar m (Vector Int) -> Vector Int -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m (Vector Int)
acks
, $sel:loadMessages:MessagePersistence :: m [Heartbeat msg]
loadMessages = StrictSeq (Heartbeat msg) -> [Heartbeat msg]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (StrictSeq (Heartbeat msg) -> [Heartbeat msg])
-> m (StrictSeq (Heartbeat msg)) -> m [Heartbeat msg]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar m (StrictSeq (Heartbeat msg)) -> m (StrictSeq (Heartbeat msg))
forall a. TVar m a -> m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar m (StrictSeq (Heartbeat msg))
messages
, $sel:appendMessage:MessagePersistence :: Heartbeat msg -> m ()
appendMessage = \Heartbeat msg
msg -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar m (StrictSeq (Heartbeat msg))
-> (StrictSeq (Heartbeat msg) -> StrictSeq (Heartbeat msg))
-> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m (StrictSeq (Heartbeat msg))
messages (StrictSeq (Heartbeat msg)
-> Heartbeat msg -> StrictSeq (Heartbeat msg)
forall a. StrictSeq a -> a -> StrictSeq a
|> Heartbeat msg
msg)
}