module Hydra.Network.Heartbeat where
import Hydra.Prelude
import Cardano.Binary (serialize')
import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation))
import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, readTVarIO, writeTVar)
import Data.Map qualified as Map
import Data.Set qualified as Set
import Hydra.Network (Network (..), NetworkCallback (..), NetworkComponent, NodeId)
import Hydra.Network.Message (Connectivity (Connected, Disconnected))
-- | Liveness bookkeeping for remote peers, shared between the incoming
-- message callback and the background thread that looks for timed-out peers.
--
-- Both fields are strict. The state lives in a 'TVar' that is updated with
-- 'modifyTVar'', which only evaluates the record itself to weak head normal
-- form. With lazy fields, @suspected@ (which this module updates on every
-- incoming message but never inspects) would pile up one unevaluated
-- @Set.delete@ thunk per message.
data HeartbeatState = HeartbeatState
  { alive :: !(Map NodeId Time)
  -- ^ Peers currently considered alive, mapped to the time a message from
  -- them was last recorded.
  , suspected :: !(Set NodeId)
  -- ^ Peers that timed out and have not been heard from since.
  }
  deriving stock (Eq)
-- | Starting state: no peer is known to be alive and none is suspected.
initialHeartbeatState :: HeartbeatState
initialHeartbeatState =
  HeartbeatState
    { alive = Map.empty
    , suspected = Set.empty
    }
-- | Envelope exchanged between nodes by the heartbeat layer. Every value
-- carries the 'NodeId' of its sender so the receiver can record that peer as
-- alive.
data Heartbeat msg
  = -- | An application message together with the id of the sending node.
    Data NodeId msg
  | -- | A bare liveness signal, sent when no 'Data' went out recently.
    Ping NodeId
  deriving stock (Eq, Show, Ord, Generic)
  deriving anyclass (ToJSON, FromJSON)
-- | CBOR encoding: an 'Int' tag (@0@ for 'Data', @1@ for 'Ping') followed
-- directly by the sender id and, for 'Data', the payload. The pieces are
-- simply concatenated; 'FromCBOR' below reads them back in the same order.
instance ToCBOR msg => ToCBOR (Heartbeat msg) where
  toCBOR (Data sender payload) =
    toCBOR (0 :: Int) <> toCBOR sender <> toCBOR payload
  toCBOR (Ping sender) =
    toCBOR (1 :: Int) <> toCBOR sender
-- | Inverse of the 'ToCBOR' instance: read the 'Int' tag first, then the
-- fields of the matching constructor. Any other tag fails the decoder.
instance FromCBOR msg => FromCBOR (Heartbeat msg) where
  fromCBOR = do
    tag <- fromCBOR
    case (tag :: Int) of
      0 -> Data <$> fromCBOR <*> fromCBOR
      1 -> Ping <$> fromCBOR
      other ->
        fail $ "Unknown tag " <> show other <> " trying to deserialise value to Heartbeat"
-- | The bytes to sign for a 'Heartbeat' are its strict CBOR serialisation.
instance ToCBOR msg => SignableRepresentation (Heartbeat msg) where
  getSignableRepresentation heartbeat = serialize' heartbeat
-- | 'True' for a 'Ping', 'False' for a 'Data' message.
isPing :: Heartbeat msg -> Bool
isPing (Ping _) = True
isPing (Data _ _) = False
-- | Polling interval of the heartbeat loops, in seconds.
--
-- 'checkHeartbeatState' sleeps this long between checks of whether a 'Ping'
-- is due; 'checkRemoteParties' sleeps twice this long between sweeps.
heartbeatDelay :: DiffTime
heartbeatDelay = 0.5
-- | Base liveness threshold, in seconds.
--
-- 'checkTimeout' compares elapsed time against this value after passing it
-- through a caller-supplied scaling function.
livenessDelay :: DiffTime
livenessDelay = 3
-- | Layer heartbeat handling on top of a network component that speaks
-- 'Heartbeat'-wrapped messages.
--
-- * Incoming messages go through 'updateStateFromIncomingMessages', which
--   records the sender as alive and forwards payloads upstream as 'Right'.
-- * Outgoing messages go through 'updateStateFromOutgoingMessages', which
--   wraps them in 'Data' tagged with this node's id.
-- * Two background threads run for the duration of @action@ (via
--   'withAsync'): 'checkRemoteParties', whose findings are forwarded upstream
--   as 'Left' connectivity events, and 'checkHeartbeatState', which sends
--   'Ping's when this node has been quiet.
withHeartbeat ::
  ( MonadAsync m
  , MonadDelay m
  ) =>
  NodeId ->
  NetworkComponent m (Heartbeat inbound) (Heartbeat outbound) a ->
  NetworkComponent m (Either Connectivity inbound) outbound a
withHeartbeat nodeId withNetwork callback action = do
  heartbeatVar <- newTVarIO initialHeartbeatState
  lastSentVar <- newTVarIO Nothing
  let incoming = updateStateFromIncomingMessages heartbeatVar callback
      monitorPeers = checkRemoteParties heartbeatVar (forwardUpstream . Left)
  withNetwork incoming $ \network -> do
    let sendPings = checkHeartbeatState nodeId lastSentVar network
        outgoing = updateStateFromOutgoingMessages nodeId lastSentVar network
    withAsync monitorPeers $ \_ ->
      withAsync sendPings $ \_ ->
        action outgoing
 where
  -- The upstream callback that receives connectivity events and payloads.
  NetworkCallback{deliver = forwardUpstream} = callback
-- | Build the callback handed to the underlying network.
--
-- Every incoming 'Heartbeat' marks its sender as alive. A 'Data' payload is
-- then forwarded upstream as 'Right'; a 'Ping' is consumed here. The first
-- message from a peer that is not currently in the @alive@ map also triggers
-- a @'Left' ('Connected' peer)@ notification, delivered before the payload.
updateStateFromIncomingMessages ::
  (MonadSTM m, MonadMonotonicTime m) =>
  TVar m HeartbeatState ->
  NetworkCallback (Either Connectivity inbound) m ->
  NetworkCallback (Heartbeat inbound) m
updateStateFromIncomingMessages heartbeatState callback =
  NetworkCallback
    { deliver = \case
        Data nodeId msg -> notifyAlive nodeId >> forwardUpstream (Right msg)
        Ping nodeId -> notifyAlive nodeId
    }
 where
  NetworkCallback{deliver = forwardUpstream} = callback

  notifyAlive peer = do
    now <- getMonotonicTime
    -- The membership check and the update happen in ONE transaction. If they
    -- were separate, the sweeper thread could evict the peer (and report it
    -- 'Disconnected') between the check and the insert; the peer would then
    -- be re-inserted without a matching 'Connected' ever being delivered.
    -- NOTE(review): the deliveries themselves still happen outside STM, so
    -- the relative order of this 'Connected' and a concurrent 'Disconnected'
    -- from the sweeper is not serialised.
    newlyAlive <- atomically $ do
      wasAlive <- Map.member peer . alive <$> readTVar heartbeatState
      modifyTVar' heartbeatState $ \s ->
        s
          { alive = Map.insert peer now (alive s)
          , suspected = Set.delete peer (suspected s)
          }
      pure (not wasAlive)
    when newlyAlive $ forwardUpstream (Left (Connected peer))
-- | Wrap the underlying network so callers can broadcast plain payloads.
--
-- Each payload is tagged with this node's id as a 'Data' message. Just before
-- sending, the current monotonic time is stored in @lastSent@, which
-- 'checkHeartbeatState' reads to decide whether a 'Ping' is needed.
updateStateFromOutgoingMessages ::
  (MonadSTM m, MonadMonotonicTime m) =>
  NodeId ->
  TVar m (Maybe Time) ->
  Network m (Heartbeat outbound) ->
  Network m outbound
updateStateFromOutgoingMessages nodeId lastSent Network{broadcast = sendWrapped} =
  Network stampAndSend
 where
  stampAndSend payload = do
    getMonotonicTime >>= updateLastSent lastSent
    sendWrapped (Data nodeId payload)
-- | Record the given time as the moment of the most recent outgoing message.
updateLastSent :: MonadSTM m => TVar m (Maybe Time) -> Time -> m ()
updateLastSent lastSent = atomically . writeTVar lastSent . Just
-- | Background loop that keeps this node visible to its peers.
--
-- Every 'heartbeatDelay' it reads the time of the last outgoing message and,
-- if 'shouldSendHeartbeat' says so, records the current time as the new
-- "last sent" and broadcasts a 'Ping' carrying this node's id. Never returns.
checkHeartbeatState ::
  ( MonadDelay m
  , MonadSTM m
  ) =>
  NodeId ->
  TVar m (Maybe Time) ->
  Network m (Heartbeat outbound) ->
  m ()
checkHeartbeatState nodeId lastSent Network{broadcast = sendHeartbeat} =
  forever pingIfIdle
 where
  pingIfIdle = do
    threadDelay heartbeatDelay
    previous <- readTVarIO lastSent
    now <- getMonotonicTime
    when (shouldSendHeartbeat now previous) $
      updateLastSent lastSent now >> sendHeartbeat (Ping nodeId)
-- | Decide whether a 'Ping' is due.
--
-- Always 'True' when nothing has been sent yet; otherwise 'True' once more
-- than the unscaled 'livenessDelay' has passed since the last send.
shouldSendHeartbeat :: Time -> Maybe Time -> Bool
shouldSendHeartbeat _ Nothing = True
shouldSendHeartbeat now (Just sentAt) = checkTimeout id now sentAt
-- | Background loop that detects silent peers.
--
-- Every @2 * 'heartbeatDelay'@ it runs 'updateSuspected' with the current
-- monotonic time and reports each peer returned by that sweep through
-- @onConnectivity@ as 'Disconnected'. Never returns.
checkRemoteParties ::
  ( MonadDelay m
  , MonadSTM m
  ) =>
  TVar m HeartbeatState ->
  (Connectivity -> m ()) ->
  m ()
checkRemoteParties heartbeatState onConnectivity =
  forever sweep
 where
  sweep = do
    threadDelay (heartbeatDelay * 2)
    now <- getMonotonicTime
    timedOut <- updateSuspected heartbeatState now
    mapM_ (\peer -> onConnectivity (Disconnected peer)) timedOut
-- | Move every peer that has been silent for too long from @alive@ to
-- @suspected@, in a single STM transaction.
--
-- A peer counts as timed out when 'checkTimeout' with a doubling scale
-- function holds for its last-seen time, i.e. more than
-- @2 * 'livenessDelay'@ has elapsed. Returns the peers moved by this call
-- (empty when nobody timed out, in which case the state is left untouched).
updateSuspected :: MonadSTM m => TVar m HeartbeatState -> Time -> m (Set NodeId)
updateSuspected heartbeatState now =
  atomically $ do
    knownAlive <- alive <$> readTVar heartbeatState
    let (expired, remaining) = Map.partition (checkTimeout (2 *) now) knownAlive
        expiredIds = Map.keysSet expired
    unless (Map.null expired) $
      modifyTVar' heartbeatState $ \s ->
        s
          { suspected = suspected s <> expiredIds
          , alive = remaining
          }
    pure expiredIds
-- | @checkTimeout delayFn now seen@ is 'True' when strictly more than
-- @delayFn 'livenessDelay'@ has elapsed between @seen@ and @now@.
checkTimeout :: (DiffTime -> DiffTime) -> Time -> Time -> Bool
checkTimeout delayFn now seen = elapsed > threshold
 where
  elapsed = diffTime now seen
  threshold = delayFn livenessDelay