{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE UndecidableInstances #-}
module Hydra.Node where
import Hydra.Prelude
import Conduit (MonadUnliftIO, ZipSink (..), foldMapC, foldlC, mapC, mapM_C, runConduitRes, (.|))
import Control.Concurrent.Class.MonadSTM (
import Control.Monad.Trans.Writer (execWriter, tell)
import Hydra.API.ClientInput (ClientInput)
import Hydra.API.Server (Server, sendOutput)
import Hydra.Cardano.Api (AsType (AsPaymentKey, AsSigningKey, AsVerificationKey), getVerificationKey)
import Hydra.Chain (
Chain (..),
ChainEvent (..),
import Hydra.Chain.ChainState (ChainStateType, IsChainState)
import Hydra.Chain.Direct.Util (readFileTextEnvelopeThrow)
import Hydra.Events (EventId, EventSink (..), EventSource (..), StateEvent (..), getEventId, putEventsToSinks, stateChanged)
import Hydra.HeadLogic (
Effect (..),
HeadState (..),
IdleState (..),
Input (..),
Outcome (..),
import Hydra.HeadLogic qualified as HeadLogic
import Hydra.HeadLogic.Outcome (StateChanged (..))
import Hydra.HeadLogic.State (getHeadParameters)
import Hydra.Ledger (Ledger)
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (Network (..), NetworkCallback (..))
import Hydra.Network.Message (Message, NetworkEvent (..))
import Hydra.Node.InputQueue (InputQueue (..), Queued (..), createInputQueue)
import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..))
import Hydra.Options (ChainConfig (..), DirectChainConfig (..), RunOptions (..), defaultContestationPeriod, defaultDepositDeadline)
import Hydra.Tx (HasParty (..), HeadParameters (..), Party (..), deriveParty)
import Hydra.Tx.Crypto (AsType (AsHydraKey))
import Hydra.Tx.Environment (Environment (..))
import Hydra.Tx.IsTx (ArbitraryIsTx)
import Hydra.Tx.Utils (verificationKeyToOnChainId)
initEnvironment :: RunOptions -> IO Environment
initEnvironment :: RunOptions -> IO Environment
initEnvironment RunOptions
options = do
SigningKey HydraKey
sk <- AsType (SigningKey HydraKey)
-> FilePath -> IO (SigningKey HydraKey)
forall a. HasTextEnvelope a => AsType a -> FilePath -> IO a
readFileTextEnvelopeThrow (AsType HydraKey -> AsType (SigningKey HydraKey)
forall a. AsType a -> AsType (SigningKey a)
AsSigningKey AsType HydraKey
AsHydraKey) FilePath
otherParties <- (FilePath -> IO Party) -> [FilePath] -> IO [Party]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM FilePath -> IO Party
loadParty [FilePath]
participants <- IO [OnChainId]
Environment -> IO Environment
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Environment -> IO Environment) -> Environment -> IO Environment
forall a b. (a -> b) -> a -> b
{ $sel:party:Environment :: Party
party = SigningKey HydraKey -> Party
deriveParty SigningKey HydraKey
, $sel:signingKey:Environment :: SigningKey HydraKey
signingKey = SigningKey HydraKey
, [Party]
otherParties :: [Party]
$sel:otherParties:Environment :: [Party]
, [OnChainId]
participants :: [OnChainId]
$sel:participants:Environment :: [OnChainId]
, ContestationPeriod
contestationPeriod :: ContestationPeriod
$sel:contestationPeriod:Environment :: ContestationPeriod
, DepositDeadline
depositDeadline :: DepositDeadline
$sel:depositDeadline:Environment :: DepositDeadline
getParticipants :: IO [OnChainId]
getParticipants =
case ChainConfig
chainConfig of
Offline{} -> [OnChainId] -> IO [OnChainId]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
{ [FilePath]
cardanoVerificationKeys :: [FilePath]
$sel:cardanoVerificationKeys:DirectChainConfig :: DirectChainConfig -> [FilePath]
, FilePath
cardanoSigningKey :: FilePath
$sel:cardanoSigningKey:DirectChainConfig :: DirectChainConfig -> FilePath
} -> do
SigningKey PaymentKey
ownSigningKey <- AsType (SigningKey PaymentKey)
-> FilePath -> IO (SigningKey PaymentKey)
forall a. HasTextEnvelope a => AsType a -> FilePath -> IO a
readFileTextEnvelopeThrow (AsType PaymentKey -> AsType (SigningKey PaymentKey)
forall a. AsType a -> AsType (SigningKey a)
AsSigningKey AsType PaymentKey
AsPaymentKey) FilePath
[VerificationKey PaymentKey]
otherVerificationKeys <- (FilePath -> IO (VerificationKey PaymentKey))
-> [FilePath] -> IO [VerificationKey PaymentKey]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (AsType (VerificationKey PaymentKey)
-> FilePath -> IO (VerificationKey PaymentKey)
forall a. HasTextEnvelope a => AsType a -> FilePath -> IO a
readFileTextEnvelopeThrow (AsType PaymentKey -> AsType (VerificationKey PaymentKey)
forall a. AsType a -> AsType (VerificationKey a)
AsVerificationKey AsType PaymentKey
AsPaymentKey)) [FilePath]
[OnChainId] -> IO [OnChainId]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([OnChainId] -> IO [OnChainId]) -> [OnChainId] -> IO [OnChainId]
forall a b. (a -> b) -> a -> b
$ VerificationKey PaymentKey -> OnChainId
verificationKeyToOnChainId (VerificationKey PaymentKey -> OnChainId)
-> [VerificationKey PaymentKey] -> [OnChainId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (SigningKey PaymentKey -> VerificationKey PaymentKey
forall keyrole.
(Key keyrole, HasTypeProxy keyrole) =>
SigningKey keyrole -> VerificationKey keyrole
getVerificationKey SigningKey PaymentKey
ownSigningKey VerificationKey PaymentKey
-> [VerificationKey PaymentKey] -> [VerificationKey PaymentKey]
forall a. a -> [a] -> [a]
: [VerificationKey PaymentKey]
contestationPeriod :: ContestationPeriod
contestationPeriod = case ChainConfig
chainConfig of
Offline{} -> ContestationPeriod
Direct DirectChainConfig{$sel:contestationPeriod:DirectChainConfig :: DirectChainConfig -> ContestationPeriod
contestationPeriod = ContestationPeriod
cp} -> ContestationPeriod
depositDeadline :: DepositDeadline
depositDeadline = case ChainConfig
chainConfig of
Offline{} -> DepositDeadline
Direct DirectChainConfig{$sel:depositDeadline:DirectChainConfig :: DirectChainConfig -> DepositDeadline
depositDeadline = DepositDeadline
ddeadline} -> DepositDeadline
loadParty :: FilePath -> IO Party
loadParty FilePath
p =
VerificationKey HydraKey -> Party
Party (VerificationKey HydraKey -> Party)
-> IO (VerificationKey HydraKey) -> IO Party
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AsType (VerificationKey HydraKey)
-> FilePath -> IO (VerificationKey HydraKey)
forall a. HasTextEnvelope a => AsType a -> FilePath -> IO a
readFileTextEnvelopeThrow (AsType HydraKey -> AsType (VerificationKey HydraKey)
forall a. AsType a -> AsType (VerificationKey a)
AsVerificationKey AsType HydraKey
AsHydraKey) FilePath
{ FilePath
hydraSigningKey :: FilePath
$sel:hydraSigningKey:RunOptions :: RunOptions -> FilePath
, [FilePath]
hydraVerificationKeys :: [FilePath]
$sel:hydraVerificationKeys:RunOptions :: RunOptions -> [FilePath]
, ChainConfig
chainConfig :: ChainConfig
$sel:chainConfig:RunOptions :: RunOptions -> ChainConfig
} = RunOptions
checkHeadState ::
MonadThrow m =>
Tracer m (HydraNodeLog tx) ->
Environment ->
HeadState tx ->
m ()
checkHeadState :: forall (m :: * -> *) tx.
MonadThrow m =>
Tracer m (HydraNodeLog tx) -> Environment -> HeadState tx -> m ()
checkHeadState Tracer m (HydraNodeLog tx)
tracer Environment
env HeadState tx
headState = do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([ParamMismatch] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ParamMismatch]
paramsMismatch) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer ([ParamMismatch] -> HydraNodeLog tx
forall tx. [ParamMismatch] -> HydraNodeLog tx
Misconfiguration [ParamMismatch]
ParameterMismatch -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (ParameterMismatch -> m ()) -> ParameterMismatch -> m ()
forall a b. (a -> b) -> a -> b
$ [ParamMismatch] -> ParameterMismatch
ParameterMismatch [ParamMismatch]
paramsMismatch :: [ParamMismatch]
paramsMismatch =
-> (HeadParameters -> [ParamMismatch])
-> Maybe HeadParameters
-> [ParamMismatch]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [] HeadParameters -> [ParamMismatch]
validateParameters (Maybe HeadParameters -> [ParamMismatch])
-> Maybe HeadParameters -> [ParamMismatch]
forall a b. (a -> b) -> a -> b
$ HeadState tx -> Maybe HeadParameters
forall tx. HeadState tx -> Maybe HeadParameters
getHeadParameters HeadState tx
validateParameters :: HeadParameters -> [ParamMismatch]
validateParameters HeadParameters{$sel:contestationPeriod:HeadParameters :: HeadParameters -> ContestationPeriod
contestationPeriod = ContestationPeriod
loadedCp, [Party]
parties :: [Party]
$sel:parties:HeadParameters :: HeadParameters -> [Party]
parties} =
Writer [ParamMismatch] () -> [ParamMismatch]
forall w a. Writer w a -> w
execWriter (Writer [ParamMismatch] () -> [ParamMismatch])
-> Writer [ParamMismatch] () -> [ParamMismatch]
forall a b. (a -> b) -> a -> b
$ do
Bool -> Writer [ParamMismatch] () -> Writer [ParamMismatch] ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ContestationPeriod
loadedCp ContestationPeriod -> ContestationPeriod -> Bool
forall a. Eq a => a -> a -> Bool
/= ContestationPeriod
configuredCp) (Writer [ParamMismatch] () -> Writer [ParamMismatch] ())
-> Writer [ParamMismatch] () -> Writer [ParamMismatch] ()
forall a b. (a -> b) -> a -> b
[ParamMismatch] -> Writer [ParamMismatch] ()
forall (m :: * -> *) w. Monad m => w -> WriterT w m ()
tell [ContestationPeriodMismatch{ContestationPeriod
loadedCp :: ContestationPeriod
$sel:loadedCp:ContestationPeriodMismatch :: ContestationPeriod
loadedCp, ContestationPeriod
configuredCp :: ContestationPeriod
$sel:configuredCp:ContestationPeriodMismatch :: ContestationPeriod
let loadedParties :: [Party]
loadedParties = [Party] -> [Party]
forall a. Ord a => [a] -> [a]
sort [Party]
configuredParties :: [Party]
configuredParties = [Party] -> [Party]
forall a. Ord a => [a] -> [a]
sort (Party
party Party -> [Party] -> [Party]
forall a. a -> [a] -> [a]
: [Party]
Bool -> Writer [ParamMismatch] () -> Writer [ParamMismatch] ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ([Party]
loadedParties [Party] -> [Party] -> Bool
forall a. Eq a => a -> a -> Bool
/= [Party]
configuredParties) (Writer [ParamMismatch] () -> Writer [ParamMismatch] ())
-> Writer [ParamMismatch] () -> Writer [ParamMismatch] ()
forall a b. (a -> b) -> a -> b
[ParamMismatch] -> Writer [ParamMismatch] ()
forall (m :: * -> *) w. Monad m => w -> WriterT w m ()
tell [PartiesMismatch{[Party]
loadedParties :: [Party]
$sel:loadedParties:ContestationPeriodMismatch :: [Party]
loadedParties, [Party]
configuredParties :: [Party]
$sel:configuredParties:ContestationPeriodMismatch :: [Party]
Environment{$sel:contestationPeriod:Environment :: Environment -> ContestationPeriod
contestationPeriod = ContestationPeriod
configuredCp, [Party]
$sel:otherParties:Environment :: Environment -> [Party]
otherParties :: [Party]
otherParties, Party
$sel:party:Environment :: Environment -> Party
party :: Party
party} = Environment
data DraftHydraNode tx m = DraftHydraNode
{ forall tx (m :: * -> *).
DraftHydraNode tx m -> Tracer m (HydraNodeLog tx)
tracer :: Tracer m (HydraNodeLog tx)
, forall tx (m :: * -> *). DraftHydraNode tx m -> Environment
env :: Environment
, forall tx (m :: * -> *). DraftHydraNode tx m -> Ledger tx
ledger :: Ledger tx
, forall tx (m :: * -> *). DraftHydraNode tx m -> NodeState tx m
nodeState :: NodeState tx m
, forall tx (m :: * -> *).
DraftHydraNode tx m -> InputQueue m (Input tx)
inputQueue :: InputQueue m (Input tx)
, forall tx (m :: * -> *).
DraftHydraNode tx m -> EventSource (StateEvent tx) m
eventSource :: EventSource (StateEvent tx) m
, forall tx (m :: * -> *).
DraftHydraNode tx m -> [EventSink (StateEvent tx) m]
eventSinks :: [EventSink (StateEvent tx) m]
forall tx (m :: * -> *).
DraftHydraNode tx m -> ChainStateHistory tx
chainStateHistory :: ChainStateHistory tx
instance HasParty (DraftHydraNode tx m) where
getParty :: DraftHydraNode tx m -> Party
getParty DraftHydraNode{Environment
$sel:env:DraftHydraNode :: forall tx (m :: * -> *). DraftHydraNode tx m -> Environment
env :: Environment
env} = Environment -> Party
forall a. HasParty a => a -> Party
getParty Environment
hydrate ::
(IsChainState tx, MonadDelay m, MonadLabelledSTM m, MonadAsync m, MonadThrow m, MonadUnliftIO m) =>
Tracer m (HydraNodeLog tx) ->
Environment ->
Ledger tx ->
ChainStateType tx ->
EventSource (StateEvent tx) m ->
[EventSink (StateEvent tx) m] ->
m (DraftHydraNode tx m)
hydrate :: forall tx (m :: * -> *).
(IsChainState tx, MonadDelay m, MonadLabelledSTM m, MonadAsync m,
MonadThrow m, MonadUnliftIO m) =>
Tracer m (HydraNodeLog tx)
-> Environment
-> Ledger tx
-> ChainStateType tx
-> EventSource (StateEvent tx) m
-> [EventSink (StateEvent tx) m]
-> m (DraftHydraNode tx m)
hydrate Tracer m (HydraNodeLog tx)
tracer Environment
env Ledger tx
ledger ChainStateType tx
initialChainState EventSource (StateEvent tx) m
eventSource [EventSink (StateEvent tx) m]
eventSinks = do
Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer HydraNodeLog tx
forall tx. HydraNodeLog tx
(Last EventId
lastEventId, (HeadState tx
headState, ChainStateHistory tx
chainStateHistory)) <-
(ResourceT m)
(Last EventId, (HeadState tx, ChainStateHistory tx))
-> m (Last EventId, (HeadState tx, ChainStateHistory tx))
forall (m :: * -> *) r.
MonadUnliftIO m =>
ConduitT () Void (ResourceT m) r -> m r
runConduitRes (ConduitT
(ResourceT m)
(Last EventId, (HeadState tx, ChainStateHistory tx))
-> m (Last EventId, (HeadState tx, ChainStateHistory tx)))
-> ConduitT
(ResourceT m)
(Last EventId, (HeadState tx, ChainStateHistory tx))
-> m (Last EventId, (HeadState tx, ChainStateHistory tx))
forall a b. (a -> b) -> a -> b
EventSource (StateEvent tx) m
-> HasEventId (StateEvent tx) =>
ConduitT () (StateEvent tx) (ResourceT m) ()
forall e (m :: * -> *).
EventSource e m -> HasEventId e => ConduitT () e (ResourceT m) ()
sourceEvents EventSource (StateEvent tx) m
ConduitT () (StateEvent tx) (ResourceT m) ()
-> ConduitT
(StateEvent tx)
(ResourceT m)
(Last EventId, (HeadState tx, ChainStateHistory tx))
-> ConduitT
(ResourceT m)
(Last EventId, (HeadState tx, ChainStateHistory tx))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ZipSink
(StateEvent tx)
(ResourceT m)
(Last EventId, (HeadState tx, ChainStateHistory tx))
-> ConduitT
(StateEvent tx)
(ResourceT m)
(Last EventId, (HeadState tx, ChainStateHistory tx))
forall i (m :: * -> *) r. ZipSink i m r -> ConduitT i Void m r
( (,)
(Last EventId
-> (HeadState tx, ChainStateHistory tx)
-> (Last EventId, (HeadState tx, ChainStateHistory tx)))
-> ZipSink (StateEvent tx) (ResourceT m) (Last EventId)
-> ZipSink
(StateEvent tx)
(ResourceT m)
((HeadState tx, ChainStateHistory tx)
-> (Last EventId, (HeadState tx, ChainStateHistory tx)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ConduitT (StateEvent tx) Void (ResourceT m) (Last EventId)
-> ZipSink (StateEvent tx) (ResourceT m) (Last EventId)
forall i (m :: * -> *) r. ConduitT i Void m r -> ZipSink i m r
ZipSink ((StateEvent tx -> Last EventId)
-> ConduitT (StateEvent tx) Void (ResourceT m) (Last EventId)
forall (m :: * -> *) b a o.
(Monad m, Monoid b) =>
(a -> b) -> ConduitT a o m b
foldMapC (Maybe EventId -> Last EventId
forall a. Maybe a -> Last a
Last (Maybe EventId -> Last EventId)
-> (StateEvent tx -> Maybe EventId)
-> StateEvent tx
-> Last EventId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EventId -> Maybe EventId
forall a. a -> Maybe a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (EventId -> Maybe EventId)
-> (StateEvent tx -> EventId) -> StateEvent tx -> Maybe EventId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StateEvent tx -> EventId
forall a. HasEventId a => a -> EventId
(StateEvent tx)
(ResourceT m)
((HeadState tx, ChainStateHistory tx)
-> (Last EventId, (HeadState tx, ChainStateHistory tx)))
-> ZipSink
(StateEvent tx) (ResourceT m) (HeadState tx, ChainStateHistory tx)
-> ZipSink
(StateEvent tx)
(ResourceT m)
(Last EventId, (HeadState tx, ChainStateHistory tx))
forall a b.
ZipSink (StateEvent tx) (ResourceT m) (a -> b)
-> ZipSink (StateEvent tx) (ResourceT m) a
-> ZipSink (StateEvent tx) (ResourceT m) b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ConduitT
(StateEvent tx)
(ResourceT m)
(HeadState tx, ChainStateHistory tx)
-> ZipSink
(StateEvent tx) (ResourceT m) (HeadState tx, ChainStateHistory tx)
forall i (m :: * -> *) r. ConduitT i Void m r -> ZipSink i m r
ZipSink ConduitT
(StateEvent tx)
(ResourceT m)
(HeadState tx, ChainStateHistory tx)
Tracer m (HydraNodeLog tx) -> Environment -> HeadState tx -> m ()
forall (m :: * -> *) tx.
MonadThrow m =>
Tracer m (HydraNodeLog tx) -> Environment -> HeadState tx -> m ()
checkHeadState Tracer m (HydraNodeLog tx)
tracer Environment
env HeadState tx
Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer HydraNodeLog tx
forall tx. HydraNodeLog tx
ConduitT () Void (ResourceT m) () -> m ()
forall (m :: * -> *) r.
MonadUnliftIO m =>
ConduitT () Void (ResourceT m) r -> m r
runConduitRes (ConduitT () Void (ResourceT m) () -> m ())
-> ConduitT () Void (ResourceT m) () -> m ()
forall a b. (a -> b) -> a -> b
EventSource (StateEvent tx) m
-> HasEventId (StateEvent tx) =>
ConduitT () (StateEvent tx) (ResourceT m) ()
forall e (m :: * -> *).
EventSource e m -> HasEventId e => ConduitT () e (ResourceT m) ()
sourceEvents EventSource (StateEvent tx) m
eventSource ConduitT () (StateEvent tx) (ResourceT m) ()
-> ConduitT (StateEvent tx) Void (ResourceT m) ()
-> ConduitT () Void (ResourceT m) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| (StateEvent tx -> ResourceT m ())
-> ConduitT (StateEvent tx) Void (ResourceT m) ()
forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
mapM_C (\StateEvent tx
e -> m () -> ResourceT m ()
forall (m :: * -> *) a. Monad m => m a -> ResourceT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> ResourceT m ()) -> m () -> ResourceT m ()
forall a b. (a -> b) -> a -> b
$ [EventSink (StateEvent tx) m] -> [StateEvent tx] -> m ()
forall (m :: * -> *) e.
(Monad m, HasEventId e) =>
[EventSink e m] -> [e] -> m ()
putEventsToSinks [EventSink (StateEvent tx) m]
eventSinks [StateEvent tx
NodeState tx m
nodeState <- Maybe EventId -> HeadState tx -> m (NodeState tx m)
forall (m :: * -> *) tx.
MonadLabelledSTM m =>
Maybe EventId -> HeadState tx -> m (NodeState tx m)
createNodeState (Last EventId -> Maybe EventId
forall a. Last a -> Maybe a
getLast Last EventId
lastEventId) HeadState tx
InputQueue m (Input tx)
inputQueue <- m (InputQueue m (Input tx))
forall (m :: * -> *) e.
(MonadDelay m, MonadAsync m, MonadLabelledSTM m) =>
m (InputQueue m e)
DraftHydraNode tx m -> m (DraftHydraNode tx m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
{ Tracer m (HydraNodeLog tx)
$sel:tracer:DraftHydraNode :: Tracer m (HydraNodeLog tx)
tracer :: Tracer m (HydraNodeLog tx)
, Environment
$sel:env:DraftHydraNode :: Environment
env :: Environment
, Ledger tx
$sel:ledger:DraftHydraNode :: Ledger tx
ledger :: Ledger tx
, NodeState tx m
$sel:nodeState:DraftHydraNode :: NodeState tx m
nodeState :: NodeState tx m
, InputQueue m (Input tx)
$sel:inputQueue:DraftHydraNode :: InputQueue m (Input tx)
inputQueue :: InputQueue m (Input tx)
, EventSource (StateEvent tx) m
$sel:eventSource:DraftHydraNode :: EventSource (StateEvent tx) m
eventSource :: EventSource (StateEvent tx) m
, [EventSink (StateEvent tx) m]
$sel:eventSinks:DraftHydraNode :: [EventSink (StateEvent tx) m]
eventSinks :: [EventSink (StateEvent tx) m]
, ChainStateHistory tx
$sel:chainStateHistory:DraftHydraNode :: ChainStateHistory tx
chainStateHistory :: ChainStateHistory tx
initialState :: HeadState tx
initialState = IdleState tx -> HeadState tx
forall tx. IdleState tx -> HeadState tx
Idle IdleState{$sel:chainState:IdleState :: ChainStateType tx
chainState = ChainStateType tx
recoverHeadStateC :: ConduitT
(StateEvent tx)
(ResourceT m)
(HeadState tx, ChainStateHistory tx)
recoverHeadStateC =
(StateEvent tx -> StateChanged tx)
-> ConduitT (StateEvent tx) (StateChanged tx) (ResourceT m) ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
mapC StateEvent tx -> StateChanged tx
forall tx. StateEvent tx -> StateChanged tx
ConduitT (StateEvent tx) (StateChanged tx) (ResourceT m) ()
-> ConduitT
(StateChanged tx)
(ResourceT m)
(HeadState tx, ChainStateHistory tx)
-> ConduitT
(StateEvent tx)
(ResourceT m)
(HeadState tx, ChainStateHistory tx)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ZipSink
(StateChanged tx)
(ResourceT m)
(HeadState tx, ChainStateHistory tx)
-> ConduitT
(StateChanged tx)
(ResourceT m)
(HeadState tx, ChainStateHistory tx)
forall i (m :: * -> *) r. ZipSink i m r -> ConduitT i Void m r
( (,)
(HeadState tx
-> ChainStateHistory tx -> (HeadState tx, ChainStateHistory tx))
-> ZipSink (StateChanged tx) (ResourceT m) (HeadState tx)
-> ZipSink
(StateChanged tx)
(ResourceT m)
(ChainStateHistory tx -> (HeadState tx, ChainStateHistory tx))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ConduitT (StateChanged tx) Void (ResourceT m) (HeadState tx)
-> ZipSink (StateChanged tx) (ResourceT m) (HeadState tx)
forall i (m :: * -> *) r. ConduitT i Void m r -> ZipSink i m r
ZipSink ((HeadState tx -> StateChanged tx -> HeadState tx)
-> HeadState tx
-> ConduitT (StateChanged tx) Void (ResourceT m) (HeadState tx)
forall (m :: * -> *) a b o.
Monad m =>
(a -> b -> a) -> a -> ConduitT b o m a
foldlC HeadState tx -> StateChanged tx -> HeadState tx
forall tx.
IsChainState tx =>
HeadState tx -> StateChanged tx -> HeadState tx
aggregate HeadState tx
(StateChanged tx)
(ResourceT m)
(ChainStateHistory tx -> (HeadState tx, ChainStateHistory tx))
-> ZipSink (StateChanged tx) (ResourceT m) (ChainStateHistory tx)
-> ZipSink
(StateChanged tx)
(ResourceT m)
(HeadState tx, ChainStateHistory tx)
forall a b.
ZipSink (StateChanged tx) (ResourceT m) (a -> b)
-> ZipSink (StateChanged tx) (ResourceT m) a
-> ZipSink (StateChanged tx) (ResourceT m) b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ConduitT
(StateChanged tx) Void (ResourceT m) (ChainStateHistory tx)
-> ZipSink (StateChanged tx) (ResourceT m) (ChainStateHistory tx)
forall i (m :: * -> *) r. ConduitT i Void m r -> ZipSink i m r
ZipSink ((ChainStateHistory tx -> StateChanged tx -> ChainStateHistory tx)
-> ChainStateHistory tx
-> ConduitT
(StateChanged tx) Void (ResourceT m) (ChainStateHistory tx)
forall (m :: * -> *) a b o.
Monad m =>
(a -> b -> a) -> a -> ConduitT b o m a
foldlC ChainStateHistory tx -> StateChanged tx -> ChainStateHistory tx
forall tx.
IsChainState tx =>
ChainStateHistory tx -> StateChanged tx -> ChainStateHistory tx
aggregateChainStateHistory (ChainStateHistory tx
-> ConduitT
(StateChanged tx) Void (ResourceT m) (ChainStateHistory tx))
-> ChainStateHistory tx
-> ConduitT
(StateChanged tx) Void (ResourceT m) (ChainStateHistory tx)
forall a b. (a -> b) -> a -> b
$ ChainStateType tx -> ChainStateHistory tx
forall tx. ChainStateType tx -> ChainStateHistory tx
initHistory ChainStateType tx
wireChainInput :: DraftHydraNode tx m -> (ChainEvent tx -> m ())
wireChainInput :: forall tx (m :: * -> *).
DraftHydraNode tx m -> ChainEvent tx -> m ()
wireChainInput DraftHydraNode tx m
node = Input tx -> m ()
enqueue (Input tx -> m ())
-> (ChainEvent tx -> Input tx) -> ChainEvent tx -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ChainEvent tx -> Input tx
forall tx. ChainEvent tx -> Input tx
DraftHydraNode{$sel:inputQueue:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> InputQueue m (Input tx)
inputQueue = InputQueue{Input tx -> m ()
enqueue :: Input tx -> m ()
$sel:enqueue:InputQueue :: forall (m :: * -> *) e. InputQueue m e -> e -> m ()
enqueue}} = DraftHydraNode tx m
wireClientInput :: DraftHydraNode tx m -> (ClientInput tx -> m ())
wireClientInput :: forall tx (m :: * -> *).
DraftHydraNode tx m -> ClientInput tx -> m ()
wireClientInput DraftHydraNode tx m
node = Input tx -> m ()
enqueue (Input tx -> m ())
-> (ClientInput tx -> Input tx) -> ClientInput tx -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ClientInput tx -> Input tx
forall tx. ClientInput tx -> Input tx
DraftHydraNode{$sel:inputQueue:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> InputQueue m (Input tx)
inputQueue = InputQueue{Input tx -> m ()
$sel:enqueue:InputQueue :: forall (m :: * -> *) e. InputQueue m e -> e -> m ()
enqueue :: Input tx -> m ()
enqueue}} = DraftHydraNode tx m
wireNetworkInput :: DraftHydraNode tx m -> NetworkCallback (NetworkEvent (Message tx)) m
wireNetworkInput :: forall tx (m :: * -> *).
DraftHydraNode tx m
-> NetworkCallback (NetworkEvent (Message tx)) m
wireNetworkInput DraftHydraNode tx m
node =
NetworkCallback{$sel:deliver:NetworkCallback :: NetworkEvent (Message tx) -> m ()
deliver = Input tx -> m ()
enqueue (Input tx -> m ())
-> (NetworkEvent (Message tx) -> Input tx)
-> NetworkEvent (Message tx)
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TTL -> NetworkEvent (Message tx) -> Input tx
forall tx. TTL -> NetworkEvent (Message tx) -> Input tx
NetworkInput TTL
DraftHydraNode{$sel:inputQueue:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> InputQueue m (Input tx)
inputQueue = InputQueue{Input tx -> m ()
$sel:enqueue:InputQueue :: forall (m :: * -> *) e. InputQueue m e -> e -> m ()
enqueue :: Input tx -> m ()
enqueue}} = DraftHydraNode tx m
connect ::
Monad m =>
Chain tx m ->
Network m (Message tx) ->
Server tx m ->
DraftHydraNode tx m ->
m (HydraNode tx m)
connect :: forall (m :: * -> *) tx.
Monad m =>
Chain tx m
-> Network m (Message tx)
-> Server tx m
-> DraftHydraNode tx m
-> m (HydraNode tx m)
connect Chain tx m
chain Network m (Message tx)
network Server tx m
server DraftHydraNode tx m
node =
HydraNode tx m -> m (HydraNode tx m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure HydraNode{Tracer m (HydraNodeLog tx)
tracer :: Tracer m (HydraNodeLog tx)
$sel:tracer:HydraNode :: Tracer m (HydraNodeLog tx)
tracer, Environment
env :: Environment
$sel:env:HydraNode :: Environment
env, Ledger tx
ledger :: Ledger tx
$sel:ledger:HydraNode :: Ledger tx
ledger, NodeState tx m
nodeState :: NodeState tx m
$sel:nodeState:HydraNode :: NodeState tx m
nodeState, InputQueue m (Input tx)
inputQueue :: InputQueue m (Input tx)
$sel:inputQueue:HydraNode :: InputQueue m (Input tx)
inputQueue, EventSource (StateEvent tx) m
eventSource :: EventSource (StateEvent tx) m
$sel:eventSource:HydraNode :: EventSource (StateEvent tx) m
eventSource, [EventSink (StateEvent tx) m]
eventSinks :: [EventSink (StateEvent tx) m]
$sel:eventSinks:HydraNode :: [EventSink (StateEvent tx) m]
eventSinks, $sel:oc:HydraNode :: Chain tx m
oc = Chain tx m
chain, $sel:hn:HydraNode :: Network m (Message tx)
hn = Network m (Message tx)
network, Server tx m
server :: Server tx m
$sel:server:HydraNode :: Server tx m
DraftHydraNode{Tracer m (HydraNodeLog tx)
$sel:tracer:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> Tracer m (HydraNodeLog tx)
tracer :: Tracer m (HydraNodeLog tx)
tracer, Environment
$sel:env:DraftHydraNode :: forall tx (m :: * -> *). DraftHydraNode tx m -> Environment
env :: Environment
env, Ledger tx
$sel:ledger:DraftHydraNode :: forall tx (m :: * -> *). DraftHydraNode tx m -> Ledger tx
ledger :: Ledger tx
ledger, NodeState tx m
$sel:nodeState:DraftHydraNode :: forall tx (m :: * -> *). DraftHydraNode tx m -> NodeState tx m
nodeState :: NodeState tx m
nodeState, InputQueue m (Input tx)
$sel:inputQueue:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> InputQueue m (Input tx)
inputQueue :: InputQueue m (Input tx)
inputQueue, EventSource (StateEvent tx) m
$sel:eventSource:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> EventSource (StateEvent tx) m
eventSource :: EventSource (StateEvent tx) m
eventSource, [EventSink (StateEvent tx) m]
$sel:eventSinks:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> [EventSink (StateEvent tx) m]
eventSinks :: [EventSink (StateEvent tx) m]
eventSinks} = DraftHydraNode tx m
data HydraNode tx m = HydraNode
{ forall tx (m :: * -> *).
HydraNode tx m -> Tracer m (HydraNodeLog tx)
tracer :: Tracer m (HydraNodeLog tx)
, forall tx (m :: * -> *). HydraNode tx m -> Environment
env :: Environment
, forall tx (m :: * -> *). HydraNode tx m -> Ledger tx
ledger :: Ledger tx
, forall tx (m :: * -> *). HydraNode tx m -> NodeState tx m
nodeState :: NodeState tx m
, forall tx (m :: * -> *). HydraNode tx m -> InputQueue m (Input tx)
inputQueue :: InputQueue m (Input tx)
, forall tx (m :: * -> *).
HydraNode tx m -> EventSource (StateEvent tx) m
eventSource :: EventSource (StateEvent tx) m
, forall tx (m :: * -> *).
HydraNode tx m -> [EventSink (StateEvent tx) m]
eventSinks :: [EventSink (StateEvent tx) m]
, forall tx (m :: * -> *). HydraNode tx m -> Chain tx m
oc :: Chain tx m
, forall tx (m :: * -> *). HydraNode tx m -> Network m (Message tx)
hn :: Network m (Message tx)
, forall tx (m :: * -> *). HydraNode tx m -> Server tx m
server :: Server tx m
runHydraNode ::
( MonadCatch m
, MonadAsync m
, IsChainState tx
) =>
HydraNode tx m ->
m ()
runHydraNode :: forall (m :: * -> *) tx.
(MonadCatch m, MonadAsync m, IsChainState tx) =>
HydraNode tx m -> m ()
runHydraNode HydraNode tx m
node =
m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ HydraNode tx m -> m ()
forall (m :: * -> *) tx.
(MonadCatch m, MonadAsync m, IsChainState tx) =>
HydraNode tx m -> m ()
stepHydraNode HydraNode tx m
-- | Process a single input taken from the node's input queue.
--
-- The input is dequeued, run through 'processNextInput' and the resulting
-- 'Outcome' is acted upon:
--
--   * 'Continue': state changes are processed, then effects are executed.
--   * 'Wait': state changes are processed and the input may be re-enqueued.
--   * 'Error': nothing further is done.
--
-- 'BeginInput', 'LogicOutcome' and 'EndInput' are traced around the processing.
stepHydraNode ::
  ( MonadCatch m
  , MonadAsync m
  , IsChainState tx
  ) =>
  HydraNode tx m ->
  m ()
stepHydraNode node = do
  i@Queued{queuedId, queuedItem} <- dequeue
  traceWith tracer $ BeginInput{by = party, inputId = queuedId, input = queuedItem}
  outcome <- atomically $ processNextInput node queuedItem
  traceWith tracer (LogicOutcome party outcome)
  case outcome of
    Continue{stateChanges, effects} -> do
      processStateChanges node stateChanges
      processEffects node tracer queuedId effects
    Wait{stateChanges} -> do
      processStateChanges node stateChanges
      maybeReenqueue i
    Error{} -> pure ()
  traceWith tracer EndInput{by = party, inputId = queuedId}
 where
  -- Only network inputs with a remaining time-to-live are put back on the
  -- queue (with the TTL decremented); anything else is traced as dropped.
  maybeReenqueue q@Queued{queuedId, queuedItem} =
    case queuedItem of
      NetworkInput ttl msg
        | ttl > 0 -> reenqueue waitDelay q{queuedItem = NetworkInput (ttl - 1) msg}
      _ -> traceWith tracer $ DroppedFromQueue{inputId = queuedId, input = queuedItem}

  Environment{party} = env

  HydraNode{tracer, inputQueue = InputQueue{dequeue, reenqueue}, env} = node
-- | Delay handed to 'reenqueue' when an input is put back on the input queue.
--
-- NOTE(review): the numeric literal of this definition was lost when the
-- source was extracted; @0.1@ is an assumed value - confirm against the
-- upstream module before relying on it.
waitDelay :: DiffTime
waitDelay = 0.1
-- | Run the head logic on one input and update the head state.
--
-- The outcome is computed by 'HeadLogic.update' from the current 'HeadState'
-- and the new state is obtained via 'aggregateState'. Both steps happen inside
-- a single 'modifyHeadState' call, i.e. within one 'STM' transaction. The
-- computed 'Outcome' is returned to the caller.
processNextInput ::
  IsChainState tx =>
  HydraNode tx m ->
  Input tx ->
  STM m (Outcome tx)
processNextInput HydraNode{nodeState, ledger, env} e =
  modifyHeadState $ \s ->
    let outcome = computeOutcome s e
     in (outcome, aggregateState s outcome)
 where
  NodeState{modifyHeadState} = nodeState

  computeOutcome = HeadLogic.update env ledger
-- | Turn state changes into 'StateEvent's and hand them to all event sinks.
--
-- Every 'StateChanged' is paired with a fresh id from 'getNextEventId'; ids for
-- the whole list are drawn in a single 'atomically' block. The resulting events
-- are then passed to 'putEventsToSinks'.
processStateChanges :: MonadSTM m => HydraNode tx m -> [StateChanged tx] -> m ()
processStateChanges node stateChanges = do
  events <- atomically . forM stateChanges $ \stateChanged -> do
    eventId <- getNextEventId
    pure StateEvent{eventId, stateChanged}
  putEventsToSinks eventSinks events
 where
  HydraNode
    { eventSinks
    , nodeState = NodeState{getNextEventId}
    } = node
-- | Execute the effects produced for one input, in list order.
--
-- Each effect is numbered from 0 and wrapped in 'BeginEffect' / 'EndEffect'
-- traces:
--
--   * 'ClientEffect' is sent through the 'Server' via 'sendOutput'.
--   * 'NetworkEffect' is 'broadcast' on the network.
--   * 'OnChainEffect' is posted with 'postTx'; a thrown 'PostTxError' is caught
--     and fed back into the input queue as a 'ChainInput'.
processEffects ::
  ( MonadAsync m
  , MonadCatch m
  , IsChainState tx
  ) =>
  HydraNode tx m ->
  Tracer m (HydraNodeLog tx) ->
  Word64 ->
  [Effect tx] ->
  m ()
processEffects node tracer inputId effects = do
  mapM_ processEffect $ zip effects [0 ..]
 where
  processEffect (effect, effectId) = do
    traceWith tracer $ BeginEffect party inputId effectId effect
    case effect of
      ClientEffect i -> sendOutput server i
      NetworkEffect msg -> broadcast hn msg
      OnChainEffect{postChainTx} ->
        postTx postChainTx
          `catch` \(postTxError :: PostTxError tx) ->
            enqueue . ChainInput $ PostTxError{postChainTx, postTxError}
    traceWith tracer $ EndEffect party inputId effectId

  HydraNode
    { hn
    , oc = Chain{postTx}
    , server
    , inputQueue = InputQueue{enqueue}
    , env = Environment{party}
    } = node
-- | Handle to the node's mutable state, exposed as 'STM' actions.
data NodeState tx m = NodeState
  { modifyHeadState :: forall a. (HeadState tx -> (a, HeadState tx)) -> STM m a
  -- ^ Apply a function to the current 'HeadState', storing the new state and
  -- returning the function's result.
  , queryHeadState :: STM m (HeadState tx)
  -- ^ Read the current 'HeadState'.
  , getNextEventId :: STM m EventId
  -- ^ Obtain an 'EventId' for the next event.
  }
-- | Create a 'NodeState' backed by two 'TVar's.
--
-- The event id counter starts at 0 when no event id was seen before, otherwise
-- at the last seen id plus one. 'getNextEventId' returns the current counter
-- value and increments the counter.
createNodeState ::
  MonadLabelledSTM m =>
  Maybe EventId ->
  HeadState tx ->
  m (NodeState tx m)
createNodeState lastSeenEventId initialState = do
  nextEventIdV <- newTVarIO $ maybe 0 (+ 1) lastSeenEventId
  -- NOTE(review): the label string was lost during extraction; confirm the
  -- exact text against the upstream module.
  labelTVarIO nextEventIdV "next-event-id"
  hs <- newTVarIO initialState
  -- NOTE(review): label string reconstructed, see note above.
  labelTVarIO hs "head-state"
  pure
    NodeState
      { modifyHeadState = stateTVar hs
      , queryHeadState = readTVar hs
      , getNextEventId = do
          eventId <- readTVar nextEventIdV
          writeTVar nextEventIdV $ eventId + 1
          pure eventId
      }
-- | Log messages emitted by the node while loading state and while processing
-- inputs and their effects.
data HydraNodeLog tx
  = BeginInput {by :: Party, inputId :: Word64, input :: Input tx}
  | EndInput {by :: Party, inputId :: Word64}
  | BeginEffect {by :: Party, inputId :: Word64, effectId :: Word32, effect :: Effect tx}
  | EndEffect {by :: Party, inputId :: Word64, effectId :: Word32}
  | LogicOutcome {by :: Party, outcome :: Outcome tx}
  | DroppedFromQueue {inputId :: Word64, input :: Input tx}
  | LoadingState
  | LoadedState {lastEventId :: Last EventId, headState :: HeadState tx}
  | ReplayingState
  | Misconfiguration {misconfigurationErrors :: [ParamMismatch]}
  deriving stock (Generic)
-- Standalone-derived instances for 'HydraNodeLog'; each one carries an
-- 'IsChainState' constraint on @tx@.
deriving stock instance IsChainState tx => Eq (HydraNodeLog tx)
deriving stock instance IsChainState tx => Show (HydraNodeLog tx)
deriving anyclass instance IsChainState tx => ToJSON (HydraNodeLog tx)
-- | Generator and shrinker for 'HydraNodeLog', both derived through 'Generic'.
instance (ArbitraryIsTx tx, IsChainState tx) => Arbitrary (HydraNodeLog tx) where
  arbitrary = genericArbitrary
  shrink = genericShrink