{-# LANGUAGE DuplicateRecordFields #-}
module Hydra.Node.Network (
NetworkConfiguration (..),
withNetwork,
withFlipHeartbeats,
configureMessagePersistence,
acksFile,
) where
import Hydra.Prelude hiding (fromList, replicate)
import Control.Tracer (Tracer)
import Hydra.Crypto (HydraKey, SigningKey)
import Hydra.Logging (traceWith)
import Hydra.Logging.Messages (HydraLog (..))
import Hydra.Network (Host (..), IP, NetworkComponent, NodeId, PortNumber)
import Hydra.Network.Authenticate (Authenticated (Authenticated), Signed, withAuthentication)
import Hydra.Network.Heartbeat (ConnectionMessages, Heartbeat (..), withHeartbeat)
import Hydra.Network.Ouroboros (TraceOuroborosNetwork, WithHost, withOuroborosNetwork)
import Hydra.Network.Reliability (MessagePersistence, ReliableMsg, mkMessagePersistence, withReliability)
import Hydra.Node (HydraNodeLog (..))
import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..))
import Hydra.Party (Party, deriveParty)
import Hydra.Persistence (Persistence (..), createPersistence, createPersistenceIncremental)
import System.FilePath ((</>))
type LogEntry tx msg = HydraLog tx (WithHost (TraceOuroborosNetwork (Signed (ReliableMsg (Heartbeat msg)))))
data NetworkConfiguration m = NetworkConfiguration
{ forall {k} (m :: k). NetworkConfiguration m -> FilePath
persistenceDir :: FilePath
, forall {k} (m :: k). NetworkConfiguration m -> SigningKey HydraKey
signingKey :: SigningKey HydraKey
, forall {k} (m :: k). NetworkConfiguration m -> [Party]
otherParties :: [Party]
, forall {k} (m :: k). NetworkConfiguration m -> IP
host :: IP
, forall {k} (m :: k). NetworkConfiguration m -> PortNumber
port :: PortNumber
, forall {k} (m :: k). NetworkConfiguration m -> [Host]
peers :: [Host]
, forall {k} (m :: k). NetworkConfiguration m -> NodeId
nodeId :: NodeId
}
withNetwork ::
(ToCBOR msg, ToJSON msg, FromJSON msg, FromCBOR msg) =>
Tracer IO (LogEntry tx msg) ->
ConnectionMessages IO ->
NetworkConfiguration IO ->
NetworkComponent IO (Authenticated msg) msg ()
withNetwork :: forall msg tx.
(ToCBOR msg, ToJSON msg, FromJSON msg, FromCBOR msg) =>
Tracer IO (LogEntry tx msg)
-> ConnectionMessages IO
-> NetworkConfiguration IO
-> NetworkComponent IO (Authenticated msg) msg ()
withNetwork Tracer IO (LogEntry tx msg)
tracer ConnectionMessages IO
connectionMessages NetworkConfiguration IO
configuration NetworkCallback (Authenticated msg) IO
callback Network IO msg -> IO ()
action = do
let localhost :: Host
localhost = Host{$sel:hostname:Host :: Text
hostname = IP -> Text
forall b a. (Show a, IsString b) => a -> b
show IP
host, PortNumber
port :: PortNumber
$sel:port:Host :: PortNumber
port}
me :: Party
me = SigningKey HydraKey -> Party
deriveParty SigningKey HydraKey
signingKey
numberOfParties :: Int
numberOfParties = [Party] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ([Party] -> Int) -> [Party] -> Int
forall a b. (a -> b) -> a -> b
$ Party
me Party -> [Party] -> [Party]
forall a. a -> [a] -> [a]
: [Party]
otherParties
MessagePersistence IO msg
messagePersistence <- Tracer IO (HydraNodeLog tx)
-> FilePath -> Int -> IO (MessagePersistence IO msg)
forall (m :: * -> *) msg tx.
(MonadIO m, MonadThrow m, FromJSON msg, ToJSON msg, MonadSTM m,
MonadThread m, MonadThrow (STM m)) =>
Tracer m (HydraNodeLog tx)
-> FilePath -> Int -> m (MessagePersistence m msg)
configureMessagePersistence ((HydraNodeLog tx -> LogEntry tx msg)
-> Tracer IO (LogEntry tx msg) -> Tracer IO (HydraNodeLog tx)
forall a' a. (a' -> a) -> Tracer IO a -> Tracer IO a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap HydraNodeLog tx -> LogEntry tx msg
forall tx net. HydraNodeLog tx -> HydraLog tx net
Node Tracer IO (LogEntry tx msg)
tracer) FilePath
persistenceDir Int
numberOfParties
let reliability :: NetworkComponent
IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
reliability =
NetworkComponent
IO (Authenticated (Heartbeat msg)) (Heartbeat msg) ()
-> NetworkComponent
IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
forall (m :: * -> *) msg msg1 a.
NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a
-> NetworkComponent m (Heartbeat (Authenticated msg)) msg1 a
withFlipHeartbeats (NetworkComponent
IO (Authenticated (Heartbeat msg)) (Heartbeat msg) ()
-> NetworkComponent
IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ())
-> NetworkComponent
IO (Authenticated (Heartbeat msg)) (Heartbeat msg) ()
-> NetworkComponent
IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
forall a b. (a -> b) -> a -> b
$
Tracer IO ReliabilityLog
-> MessagePersistence IO msg
-> Party
-> [Party]
-> NetworkComponent
IO
(Authenticated (ReliableMsg (Heartbeat msg)))
(ReliableMsg (Heartbeat msg))
()
-> NetworkComponent
IO (Authenticated (Heartbeat msg)) (Heartbeat msg) ()
forall (m :: * -> *) msg a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m msg
-> Party
-> [Party]
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat msg)))
(ReliableMsg (Heartbeat msg))
a
-> NetworkComponent
m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability ((ReliabilityLog -> LogEntry tx msg)
-> Tracer IO (LogEntry tx msg) -> Tracer IO ReliabilityLog
forall a' a. (a' -> a) -> Tracer IO a -> Tracer IO a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap ReliabilityLog -> LogEntry tx msg
forall tx net. ReliabilityLog -> HydraLog tx net
Reliability Tracer IO (LogEntry tx msg)
tracer) MessagePersistence IO msg
messagePersistence Party
me [Party]
otherParties (NetworkComponent
IO
(Authenticated (ReliableMsg (Heartbeat msg)))
(ReliableMsg (Heartbeat msg))
()
-> NetworkComponent
IO (Authenticated (Heartbeat msg)) (Heartbeat msg) ())
-> NetworkComponent
IO
(Authenticated (ReliableMsg (Heartbeat msg)))
(ReliableMsg (Heartbeat msg))
()
-> NetworkComponent
IO (Authenticated (Heartbeat msg)) (Heartbeat msg) ()
forall a b. (a -> b) -> a -> b
$
Tracer IO AuthLog
-> SigningKey HydraKey
-> [Party]
-> NetworkComponent
IO
(Signed (ReliableMsg (Heartbeat msg)))
(Signed (ReliableMsg (Heartbeat msg)))
()
-> NetworkComponent
IO
(Authenticated (ReliableMsg (Heartbeat msg)))
(ReliableMsg (Heartbeat msg))
()
forall msg (m :: * -> *) a.
(SignableRepresentation msg, ToJSON msg) =>
Tracer m AuthLog
-> SigningKey HydraKey
-> [Party]
-> NetworkComponent m (Signed msg) (Signed msg) a
-> NetworkComponent m (Authenticated msg) msg a
withAuthentication ((AuthLog -> LogEntry tx msg)
-> Tracer IO (LogEntry tx msg) -> Tracer IO AuthLog
forall a' a. (a' -> a) -> Tracer IO a -> Tracer IO a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap AuthLog -> LogEntry tx msg
forall tx net. AuthLog -> HydraLog tx net
Authentication Tracer IO (LogEntry tx msg)
tracer) SigningKey HydraKey
signingKey [Party]
otherParties (NetworkComponent
IO
(Signed (ReliableMsg (Heartbeat msg)))
(Signed (ReliableMsg (Heartbeat msg)))
()
-> NetworkComponent
IO
(Authenticated (ReliableMsg (Heartbeat msg)))
(ReliableMsg (Heartbeat msg))
())
-> NetworkComponent
IO
(Signed (ReliableMsg (Heartbeat msg)))
(Signed (ReliableMsg (Heartbeat msg)))
()
-> NetworkComponent
IO
(Authenticated (ReliableMsg (Heartbeat msg)))
(ReliableMsg (Heartbeat msg))
()
forall a b. (a -> b) -> a -> b
$
Tracer
IO
(WithHost
(TraceOuroborosNetwork (Signed (ReliableMsg (Heartbeat msg)))))
-> Host
-> [Host]
-> NetworkComponent
IO
(Signed (ReliableMsg (Heartbeat msg)))
(Signed (ReliableMsg (Heartbeat msg)))
()
forall msg.
(ToCBOR msg, FromCBOR msg) =>
Tracer IO (WithHost (TraceOuroborosNetwork msg))
-> Host -> [Host] -> NetworkComponent IO msg msg ()
withOuroborosNetwork ((WithHost
(TraceOuroborosNetwork (Signed (ReliableMsg (Heartbeat msg))))
-> LogEntry tx msg)
-> Tracer IO (LogEntry tx msg)
-> Tracer
IO
(WithHost
(TraceOuroborosNetwork (Signed (ReliableMsg (Heartbeat msg)))))
forall a' a. (a' -> a) -> Tracer IO a -> Tracer IO a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap WithHost
(TraceOuroborosNetwork (Signed (ReliableMsg (Heartbeat msg))))
-> LogEntry tx msg
forall tx net. net -> HydraLog tx net
Network Tracer IO (LogEntry tx msg)
tracer) Host
localhost [Host]
peers
NodeId
-> ConnectionMessages IO
-> NetworkComponent
IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
-> NetworkComponent IO (Authenticated msg) msg ()
forall (m :: * -> *) msg1 msg a.
(MonadAsync m, MonadDelay m) =>
NodeId
-> ConnectionMessages m
-> NetworkComponent m (Heartbeat msg1) (Heartbeat msg) a
-> NetworkComponent m msg1 msg a
withHeartbeat NodeId
nodeId ConnectionMessages IO
connectionMessages NetworkComponent
IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
reliability NetworkCallback (Authenticated msg) IO
callback ((Network IO msg -> IO ()) -> IO ())
-> (Network IO msg -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Network IO msg
network ->
Network IO msg -> IO ()
action Network IO msg
network
where
NetworkConfiguration{FilePath
$sel:persistenceDir:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> FilePath
persistenceDir :: FilePath
persistenceDir, SigningKey HydraKey
$sel:signingKey:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> SigningKey HydraKey
signingKey :: SigningKey HydraKey
signingKey, [Party]
$sel:otherParties:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> [Party]
otherParties :: [Party]
otherParties, IP
$sel:host:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> IP
host :: IP
host, PortNumber
$sel:port:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> PortNumber
port :: PortNumber
port, [Host]
$sel:peers:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> [Host]
peers :: [Host]
peers, NodeId
$sel:nodeId:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> NodeId
nodeId :: NodeId
nodeId} = NetworkConfiguration IO
configuration
configureMessagePersistence ::
(MonadIO m, MonadThrow m, FromJSON msg, ToJSON msg, MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
Tracer m (HydraNodeLog tx) ->
FilePath ->
Int ->
m (MessagePersistence m msg)
configureMessagePersistence :: forall (m :: * -> *) msg tx.
(MonadIO m, MonadThrow m, FromJSON msg, ToJSON msg, MonadSTM m,
MonadThread m, MonadThrow (STM m)) =>
Tracer m (HydraNodeLog tx)
-> FilePath -> Int -> m (MessagePersistence m msg)
configureMessagePersistence Tracer m (HydraNodeLog tx)
tracer FilePath
persistenceDir Int
numberOfParties = do
PersistenceIncremental (Heartbeat msg) m
msgPersistence <- FilePath -> m (PersistenceIncremental (Heartbeat msg) m)
forall a (m :: * -> *).
(MonadIO m, MonadThrow m, MonadSTM m, MonadThread m,
MonadThrow (STM m)) =>
FilePath -> m (PersistenceIncremental a m)
createPersistenceIncremental (FilePath -> m (PersistenceIncremental (Heartbeat msg) m))
-> FilePath -> m (PersistenceIncremental (Heartbeat msg) m)
forall a b. (a -> b) -> a -> b
$ FilePath -> FilePath
storedMessagesFile FilePath
persistenceDir
ackPersistence :: Persistence (Vector Int) m
ackPersistence@Persistence{FromJSON (Vector Int) => m (Maybe (Vector Int))
load :: FromJSON (Vector Int) => m (Maybe (Vector Int))
$sel:load:Persistence :: forall a (m :: * -> *).
Persistence a m -> FromJSON a => m (Maybe a)
load} <- FilePath -> m (Persistence (Vector Int) m)
forall (m :: * -> *) a.
(MonadIO m, MonadThrow m) =>
FilePath -> m (Persistence a m)
createPersistence (FilePath -> m (Persistence (Vector Int) m))
-> FilePath -> m (Persistence (Vector Int) m)
forall a b. (a -> b) -> a -> b
$ FilePath -> FilePath
acksFile FilePath
persistenceDir
Maybe (Vector Int)
mAcks <- m (Maybe (Vector Int))
FromJSON (Vector Int) => m (Maybe (Vector Int))
load
Persistence (Vector Int) m
ackPersistence' <- case (Vector Int -> Bool) -> Maybe (Vector Int) -> Maybe Bool
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (\Vector Int
acks -> Vector Int -> Int
forall a. Vector a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length Vector Int
acks Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
numberOfParties) Maybe (Vector Int)
mAcks of
Just Bool
False -> do
let paramsMismatch :: [ParamMismatch]
paramsMismatch = [SavedNetworkPartiesInconsistent{Int
numberOfParties :: Int
$sel:numberOfParties:ContestationPeriodMismatch :: Int
numberOfParties}]
Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer ([ParamMismatch] -> HydraNodeLog tx
forall tx. [ParamMismatch] -> HydraNodeLog tx
Misconfiguration [ParamMismatch]
paramsMismatch)
ParameterMismatch -> m (Persistence (Vector Int) m)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (ParameterMismatch -> m (Persistence (Vector Int) m))
-> ParameterMismatch -> m (Persistence (Vector Int) m)
forall a b. (a -> b) -> a -> b
$ [ParamMismatch] -> ParameterMismatch
ParameterMismatch [ParamMismatch]
paramsMismatch
Maybe Bool
_ -> Persistence (Vector Int) m -> m (Persistence (Vector Int) m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Persistence (Vector Int) m
ackPersistence
MessagePersistence m msg -> m (MessagePersistence m msg)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MessagePersistence m msg -> m (MessagePersistence m msg))
-> MessagePersistence m msg -> m (MessagePersistence m msg)
forall a b. (a -> b) -> a -> b
$ Int
-> PersistenceIncremental (Heartbeat msg) m
-> Persistence (Vector Int) m
-> MessagePersistence m msg
forall (m :: * -> *) msg.
(MonadThrow m, FromJSON msg, ToJSON msg) =>
Int
-> PersistenceIncremental (Heartbeat msg) m
-> Persistence (Vector Int) m
-> MessagePersistence m msg
mkMessagePersistence Int
numberOfParties PersistenceIncremental (Heartbeat msg) m
msgPersistence Persistence (Vector Int) m
ackPersistence'
withFlipHeartbeats ::
NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a ->
NetworkComponent m (Heartbeat (Authenticated msg)) msg1 a
withFlipHeartbeats :: forall (m :: * -> *) msg msg1 a.
NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a
-> NetworkComponent m (Heartbeat (Authenticated msg)) msg1 a
withFlipHeartbeats NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a
withBaseNetwork NetworkCallback (Heartbeat (Authenticated msg)) m
callback =
NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a
withBaseNetwork Authenticated (Heartbeat msg) -> m ()
unwrapHeartbeats
where
unwrapHeartbeats :: Authenticated (Heartbeat msg) -> m ()
unwrapHeartbeats = \case
Authenticated (Data NodeId
nid msg
msg) Party
party -> NetworkCallback (Heartbeat (Authenticated msg)) m
callback NetworkCallback (Heartbeat (Authenticated msg)) m
-> NetworkCallback (Heartbeat (Authenticated msg)) m
forall a b. (a -> b) -> a -> b
$ NodeId -> Authenticated msg -> Heartbeat (Authenticated msg)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
nid (msg -> Party -> Authenticated msg
forall msg. msg -> Party -> Authenticated msg
Authenticated msg
msg Party
party)
Authenticated (Ping NodeId
nid) Party
_ -> NetworkCallback (Heartbeat (Authenticated msg)) m
callback NetworkCallback (Heartbeat (Authenticated msg)) m
-> NetworkCallback (Heartbeat (Authenticated msg)) m
forall a b. (a -> b) -> a -> b
$ NodeId -> Heartbeat (Authenticated msg)
forall msg. NodeId -> Heartbeat msg
Ping NodeId
nid
storedMessagesFile :: FilePath -> FilePath
storedMessagesFile :: FilePath -> FilePath
storedMessagesFile = (FilePath -> FilePath -> FilePath
</> FilePath
"network-messages")
acksFile :: FilePath -> FilePath
acksFile :: FilePath -> FilePath
acksFile = (FilePath -> FilePath -> FilePath
</> FilePath
"acks")