{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE UndecidableInstances #-}

-- | Top-level module to run a single Hydra node.
--
-- Checkout [Hydra
-- Documentation](https://hydra.family/head-protocol/dev/architecture)
-- for some details about the overall architecture of the `Node`.
module Hydra.Node where

import Hydra.Prelude

import Control.Concurrent.Class.MonadSTM (
  MonadLabelledSTM,
  labelTVarIO,
  newTVarIO,
  stateTVar,
  writeTVar,
 )
import Control.Monad.Trans.Writer (execWriter, tell)
import Hydra.API.ClientInput (ClientInput)
import Hydra.API.Server (Server, sendOutput)
import Hydra.Cardano.Api (AsType (AsPaymentKey, AsSigningKey, AsVerificationKey), getVerificationKey)
import Hydra.Chain (
  Chain (..),
  ChainEvent (..),
  ChainStateHistory,
  PostTxError,
 )
import Hydra.Chain.ChainState (ChainStateType, IsChainState)
import Hydra.Chain.Direct.Util (readFileTextEnvelopeThrow)
import Hydra.Events (EventId, EventSink (..), EventSource (..), StateEvent (..), getEventId, putEventsToSinks, stateChanged)
import Hydra.HeadLogic (
  Effect (..),
  HeadState (..),
  IdleState (..),
  Input (..),
  Outcome (..),
  aggregateState,
  defaultTTL,
  recoverChainStateHistory,
  recoverState,
 )
import Hydra.HeadLogic qualified as HeadLogic
import Hydra.HeadLogic.Outcome (StateChanged (..))
import Hydra.HeadLogic.State (getHeadParameters)
import Hydra.Ledger (Ledger)
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (Network (..), NetworkCallback (..))
import Hydra.Network.Message (Message, NetworkEvent (..))
import Hydra.Node.InputQueue (InputQueue (..), Queued (..), createInputQueue)
import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..))
import Hydra.Options (ChainConfig (..), DirectChainConfig (..), RunOptions (..), defaultContestationPeriod)
import Hydra.Tx (HasParty (..), HeadParameters (..), Party (..), deriveParty)
import Hydra.Tx.Crypto (AsType (AsHydraKey))
import Hydra.Tx.Environment (Environment (..))
import Hydra.Tx.IsTx (ArbitraryIsTx)
import Hydra.Tx.Utils (verificationKeyToOnChainId)

-- * Environment Handling

-- | Intialize the 'Environment' from command line options.
initEnvironment :: RunOptions -> IO Environment
initEnvironment :: RunOptions -> IO Environment
initEnvironment RunOptions
options = do
  SigningKey HydraKey
sk <- AsType (SigningKey HydraKey)
-> FilePath -> IO (SigningKey HydraKey)
forall a. HasTextEnvelope a => AsType a -> FilePath -> IO a
readFileTextEnvelopeThrow (AsType HydraKey -> AsType (SigningKey HydraKey)
forall a. AsType a -> AsType (SigningKey a)
AsSigningKey AsType HydraKey
AsHydraKey) FilePath
hydraSigningKey
  [Party]
otherParties <- (FilePath -> IO Party) -> [FilePath] -> IO [Party]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM FilePath -> IO Party
loadParty [FilePath]
hydraVerificationKeys
  [OnChainId]
participants <- IO [OnChainId]
getParticipants
  Environment -> IO Environment
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Environment -> IO Environment) -> Environment -> IO Environment
forall a b. (a -> b) -> a -> b
$
    Environment
      { $sel:party:Environment :: Party
party = SigningKey HydraKey -> Party
deriveParty SigningKey HydraKey
sk
      , $sel:signingKey:Environment :: SigningKey HydraKey
signingKey = SigningKey HydraKey
sk
      , [Party]
otherParties :: [Party]
$sel:otherParties:Environment :: [Party]
otherParties
      , [OnChainId]
participants :: [OnChainId]
$sel:participants:Environment :: [OnChainId]
participants
      , ContestationPeriod
contestationPeriod :: ContestationPeriod
$sel:contestationPeriod:Environment :: ContestationPeriod
contestationPeriod
      }
 where
  -- XXX: This is mostly a cardano-specific initialization step of loading
  -- --cardano-verification-key options and deriving 'OnChainId's from it. We should be able to call out to the various chain layer
  getParticipants :: IO [OnChainId]
getParticipants =
    case ChainConfig
chainConfig of
      Offline{} -> [OnChainId] -> IO [OnChainId]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
      Direct
        DirectChainConfig
          { [FilePath]
cardanoVerificationKeys :: [FilePath]
$sel:cardanoVerificationKeys:DirectChainConfig :: DirectChainConfig -> [FilePath]
cardanoVerificationKeys
          , FilePath
cardanoSigningKey :: FilePath
$sel:cardanoSigningKey:DirectChainConfig :: DirectChainConfig -> FilePath
cardanoSigningKey
          } -> do
          SigningKey PaymentKey
ownSigningKey <- AsType (SigningKey PaymentKey)
-> FilePath -> IO (SigningKey PaymentKey)
forall a. HasTextEnvelope a => AsType a -> FilePath -> IO a
readFileTextEnvelopeThrow (AsType PaymentKey -> AsType (SigningKey PaymentKey)
forall a. AsType a -> AsType (SigningKey a)
AsSigningKey AsType PaymentKey
AsPaymentKey) FilePath
cardanoSigningKey
          [VerificationKey PaymentKey]
otherVerificationKeys <- (FilePath -> IO (VerificationKey PaymentKey))
-> [FilePath] -> IO [VerificationKey PaymentKey]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (AsType (VerificationKey PaymentKey)
-> FilePath -> IO (VerificationKey PaymentKey)
forall a. HasTextEnvelope a => AsType a -> FilePath -> IO a
readFileTextEnvelopeThrow (AsType PaymentKey -> AsType (VerificationKey PaymentKey)
forall a. AsType a -> AsType (VerificationKey a)
AsVerificationKey AsType PaymentKey
AsPaymentKey)) [FilePath]
cardanoVerificationKeys
          [OnChainId] -> IO [OnChainId]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([OnChainId] -> IO [OnChainId]) -> [OnChainId] -> IO [OnChainId]
forall a b. (a -> b) -> a -> b
$ VerificationKey PaymentKey -> OnChainId
verificationKeyToOnChainId (VerificationKey PaymentKey -> OnChainId)
-> [VerificationKey PaymentKey] -> [OnChainId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (SigningKey PaymentKey -> VerificationKey PaymentKey
forall keyrole.
(Key keyrole, HasTypeProxy keyrole) =>
SigningKey keyrole -> VerificationKey keyrole
getVerificationKey SigningKey PaymentKey
ownSigningKey VerificationKey PaymentKey
-> [VerificationKey PaymentKey] -> [VerificationKey PaymentKey]
forall a. a -> [a] -> [a]
: [VerificationKey PaymentKey]
otherVerificationKeys)

  contestationPeriod :: ContestationPeriod
contestationPeriod = case ChainConfig
chainConfig of
    Offline{} -> ContestationPeriod
defaultContestationPeriod
    Direct DirectChainConfig{$sel:contestationPeriod:DirectChainConfig :: DirectChainConfig -> ContestationPeriod
contestationPeriod = ContestationPeriod
cp} -> ContestationPeriod
cp

  loadParty :: FilePath -> IO Party
loadParty FilePath
p =
    VerificationKey HydraKey -> Party
Party (VerificationKey HydraKey -> Party)
-> IO (VerificationKey HydraKey) -> IO Party
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> AsType (VerificationKey HydraKey)
-> FilePath -> IO (VerificationKey HydraKey)
forall a. HasTextEnvelope a => AsType a -> FilePath -> IO a
readFileTextEnvelopeThrow (AsType HydraKey -> AsType (VerificationKey HydraKey)
forall a. AsType a -> AsType (VerificationKey a)
AsVerificationKey AsType HydraKey
AsHydraKey) FilePath
p

  RunOptions
    { FilePath
hydraSigningKey :: FilePath
$sel:hydraSigningKey:RunOptions :: RunOptions -> FilePath
hydraSigningKey
    , [FilePath]
hydraVerificationKeys :: [FilePath]
$sel:hydraVerificationKeys:RunOptions :: RunOptions -> [FilePath]
hydraVerificationKeys
    , ChainConfig
chainConfig :: ChainConfig
$sel:chainConfig:RunOptions :: RunOptions -> ChainConfig
chainConfig
    } = RunOptions
options

-- | Checks that command line options match a given 'HeadState'. This function
-- takes 'Environment' because it is derived from 'RunOptions' via
-- 'initEnvironment'.
--
-- Throws: 'ParameterMismatch' when state not matching the environment.
checkHeadState ::
  MonadThrow m =>
  Tracer m (HydraNodeLog tx) ->
  Environment ->
  HeadState tx ->
  m ()
checkHeadState :: forall (m :: * -> *) tx.
MonadThrow m =>
Tracer m (HydraNodeLog tx) -> Environment -> HeadState tx -> m ()
checkHeadState Tracer m (HydraNodeLog tx)
tracer Environment
env HeadState tx
headState = do
  Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([ParamMismatch] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ParamMismatch]
paramsMismatch) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer ([ParamMismatch] -> HydraNodeLog tx
forall tx. [ParamMismatch] -> HydraNodeLog tx
Misconfiguration [ParamMismatch]
paramsMismatch)
    ParameterMismatch -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (ParameterMismatch -> m ()) -> ParameterMismatch -> m ()
forall a b. (a -> b) -> a -> b
$ [ParamMismatch] -> ParameterMismatch
ParameterMismatch [ParamMismatch]
paramsMismatch
 where
  paramsMismatch :: [ParamMismatch]
paramsMismatch =
    [ParamMismatch]
-> (HeadParameters -> [ParamMismatch])
-> Maybe HeadParameters
-> [ParamMismatch]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [] HeadParameters -> [ParamMismatch]
validateParameters (Maybe HeadParameters -> [ParamMismatch])
-> Maybe HeadParameters -> [ParamMismatch]
forall a b. (a -> b) -> a -> b
$ HeadState tx -> Maybe HeadParameters
forall tx. HeadState tx -> Maybe HeadParameters
getHeadParameters HeadState tx
headState

  validateParameters :: HeadParameters -> [ParamMismatch]
validateParameters HeadParameters{$sel:contestationPeriod:HeadParameters :: HeadParameters -> ContestationPeriod
contestationPeriod = ContestationPeriod
loadedCp, [Party]
parties :: [Party]
$sel:parties:HeadParameters :: HeadParameters -> [Party]
parties} =
    Writer [ParamMismatch] () -> [ParamMismatch]
forall w a. Writer w a -> w
execWriter (Writer [ParamMismatch] () -> [ParamMismatch])
-> Writer [ParamMismatch] () -> [ParamMismatch]
forall a b. (a -> b) -> a -> b
$ do
      Bool -> Writer [ParamMismatch] () -> Writer [ParamMismatch] ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ContestationPeriod
loadedCp ContestationPeriod -> ContestationPeriod -> Bool
forall a. Eq a => a -> a -> Bool
/= ContestationPeriod
configuredCp) (Writer [ParamMismatch] () -> Writer [ParamMismatch] ())
-> Writer [ParamMismatch] () -> Writer [ParamMismatch] ()
forall a b. (a -> b) -> a -> b
$
        [ParamMismatch] -> Writer [ParamMismatch] ()
forall (m :: * -> *) w. Monad m => w -> WriterT w m ()
tell [ContestationPeriodMismatch{ContestationPeriod
loadedCp :: ContestationPeriod
$sel:loadedCp:ContestationPeriodMismatch :: ContestationPeriod
loadedCp, ContestationPeriod
configuredCp :: ContestationPeriod
$sel:configuredCp:ContestationPeriodMismatch :: ContestationPeriod
configuredCp}]

      let loadedParties :: [Party]
loadedParties = [Party] -> [Party]
forall a. Ord a => [a] -> [a]
sort [Party]
parties
          configuredParties :: [Party]
configuredParties = [Party] -> [Party]
forall a. Ord a => [a] -> [a]
sort (Party
party Party -> [Party] -> [Party]
forall a. a -> [a] -> [a]
: [Party]
otherParties)
      Bool -> Writer [ParamMismatch] () -> Writer [ParamMismatch] ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ([Party]
loadedParties [Party] -> [Party] -> Bool
forall a. Eq a => a -> a -> Bool
/= [Party]
configuredParties) (Writer [ParamMismatch] () -> Writer [ParamMismatch] ())
-> Writer [ParamMismatch] () -> Writer [ParamMismatch] ()
forall a b. (a -> b) -> a -> b
$
        [ParamMismatch] -> Writer [ParamMismatch] ()
forall (m :: * -> *) w. Monad m => w -> WriterT w m ()
tell [PartiesMismatch{[Party]
loadedParties :: [Party]
$sel:loadedParties:ContestationPeriodMismatch :: [Party]
loadedParties, [Party]
configuredParties :: [Party]
$sel:configuredParties:ContestationPeriodMismatch :: [Party]
configuredParties}]

  Environment{$sel:contestationPeriod:Environment :: Environment -> ContestationPeriod
contestationPeriod = ContestationPeriod
configuredCp, [Party]
$sel:otherParties:Environment :: Environment -> [Party]
otherParties :: [Party]
otherParties, Party
$sel:party:Environment :: Environment -> Party
party :: Party
party} = Environment
env

-- * Create and run a hydra node

-- | A draft version of the 'HydraNode' that holds state, but is not yet
-- connected (see 'connect'). This is commonly created by the 'hydrate' smart
-- constructor.
data DraftHydraNode tx m = DraftHydraNode
  { forall tx (m :: * -> *).
DraftHydraNode tx m -> Tracer m (HydraNodeLog tx)
tracer :: Tracer m (HydraNodeLog tx)
  , forall tx (m :: * -> *). DraftHydraNode tx m -> Environment
env :: Environment
  , forall tx (m :: * -> *). DraftHydraNode tx m -> Ledger tx
ledger :: Ledger tx
  , forall tx (m :: * -> *). DraftHydraNode tx m -> NodeState tx m
nodeState :: NodeState tx m
  , forall tx (m :: * -> *).
DraftHydraNode tx m -> InputQueue m (Input tx)
inputQueue :: InputQueue m (Input tx)
  , forall tx (m :: * -> *).
DraftHydraNode tx m -> EventSource (StateEvent tx) m
eventSource :: EventSource (StateEvent tx) m
  , forall tx (m :: * -> *).
DraftHydraNode tx m -> [EventSink (StateEvent tx) m]
eventSinks :: [EventSink (StateEvent tx) m]
  , -- XXX: This is an odd field in here, but needed for the chain layer to
    -- bootstrap. Maybe move to NodeState or make it differently accessible?
    forall tx (m :: * -> *).
DraftHydraNode tx m -> ChainStateHistory tx
chainStateHistory :: ChainStateHistory tx
  }

instance HasParty (DraftHydraNode tx m) where
  getParty :: DraftHydraNode tx m -> Party
getParty DraftHydraNode{Environment
$sel:env:DraftHydraNode :: forall tx (m :: * -> *). DraftHydraNode tx m -> Environment
env :: Environment
env} = Environment -> Party
forall a. HasParty a => a -> Party
getParty Environment
env

-- | Hydrate a 'DraftHydraNode' by loading events from source, re-aggregate node
-- state and sending events to sinks while doing so.
hydrate ::
  (MonadDelay m, MonadLabelledSTM m, MonadAsync m, MonadThrow m, IsChainState tx) =>
  Tracer m (HydraNodeLog tx) ->
  Environment ->
  Ledger tx ->
  ChainStateType tx ->
  EventSource (StateEvent tx) m ->
  [EventSink (StateEvent tx) m] ->
  m (DraftHydraNode tx m)
hydrate :: forall (m :: * -> *) tx.
(MonadDelay m, MonadLabelledSTM m, MonadAsync m, MonadThrow m,
 IsChainState tx) =>
Tracer m (HydraNodeLog tx)
-> Environment
-> Ledger tx
-> ChainStateType tx
-> EventSource (StateEvent tx) m
-> [EventSink (StateEvent tx) m]
-> m (DraftHydraNode tx m)
hydrate Tracer m (HydraNodeLog tx)
tracer Environment
env Ledger tx
ledger ChainStateType tx
initialChainState EventSource (StateEvent tx) m
eventSource [EventSink (StateEvent tx) m]
eventSinks = do
  [StateEvent tx]
events <- EventSource (StateEvent tx) m
-> HasEventId (StateEvent tx) => m [StateEvent tx]
forall e (m :: * -> *). EventSource e m -> HasEventId e => m [e]
getEvents EventSource (StateEvent tx) m
eventSource
  let lastSeenEventId :: Maybe Word64
lastSeenEventId = StateEvent tx -> Word64
forall a. HasEventId a => a -> Word64
getEventId (StateEvent tx -> Word64)
-> (NonEmpty (StateEvent tx) -> StateEvent tx)
-> NonEmpty (StateEvent tx)
-> Word64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NonEmpty (StateEvent tx) -> StateEvent tx
forall (f :: * -> *) a. IsNonEmpty f a a "last" => f a -> a
last (NonEmpty (StateEvent tx) -> Word64)
-> Maybe (NonEmpty (StateEvent tx)) -> Maybe Word64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [StateEvent tx] -> Maybe (NonEmpty (StateEvent tx))
forall a. [a] -> Maybe (NonEmpty a)
nonEmpty [StateEvent tx]
events
  Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer LoadedState{$sel:numberOfEvents:BeginInput :: Word64
numberOfEvents = Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Word64) -> Int -> Word64
forall a b. (a -> b) -> a -> b
$ [StateEvent tx] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [StateEvent tx]
events}
  let headState :: HeadState tx
headState = HeadState tx -> [StateChanged tx] -> HeadState tx
forall (t :: * -> *) tx.
(Foldable t, IsChainState tx) =>
HeadState tx -> t (StateChanged tx) -> HeadState tx
recoverState HeadState tx
initialState (StateEvent tx -> StateChanged tx
forall tx. StateEvent tx -> StateChanged tx
stateChanged (StateEvent tx -> StateChanged tx)
-> [StateEvent tx] -> [StateChanged tx]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [StateEvent tx]
events)
      chainStateHistory :: ChainStateHistory tx
chainStateHistory = ChainStateType tx -> [StateChanged tx] -> ChainStateHistory tx
forall (t :: * -> *) tx.
(Foldable t, IsChainState tx) =>
ChainStateType tx -> t (StateChanged tx) -> ChainStateHistory tx
recoverChainStateHistory ChainStateType tx
initialChainState (StateEvent tx -> StateChanged tx
forall tx. StateEvent tx -> StateChanged tx
stateChanged (StateEvent tx -> StateChanged tx)
-> [StateEvent tx] -> [StateChanged tx]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [StateEvent tx]
events)
  -- Check whether the loaded state matches our configuration (env)
  Tracer m (HydraNodeLog tx) -> Environment -> HeadState tx -> m ()
forall (m :: * -> *) tx.
MonadThrow m =>
Tracer m (HydraNodeLog tx) -> Environment -> HeadState tx -> m ()
checkHeadState Tracer m (HydraNodeLog tx)
tracer Environment
env HeadState tx
headState
  -- (Re-)submit events to sinks; de-duplication is handled by the sinks
  [EventSink (StateEvent tx) m] -> [StateEvent tx] -> m ()
forall (m :: * -> *) e.
(Monad m, HasEventId e) =>
[EventSink e m] -> [e] -> m ()
putEventsToSinks [EventSink (StateEvent tx) m]
eventSinks [StateEvent tx]
events
  NodeState tx m
nodeState <- Maybe Word64 -> HeadState tx -> m (NodeState tx m)
forall (m :: * -> *) tx.
MonadLabelledSTM m =>
Maybe Word64 -> HeadState tx -> m (NodeState tx m)
createNodeState Maybe Word64
lastSeenEventId HeadState tx
headState
  InputQueue m (Input tx)
inputQueue <- m (InputQueue m (Input tx))
forall (m :: * -> *) e.
(MonadDelay m, MonadAsync m, MonadLabelledSTM m) =>
m (InputQueue m e)
createInputQueue
  DraftHydraNode tx m -> m (DraftHydraNode tx m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    DraftHydraNode
      { Tracer m (HydraNodeLog tx)
$sel:tracer:DraftHydraNode :: Tracer m (HydraNodeLog tx)
tracer :: Tracer m (HydraNodeLog tx)
tracer
      , Environment
$sel:env:DraftHydraNode :: Environment
env :: Environment
env
      , Ledger tx
$sel:ledger:DraftHydraNode :: Ledger tx
ledger :: Ledger tx
ledger
      , NodeState tx m
$sel:nodeState:DraftHydraNode :: NodeState tx m
nodeState :: NodeState tx m
nodeState
      , InputQueue m (Input tx)
$sel:inputQueue:DraftHydraNode :: InputQueue m (Input tx)
inputQueue :: InputQueue m (Input tx)
inputQueue
      , EventSource (StateEvent tx) m
$sel:eventSource:DraftHydraNode :: EventSource (StateEvent tx) m
eventSource :: EventSource (StateEvent tx) m
eventSource
      , [EventSink (StateEvent tx) m]
$sel:eventSinks:DraftHydraNode :: [EventSink (StateEvent tx) m]
eventSinks :: [EventSink (StateEvent tx) m]
eventSinks
      , ChainStateHistory tx
$sel:chainStateHistory:DraftHydraNode :: ChainStateHistory tx
chainStateHistory :: ChainStateHistory tx
chainStateHistory
      }
 where
  initialState :: HeadState tx
initialState = IdleState tx -> HeadState tx
forall tx. IdleState tx -> HeadState tx
Idle IdleState{$sel:chainState:IdleState :: ChainStateType tx
chainState = ChainStateType tx
initialChainState}

wireChainInput :: DraftHydraNode tx m -> (ChainEvent tx -> m ())
wireChainInput :: forall tx (m :: * -> *).
DraftHydraNode tx m -> ChainEvent tx -> m ()
wireChainInput DraftHydraNode tx m
node = Input tx -> m ()
enqueue (Input tx -> m ())
-> (ChainEvent tx -> Input tx) -> ChainEvent tx -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ChainEvent tx -> Input tx
forall tx. ChainEvent tx -> Input tx
ChainInput
 where
  DraftHydraNode{$sel:inputQueue:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> InputQueue m (Input tx)
inputQueue = InputQueue{Input tx -> m ()
enqueue :: Input tx -> m ()
$sel:enqueue:InputQueue :: forall (m :: * -> *) e. InputQueue m e -> e -> m ()
enqueue}} = DraftHydraNode tx m
node

wireClientInput :: DraftHydraNode tx m -> (ClientInput tx -> m ())
wireClientInput :: forall tx (m :: * -> *).
DraftHydraNode tx m -> ClientInput tx -> m ()
wireClientInput DraftHydraNode tx m
node = Input tx -> m ()
enqueue (Input tx -> m ())
-> (ClientInput tx -> Input tx) -> ClientInput tx -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ClientInput tx -> Input tx
forall tx. ClientInput tx -> Input tx
ClientInput
 where
  DraftHydraNode{$sel:inputQueue:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> InputQueue m (Input tx)
inputQueue = InputQueue{Input tx -> m ()
$sel:enqueue:InputQueue :: forall (m :: * -> *) e. InputQueue m e -> e -> m ()
enqueue :: Input tx -> m ()
enqueue}} = DraftHydraNode tx m
node

wireNetworkInput :: DraftHydraNode tx m -> NetworkCallback (NetworkEvent (Message tx)) m
wireNetworkInput :: forall tx (m :: * -> *).
DraftHydraNode tx m
-> NetworkCallback (NetworkEvent (Message tx)) m
wireNetworkInput DraftHydraNode tx m
node =
  NetworkCallback{$sel:deliver:NetworkCallback :: NetworkEvent (Message tx) -> m ()
deliver = Input tx -> m ()
enqueue (Input tx -> m ())
-> (NetworkEvent (Message tx) -> Input tx)
-> NetworkEvent (Message tx)
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TTL -> NetworkEvent (Message tx) -> Input tx
forall tx. TTL -> NetworkEvent (Message tx) -> Input tx
NetworkInput TTL
defaultTTL}
 where
  DraftHydraNode{$sel:inputQueue:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> InputQueue m (Input tx)
inputQueue = InputQueue{Input tx -> m ()
$sel:enqueue:InputQueue :: forall (m :: * -> *) e. InputQueue m e -> e -> m ()
enqueue :: Input tx -> m ()
enqueue}} = DraftHydraNode tx m
node

-- | Connect chain, network and API to a hydrated 'DraftHydraNode' to get a fully
-- connected 'HydraNode'.
connect ::
  Monad m =>
  Chain tx m ->
  Network m (Message tx) ->
  Server tx m ->
  DraftHydraNode tx m ->
  m (HydraNode tx m)
connect :: forall (m :: * -> *) tx.
Monad m =>
Chain tx m
-> Network m (Message tx)
-> Server tx m
-> DraftHydraNode tx m
-> m (HydraNode tx m)
connect Chain tx m
chain Network m (Message tx)
network Server tx m
server DraftHydraNode tx m
node =
  HydraNode tx m -> m (HydraNode tx m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure HydraNode{Tracer m (HydraNodeLog tx)
tracer :: Tracer m (HydraNodeLog tx)
$sel:tracer:HydraNode :: Tracer m (HydraNodeLog tx)
tracer, Environment
env :: Environment
$sel:env:HydraNode :: Environment
env, Ledger tx
ledger :: Ledger tx
$sel:ledger:HydraNode :: Ledger tx
ledger, NodeState tx m
nodeState :: NodeState tx m
$sel:nodeState:HydraNode :: NodeState tx m
nodeState, InputQueue m (Input tx)
inputQueue :: InputQueue m (Input tx)
$sel:inputQueue:HydraNode :: InputQueue m (Input tx)
inputQueue, EventSource (StateEvent tx) m
eventSource :: EventSource (StateEvent tx) m
$sel:eventSource:HydraNode :: EventSource (StateEvent tx) m
eventSource, [EventSink (StateEvent tx) m]
eventSinks :: [EventSink (StateEvent tx) m]
$sel:eventSinks:HydraNode :: [EventSink (StateEvent tx) m]
eventSinks, $sel:oc:HydraNode :: Chain tx m
oc = Chain tx m
chain, $sel:hn:HydraNode :: Network m (Message tx)
hn = Network m (Message tx)
network, Server tx m
server :: Server tx m
$sel:server:HydraNode :: Server tx m
server}
 where
  DraftHydraNode{Tracer m (HydraNodeLog tx)
$sel:tracer:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> Tracer m (HydraNodeLog tx)
tracer :: Tracer m (HydraNodeLog tx)
tracer, Environment
$sel:env:DraftHydraNode :: forall tx (m :: * -> *). DraftHydraNode tx m -> Environment
env :: Environment
env, Ledger tx
$sel:ledger:DraftHydraNode :: forall tx (m :: * -> *). DraftHydraNode tx m -> Ledger tx
ledger :: Ledger tx
ledger, NodeState tx m
$sel:nodeState:DraftHydraNode :: forall tx (m :: * -> *). DraftHydraNode tx m -> NodeState tx m
nodeState :: NodeState tx m
nodeState, InputQueue m (Input tx)
$sel:inputQueue:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> InputQueue m (Input tx)
inputQueue :: InputQueue m (Input tx)
inputQueue, EventSource (StateEvent tx) m
$sel:eventSource:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> EventSource (StateEvent tx) m
eventSource :: EventSource (StateEvent tx) m
eventSource, [EventSink (StateEvent tx) m]
$sel:eventSinks:DraftHydraNode :: forall tx (m :: * -> *).
DraftHydraNode tx m -> [EventSink (StateEvent tx) m]
eventSinks :: [EventSink (StateEvent tx) m]
eventSinks} = DraftHydraNode tx m
node

-- | Fully connected hydra node with everything wired in.
data HydraNode tx m = HydraNode
  { forall tx (m :: * -> *).
HydraNode tx m -> Tracer m (HydraNodeLog tx)
tracer :: Tracer m (HydraNodeLog tx)
  , forall tx (m :: * -> *). HydraNode tx m -> Environment
env :: Environment
  , forall tx (m :: * -> *). HydraNode tx m -> Ledger tx
ledger :: Ledger tx
  , forall tx (m :: * -> *). HydraNode tx m -> NodeState tx m
nodeState :: NodeState tx m
  , forall tx (m :: * -> *). HydraNode tx m -> InputQueue m (Input tx)
inputQueue :: InputQueue m (Input tx)
  , forall tx (m :: * -> *).
HydraNode tx m -> EventSource (StateEvent tx) m
eventSource :: EventSource (StateEvent tx) m
  , forall tx (m :: * -> *).
HydraNode tx m -> [EventSink (StateEvent tx) m]
eventSinks :: [EventSink (StateEvent tx) m]
  , forall tx (m :: * -> *). HydraNode tx m -> Chain tx m
oc :: Chain tx m
  , forall tx (m :: * -> *). HydraNode tx m -> Network m (Message tx)
hn :: Network m (Message tx)
  , forall tx (m :: * -> *). HydraNode tx m -> Server tx m
server :: Server tx m
  }

instance HasParty (HydraNode tx m) where
  getParty :: HydraNode tx m -> Party
getParty HydraNode{Environment
$sel:env:HydraNode :: forall tx (m :: * -> *). HydraNode tx m -> Environment
env :: Environment
env} = Environment -> Party
forall a. HasParty a => a -> Party
getParty Environment
env

runHydraNode ::
  ( MonadCatch m
  , MonadAsync m
  , IsChainState tx
  ) =>
  HydraNode tx m ->
  m ()
runHydraNode :: forall (m :: * -> *) tx.
(MonadCatch m, MonadAsync m, IsChainState tx) =>
HydraNode tx m -> m ()
runHydraNode HydraNode tx m
node =
  -- NOTE(SN): here we could introduce concurrent head processing, e.g. with
  -- something like 'forM_ [0..1] $ async'
  m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ HydraNode tx m -> m ()
forall (m :: * -> *) tx.
(MonadCatch m, MonadAsync m, IsChainState tx) =>
HydraNode tx m -> m ()
stepHydraNode HydraNode tx m
node

stepHydraNode ::
  ( MonadCatch m
  , MonadAsync m
  , IsChainState tx
  ) =>
  HydraNode tx m ->
  m ()
stepHydraNode :: forall (m :: * -> *) tx.
(MonadCatch m, MonadAsync m, IsChainState tx) =>
HydraNode tx m -> m ()
stepHydraNode HydraNode tx m
node = do
  i :: Queued (Input tx)
i@Queued{Word64
queuedId :: Word64
$sel:queuedId:Queued :: forall a. Queued a -> Word64
queuedId, Input tx
queuedItem :: Input tx
$sel:queuedItem:Queued :: forall a. Queued a -> a
queuedItem} <- m (Queued (Input tx))
dequeue
  Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer (HydraNodeLog tx -> m ()) -> HydraNodeLog tx -> m ()
forall a b. (a -> b) -> a -> b
$ BeginInput{$sel:by:BeginInput :: Party
by = Party
party, $sel:inputId:BeginInput :: Word64
inputId = Word64
queuedId, $sel:input:BeginInput :: Input tx
input = Input tx
queuedItem}
  Outcome tx
outcome <- STM m (Outcome tx) -> m (Outcome tx)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Outcome tx) -> m (Outcome tx))
-> STM m (Outcome tx) -> m (Outcome tx)
forall a b. (a -> b) -> a -> b
$ HydraNode tx m -> Input tx -> STM m (Outcome tx)
forall tx (m :: * -> *).
IsChainState tx =>
HydraNode tx m -> Input tx -> STM m (Outcome tx)
processNextInput HydraNode tx m
node Input tx
queuedItem
  Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer (Party -> Outcome tx -> HydraNodeLog tx
forall tx. Party -> Outcome tx -> HydraNodeLog tx
LogicOutcome Party
party Outcome tx
outcome)
  case Outcome tx
outcome of
    Continue{[StateChanged tx]
stateChanges :: [StateChanged tx]
$sel:stateChanges:Continue :: forall tx. Outcome tx -> [StateChanged tx]
stateChanges, [Effect tx]
effects :: [Effect tx]
$sel:effects:Continue :: forall tx. Outcome tx -> [Effect tx]
effects} -> do
      HydraNode tx m -> [StateChanged tx] -> m ()
forall (m :: * -> *) tx.
MonadSTM m =>
HydraNode tx m -> [StateChanged tx] -> m ()
processStateChanges HydraNode tx m
node [StateChanged tx]
stateChanges
      HydraNode tx m
-> Tracer m (HydraNodeLog tx) -> Word64 -> [Effect tx] -> m ()
forall (m :: * -> *) tx.
(MonadAsync m, MonadCatch m, IsChainState tx) =>
HydraNode tx m
-> Tracer m (HydraNodeLog tx) -> Word64 -> [Effect tx] -> m ()
processEffects HydraNode tx m
node Tracer m (HydraNodeLog tx)
tracer Word64
queuedId [Effect tx]
effects
    Wait{[StateChanged tx]
$sel:stateChanges:Continue :: forall tx. Outcome tx -> [StateChanged tx]
stateChanges :: [StateChanged tx]
stateChanges} -> do
      HydraNode tx m -> [StateChanged tx] -> m ()
forall (m :: * -> *) tx.
MonadSTM m =>
HydraNode tx m -> [StateChanged tx] -> m ()
processStateChanges HydraNode tx m
node [StateChanged tx]
stateChanges
      Queued (Input tx) -> m ()
maybeReenqueue Queued (Input tx)
i
    Error{} -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
  Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer EndInput{$sel:by:BeginInput :: Party
by = Party
party, $sel:inputId:BeginInput :: Word64
inputId = Word64
queuedId}
 where
  maybeReenqueue :: Queued (Input tx) -> m ()
maybeReenqueue q :: Queued (Input tx)
q@Queued{Word64
$sel:queuedId:Queued :: forall a. Queued a -> Word64
queuedId :: Word64
queuedId, Input tx
$sel:queuedItem:Queued :: forall a. Queued a -> a
queuedItem :: Input tx
queuedItem} =
    case Input tx
queuedItem of
      NetworkInput TTL
ttl NetworkEvent (Message tx)
msg
        | TTL
ttl TTL -> TTL -> Bool
forall a. Ord a => a -> a -> Bool
> TTL
0 -> DiffTime -> Queued (Input tx) -> m ()
reenqueue DiffTime
waitDelay Queued (Input tx)
q{queuedItem = NetworkInput (ttl - 1) msg}
      Input tx
_ -> Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer (HydraNodeLog tx -> m ()) -> HydraNodeLog tx -> m ()
forall a b. (a -> b) -> a -> b
$ DroppedFromQueue{$sel:inputId:BeginInput :: Word64
inputId = Word64
queuedId, $sel:input:BeginInput :: Input tx
input = Input tx
queuedItem}

  Environment{Party
$sel:party:Environment :: Environment -> Party
party :: Party
party} = Environment
env

  HydraNode{Tracer m (HydraNodeLog tx)
$sel:tracer:HydraNode :: forall tx (m :: * -> *).
HydraNode tx m -> Tracer m (HydraNodeLog tx)
tracer :: Tracer m (HydraNodeLog tx)
tracer, $sel:inputQueue:HydraNode :: forall tx (m :: * -> *). HydraNode tx m -> InputQueue m (Input tx)
inputQueue = InputQueue{m (Queued (Input tx))
dequeue :: m (Queued (Input tx))
$sel:dequeue:InputQueue :: forall (m :: * -> *) e. InputQueue m e -> m (Queued e)
dequeue, DiffTime -> Queued (Input tx) -> m ()
reenqueue :: DiffTime -> Queued (Input tx) -> m ()
$sel:reenqueue:InputQueue :: forall (m :: * -> *) e.
InputQueue m e -> DiffTime -> Queued e -> m ()
reenqueue}, Environment
$sel:env:HydraNode :: forall tx (m :: * -> *). HydraNode tx m -> Environment
env :: Environment
env} = HydraNode tx m
node

-- | The time to wait between re-enqueuing a 'Wait' outcome from 'HeadLogic'.
waitDelay :: DiffTime
waitDelay :: DiffTime
waitDelay = DiffTime
0.1

-- | Monadic interface around 'Hydra.Logic.update'.
processNextInput ::
  IsChainState tx =>
  HydraNode tx m ->
  Input tx ->
  STM m (Outcome tx)
processNextInput :: forall tx (m :: * -> *).
IsChainState tx =>
HydraNode tx m -> Input tx -> STM m (Outcome tx)
processNextInput HydraNode{NodeState tx m
$sel:nodeState:HydraNode :: forall tx (m :: * -> *). HydraNode tx m -> NodeState tx m
nodeState :: NodeState tx m
nodeState, Ledger tx
$sel:ledger:HydraNode :: forall tx (m :: * -> *). HydraNode tx m -> Ledger tx
ledger :: Ledger tx
ledger, Environment
$sel:env:HydraNode :: forall tx (m :: * -> *). HydraNode tx m -> Environment
env :: Environment
env} Input tx
e =
  (HeadState tx -> (Outcome tx, HeadState tx)) -> STM m (Outcome tx)
forall a. (HeadState tx -> (a, HeadState tx)) -> STM m a
modifyHeadState ((HeadState tx -> (Outcome tx, HeadState tx))
 -> STM m (Outcome tx))
-> (HeadState tx -> (Outcome tx, HeadState tx))
-> STM m (Outcome tx)
forall a b. (a -> b) -> a -> b
$ \HeadState tx
s ->
    let outcome :: Outcome tx
outcome = HeadState tx -> Input tx -> Outcome tx
computeOutcome HeadState tx
s Input tx
e
     in (Outcome tx
outcome, HeadState tx -> Outcome tx -> HeadState tx
forall tx.
IsChainState tx =>
HeadState tx -> Outcome tx -> HeadState tx
aggregateState HeadState tx
s Outcome tx
outcome)
 where
  NodeState{forall a. (HeadState tx -> (a, HeadState tx)) -> STM m a
modifyHeadState :: forall a. (HeadState tx -> (a, HeadState tx)) -> STM m a
$sel:modifyHeadState:NodeState :: forall tx (m :: * -> *).
NodeState tx m
-> forall a. (HeadState tx -> (a, HeadState tx)) -> STM m a
modifyHeadState} = NodeState tx m
nodeState

  computeOutcome :: HeadState tx -> Input tx -> Outcome tx
computeOutcome = Environment -> Ledger tx -> HeadState tx -> Input tx -> Outcome tx
forall tx.
IsChainState tx =>
Environment -> Ledger tx -> HeadState tx -> Input tx -> Outcome tx
HeadLogic.update Environment
env Ledger tx
ledger

processStateChanges :: MonadSTM m => HydraNode tx m -> [StateChanged tx] -> m ()
processStateChanges :: forall (m :: * -> *) tx.
MonadSTM m =>
HydraNode tx m -> [StateChanged tx] -> m ()
processStateChanges HydraNode tx m
node [StateChanged tx]
stateChanges = do
  [StateEvent tx]
events <- STM m [StateEvent tx] -> m [StateEvent tx]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m [StateEvent tx] -> m [StateEvent tx])
-> ((StateChanged tx -> STM m (StateEvent tx))
    -> STM m [StateEvent tx])
-> (StateChanged tx -> STM m (StateEvent tx))
-> m [StateEvent tx]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StateChanged tx]
-> (StateChanged tx -> STM m (StateEvent tx))
-> STM m [StateEvent tx]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [StateChanged tx]
stateChanges ((StateChanged tx -> STM m (StateEvent tx)) -> m [StateEvent tx])
-> (StateChanged tx -> STM m (StateEvent tx)) -> m [StateEvent tx]
forall a b. (a -> b) -> a -> b
$ \StateChanged tx
stateChanged -> do
    Word64
eventId <- STM m Word64
getNextEventId
    StateEvent tx -> STM m (StateEvent tx)
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure StateEvent{Word64
eventId :: Word64
$sel:eventId:StateEvent :: Word64
eventId, StateChanged tx
$sel:stateChanged:StateEvent :: StateChanged tx
stateChanged :: StateChanged tx
stateChanged}
  [EventSink (StateEvent tx) m] -> [StateEvent tx] -> m ()
forall (m :: * -> *) e.
(Monad m, HasEventId e) =>
[EventSink e m] -> [e] -> m ()
putEventsToSinks [EventSink (StateEvent tx) m]
eventSinks [StateEvent tx]
events
 where
  HydraNode
    { [EventSink (StateEvent tx) m]
$sel:eventSinks:HydraNode :: forall tx (m :: * -> *).
HydraNode tx m -> [EventSink (StateEvent tx) m]
eventSinks :: [EventSink (StateEvent tx) m]
eventSinks
    , $sel:nodeState:HydraNode :: forall tx (m :: * -> *). HydraNode tx m -> NodeState tx m
nodeState = NodeState{STM m Word64
getNextEventId :: STM m Word64
$sel:getNextEventId:NodeState :: forall tx (m :: * -> *). NodeState tx m -> STM m Word64
getNextEventId}
    } = HydraNode tx m
node

processEffects ::
  ( MonadAsync m
  , MonadCatch m
  , IsChainState tx
  ) =>
  HydraNode tx m ->
  Tracer m (HydraNodeLog tx) ->
  Word64 ->
  [Effect tx] ->
  m ()
processEffects :: forall (m :: * -> *) tx.
(MonadAsync m, MonadCatch m, IsChainState tx) =>
HydraNode tx m
-> Tracer m (HydraNodeLog tx) -> Word64 -> [Effect tx] -> m ()
processEffects HydraNode tx m
node Tracer m (HydraNodeLog tx)
tracer Word64
inputId [Effect tx]
effects = do
  ((Effect tx, Word32) -> m ()) -> [(Effect tx, Word32)] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Effect tx, Word32) -> m ()
processEffect ([(Effect tx, Word32)] -> m ()) -> [(Effect tx, Word32)] -> m ()
forall a b. (a -> b) -> a -> b
$ [Effect tx] -> [Word32] -> [(Effect tx, Word32)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Effect tx]
effects [Word32
0 ..]
 where
  processEffect :: (Effect tx, Word32) -> m ()
processEffect (Effect tx
effect, Word32
effectId) = do
    Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer (HydraNodeLog tx -> m ()) -> HydraNodeLog tx -> m ()
forall a b. (a -> b) -> a -> b
$ Party -> Word64 -> Word32 -> Effect tx -> HydraNodeLog tx
forall tx.
Party -> Word64 -> Word32 -> Effect tx -> HydraNodeLog tx
BeginEffect Party
party Word64
inputId Word32
effectId Effect tx
effect
    case Effect tx
effect of
      ClientEffect ServerOutput tx
i -> Server tx m -> ServerOutput tx -> m ()
forall tx (m :: * -> *). Server tx m -> ServerOutput tx -> m ()
sendOutput Server tx m
server ServerOutput tx
i
      NetworkEffect Message tx
msg -> Network m (Message tx) -> Message tx -> m ()
forall (m :: * -> *) msg. Network m msg -> msg -> m ()
broadcast Network m (Message tx)
hn Message tx
msg
      OnChainEffect{PostChainTx tx
postChainTx :: PostChainTx tx
$sel:postChainTx:ClientEffect :: forall tx. Effect tx -> PostChainTx tx
postChainTx} ->
        MonadThrow m => PostChainTx tx -> m ()
PostChainTx tx -> m ()
postTx PostChainTx tx
postChainTx
          m () -> (PostTxError tx -> m ()) -> m ()
forall e a. Exception e => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(PostTxError tx
postTxError :: PostTxError tx) ->
            Input tx -> m ()
enqueue (Input tx -> m ())
-> (ChainEvent tx -> Input tx) -> ChainEvent tx -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ChainEvent tx -> Input tx
forall tx. ChainEvent tx -> Input tx
ChainInput (ChainEvent tx -> m ()) -> ChainEvent tx -> m ()
forall a b. (a -> b) -> a -> b
$ PostTxError{PostChainTx tx
postChainTx :: PostChainTx tx
$sel:postChainTx:Observation :: PostChainTx tx
postChainTx, PostTxError tx
postTxError :: PostTxError tx
$sel:postTxError:Observation :: PostTxError tx
postTxError}
    Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer (HydraNodeLog tx -> m ()) -> HydraNodeLog tx -> m ()
forall a b. (a -> b) -> a -> b
$ Party -> Word64 -> Word32 -> HydraNodeLog tx
forall tx. Party -> Word64 -> Word32 -> HydraNodeLog tx
EndEffect Party
party Word64
inputId Word32
effectId

  HydraNode
    { Network m (Message tx)
$sel:hn:HydraNode :: forall tx (m :: * -> *). HydraNode tx m -> Network m (Message tx)
hn :: Network m (Message tx)
hn
    , $sel:oc:HydraNode :: forall tx (m :: * -> *). HydraNode tx m -> Chain tx m
oc = Chain{MonadThrow m => PostChainTx tx -> m ()
postTx :: MonadThrow m => PostChainTx tx -> m ()
$sel:postTx:Chain :: forall tx (m :: * -> *).
Chain tx m -> MonadThrow m => PostChainTx tx -> m ()
postTx}
    , Server tx m
$sel:server:HydraNode :: forall tx (m :: * -> *). HydraNode tx m -> Server tx m
server :: Server tx m
server
    , $sel:inputQueue:HydraNode :: forall tx (m :: * -> *). HydraNode tx m -> InputQueue m (Input tx)
inputQueue = InputQueue{Input tx -> m ()
$sel:enqueue:InputQueue :: forall (m :: * -> *) e. InputQueue m e -> e -> m ()
enqueue :: Input tx -> m ()
enqueue}
    , $sel:env:HydraNode :: forall tx (m :: * -> *). HydraNode tx m -> Environment
env = Environment{Party
$sel:party:Environment :: Environment -> Party
party :: Party
party}
    } = HydraNode tx m
node

-- ** Manage state

-- | Handle to access and modify the state in the Hydra Node.
data NodeState tx m = NodeState
  { forall tx (m :: * -> *).
NodeState tx m
-> forall a. (HeadState tx -> (a, HeadState tx)) -> STM m a
modifyHeadState :: forall a. (HeadState tx -> (a, HeadState tx)) -> STM m a
  , forall tx (m :: * -> *). NodeState tx m -> STM m (HeadState tx)
queryHeadState :: STM m (HeadState tx)
  , forall tx (m :: * -> *). NodeState tx m -> STM m Word64
getNextEventId :: STM m EventId
  }

-- | Initialize a new 'NodeState'.
createNodeState ::
  MonadLabelledSTM m =>
  -- | Last seen 'EventId'.
  Maybe EventId ->
  HeadState tx ->
  m (NodeState tx m)
createNodeState :: forall (m :: * -> *) tx.
MonadLabelledSTM m =>
Maybe Word64 -> HeadState tx -> m (NodeState tx m)
createNodeState Maybe Word64
lastSeenEventId HeadState tx
initialState = do
  TVar m Word64
nextEventIdV <- Word64 -> m (TVar m Word64)
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO (Word64 -> m (TVar m Word64)) -> Word64 -> m (TVar m Word64)
forall a b. (a -> b) -> a -> b
$ Word64 -> (Word64 -> Word64) -> Maybe Word64 -> Word64
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Word64
0 (Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ Word64
1) Maybe Word64
lastSeenEventId
  TVar m Word64 -> FilePath -> m ()
forall a. TVar m a -> FilePath -> m ()
forall (m :: * -> *) a.
MonadLabelledSTM m =>
TVar m a -> FilePath -> m ()
labelTVarIO TVar m Word64
nextEventIdV FilePath
"next-event-id"
  TVar m (HeadState tx)
hs <- HeadState tx -> m (TVar m (HeadState tx))
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO HeadState tx
initialState
  TVar m (HeadState tx) -> FilePath -> m ()
forall a. TVar m a -> FilePath -> m ()
forall (m :: * -> *) a.
MonadLabelledSTM m =>
TVar m a -> FilePath -> m ()
labelTVarIO TVar m (HeadState tx)
hs FilePath
"head-state"
  NodeState tx m -> m (NodeState tx m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    NodeState
      { $sel:modifyHeadState:NodeState :: forall a. (HeadState tx -> (a, HeadState tx)) -> STM m a
modifyHeadState = TVar m (HeadState tx)
-> (HeadState tx -> (a, HeadState tx)) -> STM m a
forall s a. TVar m s -> (s -> (a, s)) -> STM m a
forall (m :: * -> *) s a.
MonadSTM m =>
TVar m s -> (s -> (a, s)) -> STM m a
stateTVar TVar m (HeadState tx)
hs
      , $sel:queryHeadState:NodeState :: STM m (HeadState tx)
queryHeadState = TVar m (HeadState tx) -> STM m (HeadState tx)
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m (HeadState tx)
hs
      , $sel:getNextEventId:NodeState :: STM m Word64
getNextEventId = do
          Word64
eventId <- TVar m Word64 -> STM m Word64
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m Word64
nextEventIdV
          TVar m Word64 -> Word64 -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m Word64
nextEventIdV (Word64 -> STM m ()) -> Word64 -> STM m ()
forall a b. (a -> b) -> a -> b
$ Word64
eventId Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ Word64
1
          Word64 -> STM m Word64
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Word64
eventId
      }

-- * Logging

data HydraNodeLog tx
  = BeginInput {forall tx. HydraNodeLog tx -> Party
by :: Party, forall tx. HydraNodeLog tx -> Word64
inputId :: Word64, forall tx. HydraNodeLog tx -> Input tx
input :: Input tx}
  | EndInput {by :: Party, inputId :: Word64}
  | BeginEffect {by :: Party, inputId :: Word64, forall tx. HydraNodeLog tx -> Word32
effectId :: Word32, forall tx. HydraNodeLog tx -> Effect tx
effect :: Effect tx}
  | EndEffect {by :: Party, inputId :: Word64, effectId :: Word32}
  | LogicOutcome {by :: Party, forall tx. HydraNodeLog tx -> Outcome tx
outcome :: Outcome tx}
  | DroppedFromQueue {inputId :: Word64, input :: Input tx}
  | LoadedState {forall tx. HydraNodeLog tx -> Word64
numberOfEvents :: Word64}
  | Misconfiguration {forall tx. HydraNodeLog tx -> [ParamMismatch]
misconfigurationErrors :: [ParamMismatch]}
  deriving stock ((forall x. HydraNodeLog tx -> Rep (HydraNodeLog tx) x)
-> (forall x. Rep (HydraNodeLog tx) x -> HydraNodeLog tx)
-> Generic (HydraNodeLog tx)
forall x. Rep (HydraNodeLog tx) x -> HydraNodeLog tx
forall x. HydraNodeLog tx -> Rep (HydraNodeLog tx) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall tx x. Rep (HydraNodeLog tx) x -> HydraNodeLog tx
forall tx x. HydraNodeLog tx -> Rep (HydraNodeLog tx) x
$cfrom :: forall tx x. HydraNodeLog tx -> Rep (HydraNodeLog tx) x
from :: forall x. HydraNodeLog tx -> Rep (HydraNodeLog tx) x
$cto :: forall tx x. Rep (HydraNodeLog tx) x -> HydraNodeLog tx
to :: forall x. Rep (HydraNodeLog tx) x -> HydraNodeLog tx
Generic)

deriving stock instance IsChainState tx => Eq (HydraNodeLog tx)
deriving stock instance IsChainState tx => Show (HydraNodeLog tx)
deriving anyclass instance IsChainState tx => ToJSON (HydraNodeLog tx)
deriving anyclass instance IsChainState tx => FromJSON (HydraNodeLog tx)

instance (ArbitraryIsTx tx, IsChainState tx) => Arbitrary (HydraNodeLog tx) where
  arbitrary :: Gen (HydraNodeLog tx)
arbitrary = Gen (HydraNodeLog tx)
forall a.
(Generic a, GA UnsizedOpts (Rep a),
 UniformWeight (Weights_ (Rep a))) =>
Gen a
genericArbitrary
  shrink :: HydraNodeLog tx -> [HydraNodeLog tx]
shrink = HydraNodeLog tx -> [HydraNodeLog tx]
forall a.
(Generic a, RecursivelyShrink (Rep a), GSubterms (Rep a) a) =>
a -> [a]
genericShrink