module Hydra.Network.ReliabilitySpec where

import Hydra.Prelude hiding (empty, fromList, head, replicate, unlines)
import Test.Hydra.Prelude

import Control.Concurrent.Class.MonadSTM (
  MonadSTM (readTQueue, readTVarIO, writeTQueue),
  check,
  modifyTVar',
  newTQueueIO,
  newTVarIO,
  writeTVar,
 )
import Control.Monad.IOSim (runSimOrThrow)
import Control.Tracer (Tracer (..), nullTracer)
import Data.Sequence.Strict ((|>))
import Data.Vector (Vector, empty, fromList, head, replicate, snoc)
import Data.Vector qualified as Vector
import Hydra.Network (Network (..))
import Hydra.Network.Authenticate (Authenticated (..))
import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat)
import Hydra.Network.Reliability (MessagePersistence (..), ReliabilityLog (..), ReliableMsg (..), withReliability)
import Hydra.Node.Network (withFlipHeartbeats)
import Hydra.Persistence (
  Persistence (..),
  PersistenceIncremental (..),
  createPersistence,
  createPersistenceIncremental,
 )
import System.Directory (doesFileExist)
import System.FilePath ((</>))
import System.Random (mkStdGen, uniformR)
import Test.Hydra.Fixture (alice, bob, carol)
import Test.QuickCheck (
  Positive (Positive),
  collect,
  counterexample,
  generate,
  tabulate,
  within,
  (===),
 )
import Prelude (unlines)

spec :: Spec
spec :: Spec
spec = Spec -> Spec
forall a. SpecWith a -> SpecWith a
parallel (Spec -> Spec) -> Spec -> Spec
forall a b. (a -> b) -> a -> b
$ do
  let captureOutgoing :: TVar m (Vector msg) -> p -> (Network m msg -> b) -> b
captureOutgoing TVar m (Vector msg)
msgqueue p
_callback Network m msg -> b
action =
        Network m msg -> b
action (Network m msg -> b) -> Network m msg -> b
forall a b. (a -> b) -> a -> b
$ Network{$sel:broadcast:Network :: msg -> m ()
broadcast = \msg
msg -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar m (Vector msg) -> (Vector msg -> Vector msg) -> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m (Vector msg)
msgqueue (Vector msg -> msg -> Vector msg
forall a. Vector a -> a -> Vector a
`snoc` msg
msg)}

  let msg' :: Int
msg' = Int
42 :: Int
  Heartbeat String
msg <- NodeId -> String -> Heartbeat String
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1" (String -> Heartbeat String)
-> SpecM () String -> SpecM () (Heartbeat String)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO String -> SpecM () String
forall r a. IO r -> SpecM a r
runIO (forall a. Gen a -> IO a
generate @String Gen String
forall a. Arbitrary a => Gen a
arbitrary)

  String -> Spec -> Spec
forall a. HasCallStack => String -> SpecWith a -> SpecWith a
describe String
"receiving messages" (Spec -> Spec) -> Spec -> Spec
forall a b. (a -> b) -> a -> b
$ do
    String -> Expectation -> SpecWith (Arg Expectation)
forall a.
(HasCallStack, Example a) =>
String -> a -> SpecWith (Arg a)
it String
"forward received messages" (Expectation -> SpecWith (Arg Expectation))
-> Expectation -> SpecWith (Arg Expectation)
forall a b. (a -> b) -> a -> b
$ do
      let propagatedMessages :: [Authenticated (Heartbeat (Heartbeat String))]
propagatedMessages =
            [Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))]
-> [Authenticated (Heartbeat (Heartbeat String))]
forall msg.
[Authenticated (ReliableMsg (Heartbeat msg))]
-> [Authenticated (Heartbeat msg)]
aliceReceivesMessages
              [ReliableMsg (Heartbeat (Heartbeat String))
-> Party
-> Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int
-> Heartbeat (Heartbeat String)
-> ReliableMsg (Heartbeat (Heartbeat String))
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1, Int
1, Int
1]) (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Heartbeat String
msg)) Party
bob]

      [Authenticated (Heartbeat (Heartbeat String))]
propagatedMessages [Authenticated (Heartbeat (Heartbeat String))]
-> [Authenticated (Heartbeat (Heartbeat String))] -> Expectation
forall a. (HasCallStack, Show a, Eq a) => a -> a -> Expectation
`shouldBe` [Heartbeat (Heartbeat String)
-> Party -> Authenticated (Heartbeat (Heartbeat String))
forall msg. msg -> Party -> Authenticated msg
Authenticated (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Heartbeat String
msg) Party
bob]

    String -> Expectation -> SpecWith (Arg Expectation)
forall a.
(HasCallStack, Example a) =>
String -> a -> SpecWith (Arg a)
it String
"do not drop messages with same ids from different peers" (Expectation -> SpecWith (Arg Expectation))
-> Expectation -> SpecWith (Arg Expectation)
forall a b. (a -> b) -> a -> b
$ do
      let propagatedMessages :: [Authenticated (Heartbeat Int)]
propagatedMessages =
            [Authenticated (ReliableMsg (Heartbeat Int))]
-> [Authenticated (Heartbeat Int)]
forall msg.
[Authenticated (ReliableMsg (Heartbeat msg))]
-> [Authenticated (Heartbeat msg)]
aliceReceivesMessages
              [ ReliableMsg (Heartbeat Int)
-> Party -> Authenticated (ReliableMsg (Heartbeat Int))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int -> Heartbeat Int -> ReliableMsg (Heartbeat Int)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
0, Int
1, Int
0]) (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Int
msg')) Party
bob
              , ReliableMsg (Heartbeat Int)
-> Party -> Authenticated (ReliableMsg (Heartbeat Int))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int -> Heartbeat Int -> ReliableMsg (Heartbeat Int)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
0, Int
0, Int
1]) (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-3" Int
msg')) Party
carol
              ]

      [Authenticated (Heartbeat Int)]
propagatedMessages [Authenticated (Heartbeat Int)]
-> [Authenticated (Heartbeat Int)] -> Expectation
forall a. (HasCallStack, Show a, Eq a) => a -> a -> Expectation
`shouldBe` [Heartbeat Int -> Party -> Authenticated (Heartbeat Int)
forall msg. msg -> Party -> Authenticated msg
Authenticated (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Int
msg') Party
bob, Heartbeat Int -> Party -> Authenticated (Heartbeat Int)
forall msg. msg -> Party -> Authenticated msg
Authenticated (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-3" Int
msg') Party
carol]

    String -> Expectation -> SpecWith (Arg Expectation)
forall a.
(HasCallStack, Example a) =>
String -> a -> SpecWith (Arg a)
it String
"Ignores messages with malformed acks" (Expectation -> SpecWith (Arg Expectation))
-> Expectation -> SpecWith (Arg Expectation)
forall a b. (a -> b) -> a -> b
$ do
      let malFormedAck :: Vector Int
malFormedAck = [Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1, Int
0]
          wellFormedAck :: Vector Int
wellFormedAck = [Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1, Int
0, Int
1]
          propagatedMessages :: [Authenticated (Heartbeat Int)]
propagatedMessages =
            [Authenticated (ReliableMsg (Heartbeat Int))]
-> [Authenticated (Heartbeat Int)]
forall msg.
[Authenticated (ReliableMsg (Heartbeat msg))]
-> [Authenticated (Heartbeat msg)]
aliceReceivesMessages
              [ ReliableMsg (Heartbeat Int)
-> Party -> Authenticated (ReliableMsg (Heartbeat Int))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int -> Heartbeat Int -> ReliableMsg (Heartbeat Int)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg Vector Int
malFormedAck (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Int
msg')) Party
bob
              , ReliableMsg (Heartbeat Int)
-> Party -> Authenticated (ReliableMsg (Heartbeat Int))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int -> Heartbeat Int -> ReliableMsg (Heartbeat Int)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg Vector Int
wellFormedAck (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-3" Int
msg')) Party
carol
              ]

      [Authenticated (Heartbeat Int)]
propagatedMessages [Authenticated (Heartbeat Int)]
-> [Authenticated (Heartbeat Int)] -> Expectation
forall a. (HasCallStack, Show a, Eq a) => a -> a -> Expectation
`shouldBe` [Heartbeat Int -> Party -> Authenticated (Heartbeat Int)
forall msg. msg -> Party -> Authenticated msg
Authenticated (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-3" Int
msg') Party
carol]

    String -> ([Positive Int] -> Property) -> Spec
forall prop.
(HasCallStack, Testable prop) =>
String -> prop -> Spec
prop String
"drops already received messages" (([Positive Int] -> Property) -> Spec)
-> ([Positive Int] -> Property) -> Spec
forall a b. (a -> b) -> a -> b
$ \([Positive Int]
messages :: [Positive Int]) ->
      let
        messagesToSend :: [Authenticated (ReliableMsg (Heartbeat Int))]
messagesToSend =
          (\(Positive Int
m) -> ReliableMsg (Heartbeat Int)
-> Party -> Authenticated (ReliableMsg (Heartbeat Int))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int -> Heartbeat Int -> ReliableMsg (Heartbeat Int)
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
0, Int
m, Int
0]) (NodeId -> Int -> Heartbeat Int
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Int
m)) Party
bob)
            (Positive Int -> Authenticated (ReliableMsg (Heartbeat Int)))
-> [Positive Int] -> [Authenticated (ReliableMsg (Heartbeat Int))]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Positive Int]
messages
        propagatedMessages :: [Authenticated (Heartbeat Int)]
propagatedMessages = [Authenticated (ReliableMsg (Heartbeat Int))]
-> [Authenticated (Heartbeat Int)]
forall msg.
[Authenticated (ReliableMsg (Heartbeat msg))]
-> [Authenticated (Heartbeat msg)]
aliceReceivesMessages [Authenticated (ReliableMsg (Heartbeat Int))]
messagesToSend

        receivedMessagesInOrder :: t (Authenticated (Heartbeat a)) -> Bool
receivedMessagesInOrder t (Authenticated (Heartbeat a))
messageReceived =
          let refMessages :: [Heartbeat a]
refMessages = NodeId -> a -> Heartbeat a
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" (a -> Heartbeat a) -> [a] -> [Heartbeat a]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [a
1 ..]
              isInMessage :: Authenticated (Heartbeat a) -> Bool
isInMessage Authenticated{Heartbeat a
payload :: Heartbeat a
$sel:payload:Authenticated :: forall msg. Authenticated msg -> msg
payload} = Heartbeat a
payload Heartbeat a -> [Heartbeat a] -> Bool
forall (f :: * -> *) a.
(Foldable f, DisallowElem f, Eq a) =>
a -> f a -> Bool
`elem` [Heartbeat a]
refMessages
           in (Authenticated (Heartbeat a) -> Bool)
-> t (Authenticated (Heartbeat a)) -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
all Authenticated (Heartbeat a) -> Bool
isInMessage t (Authenticated (Heartbeat a))
messageReceived
       in
        [Authenticated (Heartbeat Int)] -> Bool
forall {a} {t :: * -> *}.
(Eq a, Foldable t, Num a, Enum a) =>
t (Authenticated (Heartbeat a)) -> Bool
receivedMessagesInOrder [Authenticated (Heartbeat Int)]
propagatedMessages
          Bool -> (Bool -> Property) -> Property
forall a b. a -> (a -> b) -> b
& String -> Bool -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample ([Authenticated (Heartbeat Int)] -> String
forall b a. (Show a, IsString b) => a -> b
show [Authenticated (Heartbeat Int)]
propagatedMessages)
          Property -> (Property -> Property) -> Property
forall a b. a -> (a -> b) -> b
& Int -> Property -> Property
forall a prop. (Show a, Testable prop) => a -> prop -> Property
collect ([Authenticated (Heartbeat Int)] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Authenticated (Heartbeat Int)]
propagatedMessages)

  String -> Spec -> Spec
forall a. HasCallStack => String -> SpecWith a -> SpecWith a
describe String
"sending messages" (Spec -> Spec) -> Spec -> Spec
forall a b. (a -> b) -> a -> b
$ do
    String -> ([String] -> Expectation) -> Spec
forall prop.
(HasCallStack, Testable prop) =>
String -> prop -> Spec
prop String
"broadcast messages to the network assigning a sequential id" (([String] -> Expectation) -> Spec)
-> ([String] -> Expectation) -> Spec
forall a b. (a -> b) -> a -> b
$ \([String]
messages :: [String]) ->
      let sentMsgs :: Vector (ReliableMsg (Heartbeat String))
sentMsgs = (forall s. IOSim s (Vector (ReliableMsg (Heartbeat String))))
-> Vector (ReliableMsg (Heartbeat String))
forall a. (forall s. IOSim s a) -> a
runSimOrThrow ((forall s. IOSim s (Vector (ReliableMsg (Heartbeat String))))
 -> Vector (ReliableMsg (Heartbeat String)))
-> (forall s. IOSim s (Vector (ReliableMsg (Heartbeat String))))
-> Vector (ReliableMsg (Heartbeat String))
forall a b. (a -> b) -> a -> b
$ do
            TVar s (Vector (ReliableMsg (Heartbeat String)))
sentMessages <- Vector (ReliableMsg (Heartbeat String))
-> IOSim
     s (TVar (IOSim s) (Vector (ReliableMsg (Heartbeat String))))
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Vector (ReliableMsg (Heartbeat String))
forall a. Vector a
empty
            MessagePersistence (IOSim s) String
persistence <- Int
-> MonadSTM (IOSim s) =>
   IOSim s (MessagePersistence (IOSim s) String)
forall (m :: * -> *) msg.
Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence Int
1

            Tracer (IOSim s) ReliabilityLog
-> MessagePersistence (IOSim s) String
-> Party
-> [Party]
-> NetworkComponent
     (IOSim s)
     (Authenticated (ReliableMsg (Heartbeat String)))
     (ReliableMsg (Heartbeat String))
     ()
-> NetworkComponent
     (IOSim s) (Authenticated (Heartbeat String)) (Heartbeat String) ()
forall (m :: * -> *) msg a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m msg
-> Party
-> [Party]
-> NetworkComponent
     m
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     a
-> NetworkComponent
     m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability Tracer (IOSim s) ReliabilityLog
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer MessagePersistence (IOSim s) String
persistence Party
alice [] (TVar (IOSim s) (Vector (ReliableMsg (Heartbeat String)))
-> NetworkComponent
     (IOSim s)
     (Authenticated (ReliableMsg (Heartbeat String)))
     (ReliableMsg (Heartbeat String))
     ()
forall {m :: * -> *} {msg} {p} {b}.
MonadSTM m =>
TVar m (Vector msg) -> p -> (Network m msg -> b) -> b
captureOutgoing TVar (IOSim s) (Vector (ReliableMsg (Heartbeat String)))
TVar s (Vector (ReliableMsg (Heartbeat String)))
sentMessages) Authenticated (Heartbeat String) -> IOSim s ()
forall (m :: * -> *) b. Monad m => b -> m ()
noop ((Network (IOSim s) (Heartbeat String) -> IOSim s ())
 -> IOSim s ())
-> (Network (IOSim s) (Heartbeat String) -> IOSim s ())
-> IOSim s ()
forall a b. (a -> b) -> a -> b
$ \Network{Heartbeat String -> IOSim s ()
$sel:broadcast:Network :: forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast :: Heartbeat String -> IOSim s ()
broadcast} -> do
              (String -> IOSim s ()) -> [String] -> IOSim s ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Heartbeat String -> IOSim s ()
broadcast (Heartbeat String -> IOSim s ())
-> (String -> Heartbeat String) -> String -> IOSim s ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeId -> String -> Heartbeat String
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1") [String]
messages

            [ReliableMsg (Heartbeat String)]
-> Vector (ReliableMsg (Heartbeat String))
forall a. [a] -> Vector a
fromList ([ReliableMsg (Heartbeat String)]
 -> Vector (ReliableMsg (Heartbeat String)))
-> (Vector (ReliableMsg (Heartbeat String))
    -> [ReliableMsg (Heartbeat String)])
-> Vector (ReliableMsg (Heartbeat String))
-> Vector (ReliableMsg (Heartbeat String))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Vector (ReliableMsg (Heartbeat String))
-> [ReliableMsg (Heartbeat String)]
forall a. Vector a -> [a]
Vector.toList (Vector (ReliableMsg (Heartbeat String))
 -> Vector (ReliableMsg (Heartbeat String)))
-> IOSim s (Vector (ReliableMsg (Heartbeat String)))
-> IOSim s (Vector (ReliableMsg (Heartbeat String)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (IOSim s) (Vector (ReliableMsg (Heartbeat String)))
-> IOSim s (Vector (ReliableMsg (Heartbeat String)))
forall a. TVar (IOSim s) a -> IOSim s a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar (IOSim s) (Vector (ReliableMsg (Heartbeat String)))
TVar s (Vector (ReliableMsg (Heartbeat String)))
sentMessages
       in Vector Int -> Int
forall a. Vector a -> a
head (Vector Int -> Int)
-> (ReliableMsg (Heartbeat String) -> Vector Int)
-> ReliableMsg (Heartbeat String)
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ReliableMsg (Heartbeat String) -> Vector Int
forall msg. ReliableMsg msg -> Vector Int
knownMessageIds (ReliableMsg (Heartbeat String) -> Int)
-> Vector (ReliableMsg (Heartbeat String)) -> Vector Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Vector (ReliableMsg (Heartbeat String))
sentMsgs Vector Int -> Vector Int -> Expectation
forall a. (HasCallStack, Show a, Eq a) => a -> a -> Expectation
`shouldBe` [Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1 .. ([String] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [String]
messages)]

    -- this test is quite critical as it demonstrates messages dropped are properly managed and resent to the
    -- other party whatever the length of queue, and whatever the interleaving of threads
    (Int -> Int) -> Spec -> Spec
forall a. (Int -> Int) -> SpecWith a -> SpecWith a
modifyMaxSuccess (Int -> Int -> Int
forall a b. a -> b -> a
const Int
5000) (Spec -> Spec) -> Spec -> Spec
forall a b. (a -> b) -> a -> b
$
      String -> ([Int] -> [Int] -> Int -> Property) -> Spec
forall prop.
(HasCallStack, Testable prop) =>
String -> prop -> Spec
prop String
"stress test networking layer" (([Int] -> [Int] -> Int -> Property) -> Spec)
-> ([Int] -> [Int] -> Int -> Property) -> Spec
forall a b. (a -> b) -> a -> b
$ \([Int]
aliceToBobMessages :: [Int]) ([Int]
bobToAliceMessages :: [Int]) Int
seed ->
        let
          ([Int]
msgReceivedByAlice, [Int]
msgReceivedByBob, [ReliabilityLog]
traces) = (forall s. IOSim s ([Int], [Int], [ReliabilityLog]))
-> ([Int], [Int], [ReliabilityLog])
forall a. (forall s. IOSim s a) -> a
runSimOrThrow ((forall s. IOSim s ([Int], [Int], [ReliabilityLog]))
 -> ([Int], [Int], [ReliabilityLog]))
-> (forall s. IOSim s ([Int], [Int], [ReliabilityLog]))
-> ([Int], [Int], [ReliabilityLog])
forall a b. (a -> b) -> a -> b
$ do
            TVar s (Vector Int)
messagesReceivedByBob <- Vector Int -> IOSim s (TVar (IOSim s) (Vector Int))
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Vector Int
forall a. Vector a
empty
            TVar s (Vector Int)
messagesReceivedByAlice <- Vector Int -> IOSim s (TVar (IOSim s) (Vector Int))
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Vector Int
forall a. Vector a
empty
            TVar s [ReliabilityLog]
emittedTraces <- [ReliabilityLog] -> IOSim s (TVar (IOSim s) [ReliabilityLog])
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO []
            TVar s StdGen
randomSeed <- StdGen -> IOSim s (TVar (IOSim s) StdGen)
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO (StdGen -> IOSim s (TVar (IOSim s) StdGen))
-> StdGen -> IOSim s (TVar (IOSim s) StdGen)
forall a b. (a -> b) -> a -> b
$ Int -> StdGen
mkStdGen Int
seed
            TQueueDefault
  (IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
aliceToBob <- IOSim
  s
  (TQueue
     (IOSim s)
     (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))))
IOSim
  s
  (TQueueDefault
     (IOSim s)
     (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))))
forall a. IOSim s (TQueue (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => m (TQueue m a)
newTQueueIO
            TQueueDefault
  (IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
bobToAlice <- IOSim
  s
  (TQueue
     (IOSim s)
     (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))))
IOSim
  s
  (TQueueDefault
     (IOSim s)
     (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))))
forall a. IOSim s (TQueue (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => m (TQueue m a)
newTQueueIO
            MessagePersistence (IOSim s) (Heartbeat Int)
alicePersistence <- Int
-> MonadSTM (IOSim s) =>
   IOSim s (MessagePersistence (IOSim s) (Heartbeat Int))
forall (m :: * -> *) msg.
Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence Int
2
            MessagePersistence (IOSim s) (Heartbeat Int)
bobPersistence <- Int
-> MonadSTM (IOSim s) =>
   IOSim s (MessagePersistence (IOSim s) (Heartbeat Int))
forall (m :: * -> *) msg.
Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence Int
2
            let
              -- this is a NetworkComponent that broadcasts authenticated messages
              -- mediated through a read and a write TQueue but drops 0.2 % of them
              aliceFailingNetwork :: (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))
 -> IOSim s ())
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
    -> IOSim s ())
-> IOSim s ()
aliceFailingNetwork = TVar (IOSim s) StdGen
-> Party
-> (TQueue
      (IOSim s)
      (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))),
    TQueue
      (IOSim s)
      (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))))
-> (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))
    -> IOSim s ())
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
    -> IOSim s ())
-> IOSim s ()
forall {m :: * -> *} {m :: * -> *} {a} {t} {msg} {a} {b}.
(MonadAsync m, MonadSTM m, RandomGen a) =>
TVar m a
-> Party
-> (TQueue m t, TQueue m (Authenticated msg))
-> (t -> m a)
-> (Network m msg -> m b)
-> m b
failingNetwork TVar (IOSim s) StdGen
TVar s StdGen
randomSeed Party
alice (TQueue
  (IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
TQueueDefault
  (IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
bobToAlice, TQueue
  (IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
TQueueDefault
  (IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
aliceToBob)
              bobFailingNetwork :: (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))
 -> IOSim s ())
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
    -> IOSim s ())
-> IOSim s ()
bobFailingNetwork = TVar (IOSim s) StdGen
-> Party
-> (TQueue
      (IOSim s)
      (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))),
    TQueue
      (IOSim s)
      (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))))
-> (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))
    -> IOSim s ())
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
    -> IOSim s ())
-> IOSim s ()
forall {m :: * -> *} {m :: * -> *} {a} {t} {msg} {a} {b}.
(MonadAsync m, MonadSTM m, RandomGen a) =>
TVar m a
-> Party
-> (TQueue m t, TQueue m (Authenticated msg))
-> (t -> m a)
-> (Network m msg -> m b)
-> m b
failingNetwork TVar (IOSim s) StdGen
TVar s StdGen
randomSeed Party
bob (TQueue
  (IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
TQueueDefault
  (IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
aliceToBob, TQueue
  (IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
TQueueDefault
  (IOSim s) (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int))))
bobToAlice)

              bobReliabilityStack :: NetworkComponent
  (IOSim s) (Authenticated (Heartbeat Int)) (Heartbeat Int) ()
bobReliabilityStack = MessagePersistence (IOSim s) (Heartbeat Int)
-> ((Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))
     -> IOSim s ())
    -> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
        -> IOSim s ())
    -> IOSim s ())
-> Tracer (IOSim s) ReliabilityLog
-> NodeId
-> Party
-> [Party]
-> NetworkComponent
     (IOSim s) (Authenticated (Heartbeat Int)) (Heartbeat Int) ()
forall {m :: * -> *} {msg} {a}.
(MonadAsync m, MonadDelay m, MonadThrow m, MonadThrow (STM m)) =>
MessagePersistence m msg
-> NetworkComponent
     m
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     a
-> Tracer m ReliabilityLog
-> NodeId
-> Party
-> [Party]
-> NetworkComponent m (Authenticated msg) msg a
reliabilityStack MessagePersistence (IOSim s) (Heartbeat Int)
bobPersistence (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))
 -> IOSim s ())
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
    -> IOSim s ())
-> IOSim s ()
bobFailingNetwork (TVar (IOSim s) [ReliabilityLog] -> Tracer (IOSim s) ReliabilityLog
forall (m :: * -> *).
MonadSTM m =>
TVar m [ReliabilityLog] -> Tracer m ReliabilityLog
captureTraces TVar (IOSim s) [ReliabilityLog]
TVar s [ReliabilityLog]
emittedTraces) NodeId
"bob" Party
bob [Party
alice]
              aliceReliabilityStack :: NetworkComponent
  (IOSim s) (Authenticated (Heartbeat Int)) (Heartbeat Int) ()
aliceReliabilityStack = MessagePersistence (IOSim s) (Heartbeat Int)
-> ((Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))
     -> IOSim s ())
    -> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
        -> IOSim s ())
    -> IOSim s ())
-> Tracer (IOSim s) ReliabilityLog
-> NodeId
-> Party
-> [Party]
-> NetworkComponent
     (IOSim s) (Authenticated (Heartbeat Int)) (Heartbeat Int) ()
forall {m :: * -> *} {msg} {a}.
(MonadAsync m, MonadDelay m, MonadThrow m, MonadThrow (STM m)) =>
MessagePersistence m msg
-> NetworkComponent
     m
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     a
-> Tracer m ReliabilityLog
-> NodeId
-> Party
-> [Party]
-> NetworkComponent m (Authenticated msg) msg a
reliabilityStack MessagePersistence (IOSim s) (Heartbeat Int)
alicePersistence (Authenticated (ReliableMsg (Heartbeat (Heartbeat Int)))
 -> IOSim s ())
-> (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat Int)))
    -> IOSim s ())
-> IOSim s ()
aliceFailingNetwork (TVar (IOSim s) [ReliabilityLog] -> Tracer (IOSim s) ReliabilityLog
forall (m :: * -> *).
MonadSTM m =>
TVar m [ReliabilityLog] -> Tracer m ReliabilityLog
captureTraces TVar (IOSim s) [ReliabilityLog]
TVar s [ReliabilityLog]
emittedTraces) NodeId
"alice" Party
alice [Party
bob]

              runAlice :: IOSim s ()
runAlice = NetworkComponent
  (IOSim s) (Authenticated (Heartbeat Int)) (Heartbeat Int) ()
-> NodeId
-> TVar (IOSim s) (Vector Int)
-> TVar (IOSim s) (Vector Int)
-> [Int]
-> [Int]
-> IOSim s ()
forall {m :: * -> *} {m :: * -> *} {msg} {msg} {b}.
(TVar m ~ TVar m, MonadDelay m, MonadAsync m, MonadSTM m) =>
((Authenticated (Heartbeat msg) -> m ())
 -> (Network m (Heartbeat msg) -> m ()) -> b)
-> NodeId
-> TVar m (Vector msg)
-> TVar m (Vector msg)
-> [msg]
-> [msg]
-> b
runPeer NetworkComponent
  (IOSim s) (Authenticated (Heartbeat Int)) (Heartbeat Int) ()
aliceReliabilityStack NodeId
"alice" TVar (IOSim s) (Vector Int)
TVar s (Vector Int)
messagesReceivedByAlice TVar (IOSim s) (Vector Int)
TVar s (Vector Int)
messagesReceivedByBob [Int]
aliceToBobMessages [Int]
bobToAliceMessages
              runBob :: IOSim s ()
runBob = NetworkComponent
  (IOSim s) (Authenticated (Heartbeat Int)) (Heartbeat Int) ()
-> NodeId
-> TVar (IOSim s) (Vector Int)
-> TVar (IOSim s) (Vector Int)
-> [Int]
-> [Int]
-> IOSim s ()
forall {m :: * -> *} {m :: * -> *} {msg} {msg} {b}.
(TVar m ~ TVar m, MonadDelay m, MonadAsync m, MonadSTM m) =>
((Authenticated (Heartbeat msg) -> m ())
 -> (Network m (Heartbeat msg) -> m ()) -> b)
-> NodeId
-> TVar m (Vector msg)
-> TVar m (Vector msg)
-> [msg]
-> [msg]
-> b
runPeer NetworkComponent
  (IOSim s) (Authenticated (Heartbeat Int)) (Heartbeat Int) ()
bobReliabilityStack NodeId
"bob" TVar (IOSim s) (Vector Int)
TVar s (Vector Int)
messagesReceivedByBob TVar (IOSim s) (Vector Int)
TVar s (Vector Int)
messagesReceivedByAlice [Int]
bobToAliceMessages [Int]
aliceToBobMessages

            IOSim s () -> IOSim s () -> IOSim s ()
forall a b. IOSim s a -> IOSim s b -> IOSim s ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
concurrently_ IOSim s ()
runAlice IOSim s ()
runBob

            [ReliabilityLog]
logs <- TVar (IOSim s) [ReliabilityLog] -> IOSim s [ReliabilityLog]
forall a. TVar (IOSim s) a -> IOSim s a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar (IOSim s) [ReliabilityLog]
TVar s [ReliabilityLog]
emittedTraces
            [Int]
aliceReceived <- Vector Int -> [Int]
forall a. Vector a -> [a]
Vector.toList (Vector Int -> [Int]) -> IOSim s (Vector Int) -> IOSim s [Int]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (IOSim s) (Vector Int) -> IOSim s (Vector Int)
forall a. TVar (IOSim s) a -> IOSim s a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar (IOSim s) (Vector Int)
TVar s (Vector Int)
messagesReceivedByAlice
            [Int]
bobReceived <- Vector Int -> [Int]
forall a. Vector a -> [a]
Vector.toList (Vector Int -> [Int]) -> IOSim s (Vector Int) -> IOSim s [Int]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (IOSim s) (Vector Int) -> IOSim s (Vector Int)
forall a. TVar (IOSim s) a -> IOSim s a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar (IOSim s) (Vector Int)
TVar s (Vector Int)
messagesReceivedByBob
            ([Int], [Int], [ReliabilityLog])
-> IOSim s ([Int], [Int], [ReliabilityLog])
forall a. a -> IOSim s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([Int]
aliceReceived, [Int]
bobReceived, [ReliabilityLog]
logs)
         in
          Int -> Property -> Property
forall prop. Testable prop => Int -> prop -> Property
within Int
1000000 (Property -> Property) -> Property -> Property
forall a b. (a -> b) -> a -> b
$
            [Int]
msgReceivedByBob
              [Int] -> [Int] -> Property
forall a. (Eq a, Show a) => a -> a -> Property
=== [Int]
aliceToBobMessages
              Property -> (Property -> Property) -> Property
forall a b. a -> (a -> b) -> b
& String -> Property -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample ([String] -> String
unlines ([String] -> String) -> [String] -> String
forall a b. (a -> b) -> a -> b
$ ReliabilityLog -> String
forall b a. (Show a, IsString b) => a -> b
show (ReliabilityLog -> String) -> [ReliabilityLog] -> [String]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [ReliabilityLog] -> [ReliabilityLog]
forall a. [a] -> [a]
reverse [ReliabilityLog]
traces)
              Property -> (Property -> Property) -> Property
forall a b. a -> (a -> b) -> b
& String -> [String] -> Property -> Property
forall prop.
Testable prop =>
String -> [String] -> prop -> Property
tabulate String
"Messages from Alice to Bob" [String
"< " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall b a. (Show a, IsString b) => a -> b
show (([Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int]
msgReceivedByBob Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
10 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
10)]
              Property -> (Property -> Property) -> Property
forall a b. a -> (a -> b) -> b
& String -> [String] -> Property -> Property
forall prop.
Testable prop =>
String -> [String] -> prop -> Property
tabulate String
"Messages from Bob to Alice" [String
"< " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall b a. (Show a, IsString b) => a -> b
show (([Int] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int]
msgReceivedByAlice Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
10 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
10)]

    String -> Expectation -> SpecWith (Arg Expectation)
forall a.
(HasCallStack, Example a) =>
String -> a -> SpecWith (Arg a)
it String
"broadcast updates counter from peers" (Expectation -> SpecWith (Arg Expectation))
-> Expectation -> SpecWith (Arg Expectation)
forall a b. (a -> b) -> a -> b
$ do
      let receivedMsgs :: [ReliableMsg (Heartbeat (Heartbeat String))]
receivedMsgs = (forall s. IOSim s [ReliableMsg (Heartbeat (Heartbeat String))])
-> [ReliableMsg (Heartbeat (Heartbeat String))]
forall a. (forall s. IOSim s a) -> a
runSimOrThrow ((forall s. IOSim s [ReliableMsg (Heartbeat (Heartbeat String))])
 -> [ReliableMsg (Heartbeat (Heartbeat String))])
-> (forall s. IOSim s [ReliableMsg (Heartbeat (Heartbeat String))])
-> [ReliableMsg (Heartbeat (Heartbeat String))]
forall a b. (a -> b) -> a -> b
$ do
            TVar s (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
sentMessages <- Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> IOSim
     s
     (TVar
        (IOSim s) (Vector (ReliableMsg (Heartbeat (Heartbeat String)))))
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Vector (ReliableMsg (Heartbeat (Heartbeat String)))
forall a. Vector a
empty
            MessagePersistence (IOSim s) (Heartbeat String)
alicePersistence <- Int
-> MonadSTM (IOSim s) =>
   IOSim s (MessagePersistence (IOSim s) (Heartbeat String))
forall (m :: * -> *) msg.
Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence Int
2
            Tracer (IOSim s) ReliabilityLog
-> MessagePersistence (IOSim s) (Heartbeat String)
-> Party
-> [Party]
-> NetworkComponent
     (IOSim s)
     (Authenticated (ReliableMsg (Heartbeat (Heartbeat String))))
     (ReliableMsg (Heartbeat (Heartbeat String)))
     ()
-> NetworkComponent
     (IOSim s)
     (Authenticated (Heartbeat (Heartbeat String)))
     (Heartbeat (Heartbeat String))
     ()
forall (m :: * -> *) msg a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m msg
-> Party
-> [Party]
-> NetworkComponent
     m
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     a
-> NetworkComponent
     m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability
              Tracer (IOSim s) ReliabilityLog
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
              MessagePersistence (IOSim s) (Heartbeat String)
alicePersistence
              Party
alice
              [Party
bob]
              ( \NetworkCallback
  (Authenticated (ReliableMsg (Heartbeat (Heartbeat String))))
  (IOSim s)
incoming Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat String)))
-> IOSim s ()
action -> do
                  IOSim s () -> IOSim s () -> IOSim s ()
forall a b. IOSim s a -> IOSim s b -> IOSim s ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
concurrently_
                    (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat String)))
-> IOSim s ()
action (Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat String)))
 -> IOSim s ())
-> Network (IOSim s) (ReliableMsg (Heartbeat (Heartbeat String)))
-> IOSim s ()
forall a b. (a -> b) -> a -> b
$ Network{$sel:broadcast:Network :: ReliableMsg (Heartbeat (Heartbeat String)) -> IOSim s ()
broadcast = \ReliableMsg (Heartbeat (Heartbeat String))
m -> STM (IOSim s) () -> IOSim s ()
forall a. HasCallStack => STM (IOSim s) a -> IOSim s a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM (IOSim s) () -> IOSim s ()) -> STM (IOSim s) () -> IOSim s ()
forall a b. (a -> b) -> a -> b
$ TVar
  (IOSim s) (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> (Vector (ReliableMsg (Heartbeat (Heartbeat String)))
    -> Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> STM (IOSim s) ()
forall a. TVar (IOSim s) a -> (a -> a) -> STM (IOSim s) ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar
  (IOSim s) (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
TVar s (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
sentMessages (Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> ReliableMsg (Heartbeat (Heartbeat String))
-> Vector (ReliableMsg (Heartbeat (Heartbeat String)))
forall a. Vector a -> a -> Vector a
`snoc` ReliableMsg (Heartbeat (Heartbeat String))
m)})
                    (NetworkCallback
  (Authenticated (ReliableMsg (Heartbeat (Heartbeat String))))
  (IOSim s)
incoming (ReliableMsg (Heartbeat (Heartbeat String))
-> Party
-> Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int
-> Heartbeat (Heartbeat String)
-> ReliableMsg (Heartbeat (Heartbeat String))
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
0, Int
1]) (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Heartbeat String
msg)) Party
bob))
              )
              Authenticated (Heartbeat (Heartbeat String)) -> IOSim s ()
forall (m :: * -> *) b. Monad m => b -> m ()
noop
              ((Network (IOSim s) (Heartbeat (Heartbeat String)) -> IOSim s ())
 -> IOSim s ())
-> (Network (IOSim s) (Heartbeat (Heartbeat String)) -> IOSim s ())
-> IOSim s ()
forall a b. (a -> b) -> a -> b
$ \Network{Heartbeat (Heartbeat String) -> IOSim s ()
$sel:broadcast:Network :: forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast :: Heartbeat (Heartbeat String) -> IOSim s ()
broadcast} -> do
                DiffTime -> IOSim s ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
                Heartbeat (Heartbeat String) -> IOSim s ()
broadcast (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1" Heartbeat String
msg)
            Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> [ReliableMsg (Heartbeat (Heartbeat String))]
forall a. Vector a -> [a]
Vector.toList (Vector (ReliableMsg (Heartbeat (Heartbeat String)))
 -> [ReliableMsg (Heartbeat (Heartbeat String))])
-> IOSim s (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> IOSim s [ReliableMsg (Heartbeat (Heartbeat String))]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar
  (IOSim s) (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> IOSim s (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
forall a. TVar (IOSim s) a -> IOSim s a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar
  (IOSim s) (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
TVar s (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
sentMessages

      [ReliableMsg (Heartbeat (Heartbeat String))]
receivedMsgs [ReliableMsg (Heartbeat (Heartbeat String))]
-> [ReliableMsg (Heartbeat (Heartbeat String))] -> Expectation
forall a. (HasCallStack, Show a, Eq a) => a -> a -> Expectation
`shouldBe` [Vector Int
-> Heartbeat (Heartbeat String)
-> ReliableMsg (Heartbeat (Heartbeat String))
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1, Int
1]) (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1" Heartbeat String
msg)]

    String -> Expectation -> SpecWith (Arg Expectation)
forall a.
(HasCallStack, Example a) =>
String -> a -> SpecWith (Arg a)
it String
"appends messages to disk and can load them back" (Expectation -> SpecWith (Arg Expectation))
-> Expectation -> SpecWith (Arg Expectation)
forall a b. (a -> b) -> a -> b
$ do
      String -> (String -> Expectation) -> Expectation
forall (m :: * -> *) r.
MonadIO m =>
String -> (String -> m r) -> m r
withTempDir String
"network-messages-persistence" ((String -> Expectation) -> Expectation)
-> (String -> Expectation) -> Expectation
forall a b. (a -> b) -> a -> b
$ \String
tmpDir -> do
        let networkMessagesFile :: String
networkMessagesFile = String
tmpDir String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"/network-messages"

        Persistence{FromJSON (Vector Int) => IO (Maybe (Vector Int))
load :: FromJSON (Vector Int) => IO (Maybe (Vector Int))
$sel:load:Persistence :: forall a (m :: * -> *).
Persistence a m -> FromJSON a => m (Maybe a)
load, ToJSON (Vector Int) => Vector Int -> Expectation
save :: ToJSON (Vector Int) => Vector Int -> Expectation
$sel:save:Persistence :: forall a (m :: * -> *). Persistence a m -> ToJSON a => a -> m ()
save} <- String -> IO (Persistence (Vector Int) IO)
forall (m :: * -> *) a.
(MonadIO m, MonadThrow m) =>
String -> m (Persistence a m)
createPersistence (String -> IO (Persistence (Vector Int) IO))
-> String -> IO (Persistence (Vector Int) IO)
forall a b. (a -> b) -> a -> b
$ String
tmpDir String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"/acks"
        PersistenceIncremental{FromJSON (Heartbeat (Heartbeat String)) =>
IO [Heartbeat (Heartbeat String)]
loadAll :: FromJSON (Heartbeat (Heartbeat String)) =>
IO [Heartbeat (Heartbeat String)]
$sel:loadAll:PersistenceIncremental :: forall a (m :: * -> *).
PersistenceIncremental a m -> FromJSON a => m [a]
loadAll, ToJSON (Heartbeat (Heartbeat String)) =>
Heartbeat (Heartbeat String) -> Expectation
append :: ToJSON (Heartbeat (Heartbeat String)) =>
Heartbeat (Heartbeat String) -> Expectation
$sel:append:PersistenceIncremental :: forall a (m :: * -> *).
PersistenceIncremental a m -> ToJSON a => a -> m ()
append} <- String
-> IO (PersistenceIncremental (Heartbeat (Heartbeat String)) IO)
forall a (m :: * -> *).
(MonadIO m, MonadThrow m, MonadSTM m, MonadThread m,
 MonadThrow (STM m)) =>
String -> m (PersistenceIncremental a m)
createPersistenceIncremental String
networkMessagesFile

        let messagePersistence :: MessagePersistence IO (Heartbeat String)
messagePersistence =
              MessagePersistence
                { $sel:loadAcks:MessagePersistence :: IO (Vector Int)
loadAcks = do
                    Maybe (Vector Int)
mloaded <- IO (Maybe (Vector Int))
FromJSON (Vector Int) => IO (Maybe (Vector Int))
load
                    case Maybe (Vector Int)
mloaded of
                      Maybe (Vector Int)
Nothing -> Vector Int -> IO (Vector Int)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Vector Int -> IO (Vector Int)) -> Vector Int -> IO (Vector Int)
forall a b. (a -> b) -> a -> b
$ Int -> Int -> Vector Int
forall a. Int -> a -> Vector a
replicate ([Party] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Party
alice, Party
bob]) Int
0
                      Just Vector Int
acks -> Vector Int -> IO (Vector Int)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Vector Int
acks
                , $sel:saveAcks:MessagePersistence :: Vector Int -> Expectation
saveAcks = ToJSON (Vector Int) => Vector Int -> Expectation
Vector Int -> Expectation
save
                , $sel:loadMessages:MessagePersistence :: IO [Heartbeat (Heartbeat String)]
loadMessages = IO [Heartbeat (Heartbeat String)]
FromJSON (Heartbeat (Heartbeat String)) =>
IO [Heartbeat (Heartbeat String)]
loadAll
                , $sel:appendMessage:MessagePersistence :: Heartbeat (Heartbeat String) -> Expectation
appendMessage = ToJSON (Heartbeat (Heartbeat String)) =>
Heartbeat (Heartbeat String) -> Expectation
Heartbeat (Heartbeat String) -> Expectation
append
                }

        [ReliableMsg (Heartbeat (Heartbeat String))]
receivedMsgs <- do
          TVar (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
sentMessages <- Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> IO
     (TVar IO (Vector (ReliableMsg (Heartbeat (Heartbeat String)))))
forall a. a -> IO (TVar IO a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Vector (ReliableMsg (Heartbeat (Heartbeat String)))
forall a. Vector a
empty
          Tracer IO ReliabilityLog
-> MessagePersistence IO (Heartbeat String)
-> Party
-> [Party]
-> NetworkComponent
     IO
     (Authenticated (ReliableMsg (Heartbeat (Heartbeat String))))
     (ReliableMsg (Heartbeat (Heartbeat String)))
     ()
-> NetworkComponent
     IO
     (Authenticated (Heartbeat (Heartbeat String)))
     (Heartbeat (Heartbeat String))
     ()
forall (m :: * -> *) msg a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m msg
-> Party
-> [Party]
-> NetworkComponent
     m
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     a
-> NetworkComponent
     m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability
            Tracer IO ReliabilityLog
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
            MessagePersistence IO (Heartbeat String)
messagePersistence
            Party
alice
            [Party
bob]
            ( \NetworkCallback
  (Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))) IO
incoming Network IO (ReliableMsg (Heartbeat (Heartbeat String)))
-> Expectation
action -> do
                Expectation -> Expectation -> Expectation
forall a b. IO a -> IO b -> Expectation
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
concurrently_
                  (Network IO (ReliableMsg (Heartbeat (Heartbeat String)))
-> Expectation
action (Network IO (ReliableMsg (Heartbeat (Heartbeat String)))
 -> Expectation)
-> Network IO (ReliableMsg (Heartbeat (Heartbeat String)))
-> Expectation
forall a b. (a -> b) -> a -> b
$ Network{$sel:broadcast:Network :: ReliableMsg (Heartbeat (Heartbeat String)) -> Expectation
broadcast = \ReliableMsg (Heartbeat (Heartbeat String))
m -> STM IO () -> Expectation
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO () -> Expectation) -> STM IO () -> Expectation
forall a b. (a -> b) -> a -> b
$ TVar IO (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> (Vector (ReliableMsg (Heartbeat (Heartbeat String)))
    -> Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> STM IO ()
forall a. TVar IO a -> (a -> a) -> STM IO ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
TVar IO (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
sentMessages (Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> ReliableMsg (Heartbeat (Heartbeat String))
-> Vector (ReliableMsg (Heartbeat (Heartbeat String)))
forall a. Vector a -> a -> Vector a
`snoc` ReliableMsg (Heartbeat (Heartbeat String))
m)})
                  (NetworkCallback
  (Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))) IO
incoming (ReliableMsg (Heartbeat (Heartbeat String))
-> Party
-> Authenticated (ReliableMsg (Heartbeat (Heartbeat String)))
forall msg. msg -> Party -> Authenticated msg
Authenticated (Vector Int
-> Heartbeat (Heartbeat String)
-> ReliableMsg (Heartbeat (Heartbeat String))
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
0, Int
1]) (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-2" Heartbeat String
msg)) Party
bob))
            )
            Authenticated (Heartbeat (Heartbeat String)) -> Expectation
forall (m :: * -> *) b. Monad m => b -> m ()
noop
            ((Network IO (Heartbeat (Heartbeat String)) -> Expectation)
 -> Expectation)
-> (Network IO (Heartbeat (Heartbeat String)) -> Expectation)
-> Expectation
forall a b. (a -> b) -> a -> b
$ \Network{Heartbeat (Heartbeat String) -> Expectation
$sel:broadcast:Network :: forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast :: Heartbeat (Heartbeat String) -> Expectation
broadcast} -> do
              DiffTime -> Expectation
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
              Heartbeat (Heartbeat String) -> Expectation
broadcast (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1" Heartbeat String
msg)
          Vector (ReliableMsg (Heartbeat (Heartbeat String)))
-> [ReliableMsg (Heartbeat (Heartbeat String))]
forall a. Vector a -> [a]
Vector.toList (Vector (ReliableMsg (Heartbeat (Heartbeat String)))
 -> [ReliableMsg (Heartbeat (Heartbeat String))])
-> IO (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> IO [ReliableMsg (Heartbeat (Heartbeat String))]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar IO (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
-> IO (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
forall a. TVar IO a -> IO a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
TVar IO (Vector (ReliableMsg (Heartbeat (Heartbeat String))))
sentMessages

        [ReliableMsg (Heartbeat (Heartbeat String))]
receivedMsgs [ReliableMsg (Heartbeat (Heartbeat String))]
-> [ReliableMsg (Heartbeat (Heartbeat String))] -> Expectation
forall a. (HasCallStack, Show a, Eq a) => a -> a -> Expectation
`shouldBe` [Vector Int
-> Heartbeat (Heartbeat String)
-> ReliableMsg (Heartbeat (Heartbeat String))
forall msg. Vector Int -> msg -> ReliableMsg msg
ReliableMsg ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1, Int
1]) (NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1" Heartbeat String
msg)]

        String -> IO Bool
doesFileExist String
networkMessagesFile IO Bool -> Bool -> Expectation
forall a. (HasCallStack, Show a, Eq a) => IO a -> a -> Expectation
`shouldReturn` Bool
True
        String -> IO [Heartbeat (Heartbeat String)]
reloadAll String
networkMessagesFile IO [Heartbeat (Heartbeat String)]
-> [Heartbeat (Heartbeat String)] -> Expectation
forall a. (HasCallStack, Show a, Eq a) => IO a -> a -> Expectation
`shouldReturn` [NodeId -> Heartbeat String -> Heartbeat (Heartbeat String)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
"node-1" Heartbeat String
msg]

        String -> IO Bool
doesFileExist (String
tmpDir String -> String -> String
</> String
"acks") IO Bool -> Bool -> Expectation
forall a. (HasCallStack, Show a, Eq a) => IO a -> a -> Expectation
`shouldReturn` Bool
True
        IO (Maybe (Vector Int))
FromJSON (Vector Int) => IO (Maybe (Vector Int))
load IO (Maybe (Vector Int)) -> Maybe (Vector Int) -> Expectation
forall a. (HasCallStack, Show a, Eq a) => IO a -> a -> Expectation
`shouldReturn` Vector Int -> Maybe (Vector Int)
forall a. a -> Maybe a
Just ([Int] -> Vector Int
forall a. [a] -> Vector a
fromList [Int
1, Int
1])
 where
  runPeer :: ((Authenticated (Heartbeat msg) -> m ())
 -> (Network m (Heartbeat msg) -> m ()) -> b)
-> NodeId
-> TVar m (Vector msg)
-> TVar m (Vector msg)
-> [msg]
-> [msg]
-> b
runPeer (Authenticated (Heartbeat msg) -> m ())
-> (Network m (Heartbeat msg) -> m ()) -> b
reliability NodeId
partyName TVar m (Vector msg)
receivedMessageContainer TVar m (Vector msg)
sentMessageContainer [msg]
messagesToSend [msg]
expectedMessages =
    (Authenticated (Heartbeat msg) -> m ())
-> (Network m (Heartbeat msg) -> m ()) -> b
reliability (TVar m (Vector msg) -> Authenticated (Heartbeat msg) -> m ()
forall (m :: * -> *) msg.
MonadSTM m =>
TVar m (Vector msg) -> Authenticated (Heartbeat msg) -> m ()
capturePayload TVar m (Vector msg)
receivedMessageContainer) ((Network m (Heartbeat msg) -> m ()) -> b)
-> (Network m (Heartbeat msg) -> m ()) -> b
forall a b. (a -> b) -> a -> b
$ \Network{Heartbeat msg -> m ()
$sel:broadcast:Network :: forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast :: Heartbeat msg -> m ()
broadcast} -> do
      [msg] -> (msg -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [msg]
messagesToSend ((msg -> m ()) -> m ()) -> (msg -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \msg
m -> do
        Heartbeat msg -> m ()
broadcast (NodeId -> msg -> Heartbeat msg
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
partyName msg
m)
        DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
0.1

      m () -> m () -> m ()
forall a b. m a -> m b -> m ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
concurrently_
        ([msg] -> TVar m (Vector msg) -> m ()
forall (m :: * -> *) msg.
MonadSTM m =>
[msg] -> TVar m (Vector msg) -> m ()
waitForAllMessages [msg]
expectedMessages TVar m (Vector msg)
TVar m (Vector msg)
receivedMessageContainer)
        ([msg] -> TVar m (Vector msg) -> m ()
forall (m :: * -> *) msg.
MonadSTM m =>
[msg] -> TVar m (Vector msg) -> m ()
waitForAllMessages [msg]
messagesToSend TVar m (Vector msg)
sentMessageContainer)

  reliabilityStack :: MessagePersistence m msg
-> NetworkComponent
     m
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     a
-> Tracer m ReliabilityLog
-> NodeId
-> Party
-> [Party]
-> NetworkComponent m (Authenticated msg) msg a
reliabilityStack MessagePersistence m msg
persistence NetworkComponent
  m
  (Authenticated (ReliableMsg (Heartbeat msg)))
  (ReliableMsg (Heartbeat msg))
  a
underlyingNetwork Tracer m ReliabilityLog
tracer NodeId
nodeId Party
party [Party]
peers =
    NodeId
-> ConnectionMessages m
-> NetworkComponent
     m (Heartbeat (Authenticated msg)) (Heartbeat msg) a
-> NetworkComponent m (Authenticated msg) msg a
forall (m :: * -> *) msg1 msg a.
(MonadAsync m, MonadDelay m) =>
NodeId
-> ConnectionMessages m
-> NetworkComponent m (Heartbeat msg1) (Heartbeat msg) a
-> NetworkComponent m msg1 msg a
withHeartbeat NodeId
nodeId ConnectionMessages m
forall (m :: * -> *) b. Monad m => b -> m ()
noop (NetworkComponent
   m (Heartbeat (Authenticated msg)) (Heartbeat msg) a
 -> NetworkComponent m (Authenticated msg) msg a)
-> NetworkComponent
     m (Heartbeat (Authenticated msg)) (Heartbeat msg) a
-> NetworkComponent m (Authenticated msg) msg a
forall a b. (a -> b) -> a -> b
$
      NetworkComponent
  m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
-> NetworkComponent
     m (Heartbeat (Authenticated msg)) (Heartbeat msg) a
forall (m :: * -> *) msg msg1 a.
NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a
-> NetworkComponent m (Heartbeat (Authenticated msg)) msg1 a
withFlipHeartbeats (NetworkComponent
   m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
 -> NetworkComponent
      m (Heartbeat (Authenticated msg)) (Heartbeat msg) a)
-> NetworkComponent
     m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
-> NetworkComponent
     m (Heartbeat (Authenticated msg)) (Heartbeat msg) a
forall a b. (a -> b) -> a -> b
$
        Tracer m ReliabilityLog
-> MessagePersistence m msg
-> Party
-> [Party]
-> NetworkComponent
     m
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     a
-> NetworkComponent
     m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
forall (m :: * -> *) msg a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m msg
-> Party
-> [Party]
-> NetworkComponent
     m
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     a
-> NetworkComponent
     m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability Tracer m ReliabilityLog
tracer MessagePersistence m msg
persistence Party
party [Party]
peers NetworkComponent
  m
  (Authenticated (ReliableMsg (Heartbeat msg)))
  (ReliableMsg (Heartbeat msg))
  a
underlyingNetwork

  failingNetwork :: TVar m a
-> Party
-> (TQueue m t, TQueue m (Authenticated msg))
-> (t -> m a)
-> (Network m msg -> m b)
-> m b
failingNetwork TVar m a
seed Party
peer (TQueue m t
readQueue, TQueue m (Authenticated msg)
writeQueue) t -> m a
callback Network m msg -> m b
action =
    m Any -> (Async m Any -> m b) -> m b
forall a b. m a -> (Async m a -> m b) -> m b
forall (m :: * -> *) a b.
MonadAsync m =>
m a -> (Async m a -> m b) -> m b
withAsync
      ( m a -> m Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m a -> m Any) -> m a -> m Any
forall a b. (a -> b) -> a -> b
$ do
          t
newMsg <- STM m t -> m t
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m t -> m t) -> STM m t -> m t
forall a b. (a -> b) -> a -> b
$ TQueue m t -> STM m t
forall a. TQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> STM m a
readTQueue TQueue m t
readQueue
          t -> m a
callback t
newMsg
      )
      ((Async m Any -> m b) -> m b) -> (Async m Any -> m b) -> m b
forall a b. (a -> b) -> a -> b
$ \Async m Any
_ ->
        Network m msg -> m b
action (Network m msg -> m b) -> Network m msg -> m b
forall a b. (a -> b) -> a -> b
$
          Network
            { $sel:broadcast:Network :: msg -> m ()
broadcast = \msg
m -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                -- drop 2% of messages
                Double
r <- TVar m a -> STM m Double
forall {m :: * -> *} {a}.
(MonadSTM m, RandomGen a) =>
TVar m a -> STM m Double
randomNumber TVar m a
seed
                Bool -> STM m () -> STM m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Double
r Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
< Double
0.02) (STM m () -> STM m ()) -> STM m () -> STM m ()
forall a b. (a -> b) -> a -> b
$ TQueue m (Authenticated msg) -> Authenticated msg -> STM m ()
forall a. TQueue m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> a -> STM m ()
writeTQueue TQueue m (Authenticated msg)
writeQueue (msg -> Party -> Authenticated msg
forall msg. msg -> Party -> Authenticated msg
Authenticated msg
m Party
peer)
            }
  randomNumber :: TVar m a -> STM m Double
randomNumber TVar m a
seed' = do
    a
genSeed <- TVar m a -> STM m a
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m a
seed'
    let (Double
res, a
newGenSeed) = (Double, Double) -> a -> (Double, a)
forall g a. (RandomGen g, UniformRange a) => (a, a) -> g -> (a, g)
uniformR (Double
0 :: Double, Double
1) a
genSeed
    TVar m a -> a -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m a
seed' a
newGenSeed
    Double -> STM m Double
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Double
res

  reloadAll :: FilePath -> IO [Heartbeat (Heartbeat String)]
  reloadAll :: String -> IO [Heartbeat (Heartbeat String)]
reloadAll String
fileName =
    String
-> IO (PersistenceIncremental (Heartbeat (Heartbeat String)) IO)
forall a (m :: * -> *).
(MonadIO m, MonadThrow m, MonadSTM m, MonadThread m,
 MonadThrow (STM m)) =>
String -> m (PersistenceIncremental a m)
createPersistenceIncremental String
fileName
      IO (PersistenceIncremental (Heartbeat (Heartbeat String)) IO)
-> (PersistenceIncremental (Heartbeat (Heartbeat String)) IO
    -> IO [Heartbeat (Heartbeat String)])
-> IO [Heartbeat (Heartbeat String)]
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \PersistenceIncremental{FromJSON (Heartbeat (Heartbeat String)) =>
IO [Heartbeat (Heartbeat String)]
$sel:loadAll:PersistenceIncremental :: forall a (m :: * -> *).
PersistenceIncremental a m -> FromJSON a => m [a]
loadAll :: FromJSON (Heartbeat (Heartbeat String)) =>
IO [Heartbeat (Heartbeat String)]
loadAll} -> IO [Heartbeat (Heartbeat String)]
FromJSON (Heartbeat (Heartbeat String)) =>
IO [Heartbeat (Heartbeat String)]
loadAll

noop :: Monad m => b -> m ()
noop :: forall (m :: * -> *) b. Monad m => b -> m ()
noop = m () -> b -> m ()
forall a b. a -> b -> a
const (m () -> b -> m ()) -> m () -> b -> m ()
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

aliceReceivesMessages :: [Authenticated (ReliableMsg (Heartbeat msg))] -> [Authenticated (Heartbeat msg)]
aliceReceivesMessages :: forall msg.
[Authenticated (ReliableMsg (Heartbeat msg))]
-> [Authenticated (Heartbeat msg)]
aliceReceivesMessages [Authenticated (ReliableMsg (Heartbeat msg))]
messages = (forall s. IOSim s [Authenticated (Heartbeat msg)])
-> [Authenticated (Heartbeat msg)]
forall a. (forall s. IOSim s a) -> a
runSimOrThrow ((forall s. IOSim s [Authenticated (Heartbeat msg)])
 -> [Authenticated (Heartbeat msg)])
-> (forall s. IOSim s [Authenticated (Heartbeat msg)])
-> [Authenticated (Heartbeat msg)]
forall a b. (a -> b) -> a -> b
$ do
  TVar s (Vector (Authenticated (Heartbeat msg)))
receivedMessages <- Vector (Authenticated (Heartbeat msg))
-> IOSim
     s (TVar (IOSim s) (Vector (Authenticated (Heartbeat msg))))
forall a. a -> IOSim s (TVar (IOSim s) a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Vector (Authenticated (Heartbeat msg))
forall a. Vector a
empty
  MessagePersistence (IOSim s) msg
alicePersistence <- Int
-> MonadSTM (IOSim s) => IOSim s (MessagePersistence (IOSim s) msg)
forall (m :: * -> *) msg.
Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence Int
3

  let baseNetwork :: (Authenticated (ReliableMsg (Heartbeat msg)) -> IOSim s ())
-> (Network (IOSim s) (ReliableMsg (Heartbeat msg))
    -> IOSim s [()])
-> IOSim s [()]
baseNetwork Authenticated (ReliableMsg (Heartbeat msg)) -> IOSim s ()
incoming Network (IOSim s) (ReliableMsg (Heartbeat msg)) -> IOSim s [()]
_ = (Authenticated (ReliableMsg (Heartbeat msg)) -> IOSim s ())
-> [Authenticated (ReliableMsg (Heartbeat msg))] -> IOSim s [()]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM Authenticated (ReliableMsg (Heartbeat msg)) -> IOSim s ()
incoming [Authenticated (ReliableMsg (Heartbeat msg))]
messages

      aliceReliabilityStack :: NetworkComponent
  (IOSim s) (Authenticated (Heartbeat msg)) (Heartbeat msg) [()]
aliceReliabilityStack =
        Tracer (IOSim s) ReliabilityLog
-> MessagePersistence (IOSim s) msg
-> Party
-> [Party]
-> ((Authenticated (ReliableMsg (Heartbeat msg)) -> IOSim s ())
    -> (Network (IOSim s) (ReliableMsg (Heartbeat msg))
        -> IOSim s [()])
    -> IOSim s [()])
-> NetworkComponent
     (IOSim s) (Authenticated (Heartbeat msg)) (Heartbeat msg) [()]
forall (m :: * -> *) msg a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m msg
-> Party
-> [Party]
-> NetworkComponent
     m
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     a
-> NetworkComponent
     m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability
          Tracer (IOSim s) ReliabilityLog
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
          MessagePersistence (IOSim s) msg
alicePersistence
          Party
alice
          [Party
bob, Party
carol]
          (Authenticated (ReliableMsg (Heartbeat msg)) -> IOSim s ())
-> (Network (IOSim s) (ReliableMsg (Heartbeat msg))
    -> IOSim s [()])
-> IOSim s [()]
baseNetwork

  IOSim s [()] -> IOSim s ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IOSim s [()] -> IOSim s ()) -> IOSim s [()] -> IOSim s ()
forall a b. (a -> b) -> a -> b
$ NetworkComponent
  (IOSim s) (Authenticated (Heartbeat msg)) (Heartbeat msg) [()]
aliceReliabilityStack (TVar (IOSim s) (Vector (Authenticated (Heartbeat msg)))
-> Authenticated (Heartbeat msg) -> IOSim s ()
forall (m :: * -> *) p.
MonadSTM m =>
TVar m (Vector p) -> p -> m ()
captureIncoming TVar (IOSim s) (Vector (Authenticated (Heartbeat msg)))
TVar s (Vector (Authenticated (Heartbeat msg)))
receivedMessages) ((Network (IOSim s) (Heartbeat msg) -> IOSim s [()])
 -> IOSim s [()])
-> (Network (IOSim s) (Heartbeat msg) -> IOSim s [()])
-> IOSim s [()]
forall a b. (a -> b) -> a -> b
$ \Network (IOSim s) (Heartbeat msg)
_action ->
    [()] -> IOSim s [()]
forall a. a -> IOSim s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [()]

  Vector (Authenticated (Heartbeat msg))
-> [Authenticated (Heartbeat msg)]
forall a. Vector a -> [a]
Vector.toList (Vector (Authenticated (Heartbeat msg))
 -> [Authenticated (Heartbeat msg)])
-> IOSim s (Vector (Authenticated (Heartbeat msg)))
-> IOSim s [Authenticated (Heartbeat msg)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (IOSim s) (Vector (Authenticated (Heartbeat msg)))
-> IOSim s (Vector (Authenticated (Heartbeat msg)))
forall a. TVar (IOSim s) a -> IOSim s a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar (IOSim s) (Vector (Authenticated (Heartbeat msg)))
TVar s (Vector (Authenticated (Heartbeat msg)))
receivedMessages

captureIncoming :: MonadSTM m => TVar m (Vector p) -> p -> m ()
captureIncoming :: forall (m :: * -> *) p.
MonadSTM m =>
TVar m (Vector p) -> p -> m ()
captureIncoming TVar m (Vector p)
receivedMessages p
msg =
  STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar m (Vector p) -> (Vector p -> Vector p) -> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m (Vector p)
receivedMessages (Vector p -> p -> Vector p
forall a. Vector a -> a -> Vector a
`snoc` p
msg)

capturePayload :: MonadSTM m => TVar m (Vector msg) -> Authenticated (Heartbeat msg) -> m ()
capturePayload :: forall (m :: * -> *) msg.
MonadSTM m =>
TVar m (Vector msg) -> Authenticated (Heartbeat msg) -> m ()
capturePayload TVar m (Vector msg)
receivedMessages Authenticated{Heartbeat msg
$sel:payload:Authenticated :: forall msg. Authenticated msg -> msg
payload :: Heartbeat msg
payload} = case Heartbeat msg
payload of
  Data NodeId
_ msg
msg ->
    STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar m (Vector msg) -> (Vector msg -> Vector msg) -> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m (Vector msg)
receivedMessages (Vector msg -> msg -> Vector msg
forall a. Vector a -> a -> Vector a
`snoc` msg
msg)
  Heartbeat msg
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

waitForAllMessages :: MonadSTM m => [msg] -> TVar m (Vector msg) -> m ()
waitForAllMessages :: forall (m :: * -> *) msg.
MonadSTM m =>
[msg] -> TVar m (Vector msg) -> m ()
waitForAllMessages [msg]
expectedMessages TVar m (Vector msg)
capturedMessages = STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
  Vector msg
msgs <- TVar m (Vector msg) -> STM m (Vector msg)
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m (Vector msg)
capturedMessages
  Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> STM m ()) -> Bool -> STM m ()
forall a b. (a -> b) -> a -> b
$ Vector msg -> Int
forall a. Vector a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length Vector msg
msgs Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== [msg] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [msg]
expectedMessages

captureTraces ::
  MonadSTM m =>
  TVar m [ReliabilityLog] ->
  Tracer m ReliabilityLog
captureTraces :: forall (m :: * -> *).
MonadSTM m =>
TVar m [ReliabilityLog] -> Tracer m ReliabilityLog
captureTraces TVar m [ReliabilityLog]
tvar = (ReliabilityLog -> m ()) -> Tracer m ReliabilityLog
forall (m :: * -> *) a. (a -> m ()) -> Tracer m a
Tracer ((ReliabilityLog -> m ()) -> Tracer m ReliabilityLog)
-> (ReliabilityLog -> m ()) -> Tracer m ReliabilityLog
forall a b. (a -> b) -> a -> b
$ \ReliabilityLog
msg -> do
  STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar m [ReliabilityLog]
-> ([ReliabilityLog] -> [ReliabilityLog]) -> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m [ReliabilityLog]
tvar (ReliabilityLog
msg :)

mockMessagePersistence :: Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence :: forall (m :: * -> *) msg.
Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence Int
numberOfParties = do
  TVar m (Vector Int)
acks <- Vector Int -> m (TVar m (Vector Int))
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO (Vector Int -> m (TVar m (Vector Int)))
-> Vector Int -> m (TVar m (Vector Int))
forall a b. (a -> b) -> a -> b
$ Int -> Int -> Vector Int
forall a. Int -> a -> Vector a
replicate Int
numberOfParties Int
0
  TVar m (StrictSeq (Heartbeat msg))
messages <- StrictSeq (Heartbeat msg) -> m (TVar m (StrictSeq (Heartbeat msg)))
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO StrictSeq (Heartbeat msg)
forall a. Monoid a => a
mempty
  MessagePersistence m msg -> m (MessagePersistence m msg)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MessagePersistence m msg -> m (MessagePersistence m msg))
-> MessagePersistence m msg -> m (MessagePersistence m msg)
forall a b. (a -> b) -> a -> b
$
    MessagePersistence
      { $sel:loadAcks:MessagePersistence :: m (Vector Int)
loadAcks = TVar m (Vector Int) -> m (Vector Int)
forall a. TVar m a -> m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar m (Vector Int)
acks
      , $sel:saveAcks:MessagePersistence :: Vector Int -> m ()
saveAcks = STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ())
-> (Vector Int -> STM m ()) -> Vector Int -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TVar m (Vector Int) -> Vector Int -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m (Vector Int)
acks
      , $sel:loadMessages:MessagePersistence :: m [Heartbeat msg]
loadMessages = StrictSeq (Heartbeat msg) -> [Heartbeat msg]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (StrictSeq (Heartbeat msg) -> [Heartbeat msg])
-> m (StrictSeq (Heartbeat msg)) -> m [Heartbeat msg]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar m (StrictSeq (Heartbeat msg)) -> m (StrictSeq (Heartbeat msg))
forall a. TVar m a -> m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar m (StrictSeq (Heartbeat msg))
messages
      , $sel:appendMessage:MessagePersistence :: Heartbeat msg -> m ()
appendMessage = \Heartbeat msg
msg -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar m (StrictSeq (Heartbeat msg))
-> (StrictSeq (Heartbeat msg) -> StrictSeq (Heartbeat msg))
-> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m (StrictSeq (Heartbeat msg))
messages (StrictSeq (Heartbeat msg)
-> Heartbeat msg -> StrictSeq (Heartbeat msg)
forall a. StrictSeq a -> a -> StrictSeq a
|> Heartbeat msg
msg)
      }