-- | An implementation of an application-level failure detector.
-- This module exposes a /Component/ 'withHeartbeat' that can be used to
-- wrap another 'NetworkComponent' and piggy-back on it to send and propagate
-- 'Heartbeat' messages and detect other parties' liveness.
--
-- It is inspired by the /Increasing timeout/ algorithms from the book <https://www.distributedprogramming.net/index.shtml Introduction to Reliable and Secure Distributed Programming>
-- by /Cachin et al./ which is an /Eventually Perfect Failure Detector/ suitable for
-- partially synchronous network settings. It has the following behaviour:
--
--  * It broadcasts a 'Ping' to other parties through the underlying 'Network' implementation
--    if the last message has been sent more than 3s ago
--  * When receiving messages from other parties, it records reception time and notifies underlying
--    node with a 'Connected' message
--  * If new messages are received from 'alive' parties before 3s timeout expires no new 'Connected'
--    message is sent
--  * If the background check detects that a formerly 'alive' party has not been seen for more than
--    twice the liveness delay (6s with the current 3s setting), it is marked as 'suspected' and a
--    'Disconnected' message is sent to the node.
module Hydra.Network.Heartbeat where

import Hydra.Prelude

import Cardano.Binary (serialize')
import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation))
import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, readTVarIO, writeTVar)
import Data.Map qualified as Map
import Data.Set qualified as Set
import Hydra.Network (Network (..), NetworkCallback (..), NetworkComponent, NodeId)
import Hydra.Network.Message (Connectivity (Connected, Disconnected))

-- | Liveness bookkeeping shared between the incoming-message callback and the
-- background check that demotes silent peers.
data HeartbeatState = HeartbeatState
  { alive :: Map NodeId Time
  -- ^ Peers currently considered 'Connected', each paired with the time at
  -- which a message from them was last observed. Refreshed on every incoming
  -- message.
  , suspected :: Set NodeId
  -- ^ Peers that may be 'Disconnected'. A peer is moved here once no message
  -- has been received from it for long enough.
  }
  deriving stock (Eq)

-- | Starting state: no peer has been seen yet and none is suspected.
initialHeartbeatState :: HeartbeatState
initialHeartbeatState =
  HeartbeatState
    { alive = Map.empty
    , suspected = Set.empty
    }

-- | Envelope exchanged with peers by the heartbeat layer.
--
-- NOTE: The constructor order is significant for the derived 'Ord' instance.
data Heartbeat msg
  = -- | An application message, tagged with the id of the sending node.
    Data NodeId msg
  | -- | A payload-free keep-alive, tagged with the id of the sending node.
    Ping NodeId
  deriving stock (Eq, Ord, Show, Generic)
  deriving anyclass (ToJSON, FromJSON)

-- | Wire format: an 'Int' tag (@0@ for 'Data', @1@ for 'Ping') followed by the
-- sender's 'NodeId' and, for 'Data' only, the payload.
instance ToCBOR msg => ToCBOR (Heartbeat msg) where
  toCBOR (Data host hmsg) = toCBOR (0 :: Int) <> toCBOR host <> toCBOR hmsg
  toCBOR (Ping host) = toCBOR (1 :: Int) <> toCBOR host

-- | Inverse of the 'ToCBOR' instance: reads the 'Int' tag first and then the
-- fields of the matching constructor. Any tag other than @0@ or @1@ fails the
-- decoder.
instance FromCBOR msg => FromCBOR (Heartbeat msg) where
  fromCBOR = do
    tag <- fromCBOR
    case tag :: Int of
      0 -> Data <$> fromCBOR <*> fromCBOR
      1 -> Ping <$> fromCBOR
      other ->
        fail $ "Unknown tag " <> show other <> " trying to deserialise value to Heartbeat"

-- | The bytes to sign are the strict CBOR serialisation of the heartbeat.
instance ToCBOR msg => SignableRepresentation (Heartbeat msg) where
  getSignableRepresentation hb = serialize' hb

-- | 'True' for a 'Ping', 'False' for a 'Data' message.
isPing :: Heartbeat msg -> Bool
isPing (Ping _) = True
isPing (Data _ _) = False

-- | Pause between two consecutive runs of the heartbeat check loop.
-- The remote-party check sleeps for twice this value between runs.
--
-- NOTE: This could be made configurable.
heartbeatDelay :: DiffTime
heartbeatDelay = 0.5

-- | Base liveness timeout used by 'checkTimeout'.
--
-- We send a 'Ping' once nothing has been sent for longer than this delay, and
-- a peer becomes suspected once it has been silent for longer than twice this
-- delay.
--
-- NOTE: This could be made configurable.
livenessDelay :: DiffTime
livenessDelay = 3

-- | Decorate a lower-level 'NetworkComponent' with heartbeat handling.
--
-- While the given action runs, two background threads are kept alive: one
-- that reports peers which stopped sending messages, and one that broadcasts
-- a 'Ping' when this node has not sent anything recently.
--
-- Note that the type of consumed and sent messages can be different.
withHeartbeat ::
  ( MonadAsync m
  , MonadDelay m
  ) =>
  -- | Id of this node; it tags every 'Heartbeat' broadcast to peers.
  NodeId ->
  -- | The wrapped 'NetworkComponent', which sends and consumes 'Heartbeat' messages.
  NetworkComponent m (Heartbeat inbound) (Heartbeat outbound) a ->
  -- | A network component for sending and consuming arbitrary messages.
  -- Wrapping into and unwrapping from 'Heartbeat's is done by this layer.
  NetworkComponent m (Either Connectivity inbound) outbound a
withHeartbeat nodeId withNetwork callback action = do
  heartbeat <- newTVarIO initialHeartbeatState
  lastSent <- newTVarIO Nothing
  withNetwork (updateStateFromIncomingMessages heartbeat callback) $ \network ->
    inBackground (checkRemoteParties heartbeat notifyConnectivity) $
      inBackground (checkHeartbeatState nodeId lastSent network) $
        action (updateStateFromOutgoingMessages nodeId lastSent network)
 where
  NetworkCallback{deliver = notify} = callback

  -- Connectivity changes reach the node on the 'Left' side of the callback.
  notifyConnectivity = notify . Left

  -- Run a worker thread for exactly as long as the given continuation runs.
  inBackground worker body = withAsync worker (const body)

-- | Build the callback handed to the underlying network layer.
--
-- Every incoming 'Heartbeat' marks its sender as alive. The payload of a
-- 'Data' message is then forwarded to the wrapped callback as 'Right'; a
-- 'Ping' carries no payload and is consumed here.
--
-- A @'Left' ('Connected' peer)@ is delivered to the wrapped callback whenever
-- the sender was not in the 'alive' map before this message was processed.
updateStateFromIncomingMessages ::
  (MonadSTM m, MonadMonotonicTime m) =>
  TVar m HeartbeatState ->
  NetworkCallback (Either Connectivity inbound) m ->
  NetworkCallback (Heartbeat inbound) m
updateStateFromIncomingMessages heartbeatState callback =
  NetworkCallback
    { deliver = \case
        Data nodeId msg -> notifyAlive nodeId >> deliver (Right msg)
        Ping nodeId -> notifyAlive nodeId
    }
 where
  NetworkCallback{deliver} = callback

  notifyAlive peer = do
    now <- getMonotonicTime
    -- The membership test and the update must happen in one transaction.
    -- Otherwise the liveness checker, which runs in another thread, could
    -- evict the peer and report it 'Disconnected' in between; the peer would
    -- then be re-inserted as alive without a matching 'Connected'.
    alreadyAlive <- atomically $ do
      known <- Map.member peer . alive <$> readTVar heartbeatState
      modifyTVar' heartbeatState $ \s ->
        s
          { alive = Map.insert peer now (alive s)
          , suspected = peer `Set.delete` suspected s
          }
      pure known
    unless alreadyAlive $ deliver (Left $ Connected peer)

-- | Adapt a 'Network' of 'Heartbeat's into a 'Network' of plain messages.
--
-- Each outgoing message first records the current time as the last send time
-- and is then broadcast wrapped in 'Data' with this node's id.
updateStateFromOutgoingMessages ::
  (MonadSTM m, MonadMonotonicTime m) =>
  NodeId ->
  TVar m (Maybe Time) ->
  Network m (Heartbeat outbound) ->
  Network m outbound
updateStateFromOutgoingMessages nodeId lastSent Network{broadcast = sendHeartbeat} =
  Network{broadcast = sendData}
 where
  sendData msg = do
    getMonotonicTime >>= updateLastSent lastSent
    sendHeartbeat (Data nodeId msg)

-- | Overwrite the last-sent timestamp with the given time.
updateLastSent :: MonadSTM m => TVar m (Maybe Time) -> Time -> m ()
updateLastSent lastSent = atomically . writeTVar lastSent . Just

-- | Loop forever: sleep for 'heartbeatDelay', then broadcast a 'Ping' if
-- 'shouldSendHeartbeat' says one is due given the last recorded send time.
-- The last send time is updated just before the 'Ping' is broadcast.
checkHeartbeatState ::
  ( MonadDelay m
  , MonadSTM m
  ) =>
  NodeId ->
  TVar m (Maybe Time) ->
  Network m (Heartbeat outbound) ->
  m ()
checkHeartbeatState nodeId lastSent Network{broadcast = sendHeartbeat} =
  forever $ threadDelay heartbeatDelay >> pingIfIdle
 where
  pingIfIdle = do
    previous <- readTVarIO lastSent
    now <- getMonotonicTime
    when (shouldSendHeartbeat now previous) $
      updateLastSent lastSent now >> sendHeartbeat (Ping nodeId)

-- | Decide whether a 'Ping' is due: always when nothing has been sent yet,
-- otherwise only when the last send is older than 'livenessDelay'.
shouldSendHeartbeat :: Time -> Maybe Time -> Bool
shouldSendHeartbeat now = \case
  Nothing -> True
  Just sentAt -> checkTimeout id now sentAt

-- | Loop forever: sleep for twice 'heartbeatDelay', demote peers that have
-- timed out via 'updateSuspected', and report each of them through the given
-- callback as 'Disconnected'.
checkRemoteParties ::
  ( MonadDelay m
  , MonadSTM m
  ) =>
  TVar m HeartbeatState ->
  (Connectivity -> m ()) ->
  m ()
checkRemoteParties heartbeatState onConnectivity = forever $ do
  threadDelay (heartbeatDelay * 2)
  now <- getMonotonicTime
  newlySuspected <- updateSuspected heartbeatState now
  mapM_ (onConnectivity . Disconnected) newlySuspected

-- | Atomically move every peer that has been silent for more than twice
-- 'livenessDelay' from 'alive' to 'suspected'.
--
-- Returns the peers moved by this call; the set is empty, and the state is
-- left untouched, when nobody timed out.
updateSuspected :: MonadSTM m => TVar m HeartbeatState -> Time -> m (Set NodeId)
updateSuspected heartbeatState now = atomically $ do
  (expired, stillAlive) <-
    Map.partition (checkTimeout (2 *) now) . alive <$> readTVar heartbeatState
  let newlySuspected = Map.keysSet expired
  unless (Set.null newlySuspected) $
    modifyTVar' heartbeatState $ \s ->
      s
        { suspected = suspected s <> newlySuspected
        , alive = stillAlive
        }
  pure newlySuspected

-- | 'True' when the time elapsed between @seen@ and @now@ is strictly greater
-- than 'livenessDelay' after it has been adjusted by the given function.
checkTimeout :: (DiffTime -> DiffTime) -> Time -> Time -> Bool
checkTimeout delayFn now seen = elapsed > threshold
 where
  elapsed = diffTime now seen
  threshold = delayFn livenessDelay