{-# LANGUAGE DuplicateRecordFields #-}

-- | Concrete `Hydra.Network` stack dedicated to running a hydra-node.
--
-- This module provides a `withNetwork` function which is the composition of several layers in order to provide various capabilities:
--
--   * `withHeartbeat` maintains knowledge about peers' connectivity,
--   * `withReliability` deals with connections reliability, handling the case
--     of messages being dropped (but not node crash in general),
--   * `withAuthentication` handles messages' authentication and signature verification,
--   * `withOuroborosNetwork` deals with maintaining individual connections to peers and the nitty-gritty details of messages sending and retrieval.
--
-- The following diagram details the various types of messages each layer is
-- exchanging with its predecessors and successors.
--
-- @
--
--          ▲                                    │
--          │ Authenticate msg                   │ msg
--          │                                    │
-- ┌────────┴────────────────────────────────────▼──────┐
-- │                                                    │
-- │                   Heartbeat                        │
-- │        ▲                                           │
-- └────────┬────────────────────────────────────┼──────┘
--          │                                    │
--          │ Heartbeat (Authenticate msg)       │ Heartbeat msg
--          │                                    │
-- ┌────────┴───────────────┐                    │
-- │                        │                    │
-- │    FlipHeartbeats      │                    │
-- │                        │                    │
-- └────────▲───────────────┘                    │
--          │                                    │
--          │ Authenticate (Heartbeat msg)       │
--          │                                    │
-- ┌────────┴────────────────────────────────────▼──────┐
-- │                                                    │
-- │                   Reliability                      │
-- │                                                    │
-- └─────────▲───────────────────────────────────┼──────┘
--           │                                   │
--      Authenticated (ReliableMsg (Heartbeat msg))    ReliableMsg (Heartbeat msg)
--           │                                   │
-- ┌─────────┼───────────────────────────────────▼──────┐
-- │                                                    │
-- │                  Authenticate                      │
-- │                                                    │
-- └─────────▲───────────────────────────────────┼──────┘
--           │                                   │
--           │                                   │
--       Signed (ReliableMsg (Heartbeat msg))       Signed (ReliableMsg (Heartbeat msg))
--           │                                   │
-- ┌─────────┼───────────────────────────────────▼──────┐
-- │                                                    │
-- │                  Ouroboros                         │
-- │                                                    │
-- └─────────▲───────────────────────────────────┼──────┘
--           │                                   │
--           │           (bytes)                 │
--           │                                   ▼
--
-- @
module Hydra.Node.Network (
  NetworkConfiguration (..),
  withNetwork,
  withFlipHeartbeats,
  configureMessagePersistence,
  acksFile,
) where

import Hydra.Prelude hiding (fromList, replicate)

import Control.Tracer (Tracer)
import Hydra.Crypto (HydraKey, SigningKey)
import Hydra.Logging (traceWith)
import Hydra.Logging.Messages (HydraLog (..))
import Hydra.Network (Host (..), IP, NetworkComponent, NodeId, PortNumber)
import Hydra.Network.Authenticate (Authenticated (Authenticated), Signed, withAuthentication)
import Hydra.Network.Heartbeat (ConnectionMessages, Heartbeat (..), withHeartbeat)
import Hydra.Network.Ouroboros (TraceOuroborosNetwork, WithHost, withOuroborosNetwork)
import Hydra.Network.Reliability (MessagePersistence, ReliableMsg, mkMessagePersistence, withReliability)
import Hydra.Node (HydraNodeLog (..))
import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..))
import Hydra.Party (Party, deriveParty)
import Hydra.Persistence (Persistence (..), createPersistence, createPersistenceIncremental)
import System.FilePath ((</>))

-- | An alias for logging messages output by network component.
-- The type is made complicated because the various subsystems use part of the tracer only.
type LogEntry tx msg = HydraLog tx (WithHost (TraceOuroborosNetwork (Signed (ReliableMsg (Heartbeat msg)))))

-- | Configuration for a `Node` network layer.
data NetworkConfiguration m = NetworkConfiguration
  { forall {k} (m :: k). NetworkConfiguration m -> FilePath
persistenceDir :: FilePath
  -- ^ Persistence directory
  , forall {k} (m :: k). NetworkConfiguration m -> SigningKey HydraKey
signingKey :: SigningKey HydraKey
  -- ^ This node's signing key. This is used to sign messages sent to peers.
  , forall {k} (m :: k). NetworkConfiguration m -> [Party]
otherParties :: [Party]
  -- ^ The list of peers `Party` known to this node.
  , forall {k} (m :: k). NetworkConfiguration m -> IP
host :: IP
  -- ^ IP address to listen on for incoming connections.
  , forall {k} (m :: k). NetworkConfiguration m -> PortNumber
port :: PortNumber
  -- ^ Port to listen on.
  , forall {k} (m :: k). NetworkConfiguration m -> [Host]
peers :: [Host]
  -- ^ Addresses and ports of remote peers.
  , forall {k} (m :: k). NetworkConfiguration m -> NodeId
nodeId :: NodeId
  -- ^ This node's id.
  }

-- | Starts the network layer of a node, passing configured `Network` to its continuation.
withNetwork ::
  (ToCBOR msg, ToJSON msg, FromJSON msg, FromCBOR msg) =>
  -- | Tracer to use for logging messages.
  Tracer IO (LogEntry tx msg) ->
  -- | Callback/observer for connectivity changes in peers.
  ConnectionMessages IO ->
  -- | The network configuration
  NetworkConfiguration IO ->
  -- | Produces a `NetworkComponent` that can send `msg` and consumes `Authenticated` @msg@.
  NetworkComponent IO (Authenticated msg) msg ()
withNetwork :: forall msg tx.
(ToCBOR msg, ToJSON msg, FromJSON msg, FromCBOR msg) =>
Tracer IO (LogEntry tx msg)
-> ConnectionMessages IO
-> NetworkConfiguration IO
-> NetworkComponent IO (Authenticated msg) msg ()
withNetwork Tracer IO (LogEntry tx msg)
tracer ConnectionMessages IO
connectionMessages NetworkConfiguration IO
configuration NetworkCallback (Authenticated msg) IO
callback Network IO msg -> IO ()
action = do
  let localhost :: Host
localhost = Host{$sel:hostname:Host :: Text
hostname = IP -> Text
forall b a. (Show a, IsString b) => a -> b
show IP
host, PortNumber
port :: PortNumber
$sel:port:Host :: PortNumber
port}
      me :: Party
me = SigningKey HydraKey -> Party
deriveParty SigningKey HydraKey
signingKey
      numberOfParties :: Int
numberOfParties = [Party] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ([Party] -> Int) -> [Party] -> Int
forall a b. (a -> b) -> a -> b
$ Party
me Party -> [Party] -> [Party]
forall a. a -> [a] -> [a]
: [Party]
otherParties
  MessagePersistence IO msg
messagePersistence <- Tracer IO (HydraNodeLog tx)
-> FilePath -> Int -> IO (MessagePersistence IO msg)
forall (m :: * -> *) msg tx.
(MonadIO m, MonadThrow m, FromJSON msg, ToJSON msg, MonadSTM m,
 MonadThread m, MonadThrow (STM m)) =>
Tracer m (HydraNodeLog tx)
-> FilePath -> Int -> m (MessagePersistence m msg)
configureMessagePersistence ((HydraNodeLog tx -> LogEntry tx msg)
-> Tracer IO (LogEntry tx msg) -> Tracer IO (HydraNodeLog tx)
forall a' a. (a' -> a) -> Tracer IO a -> Tracer IO a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap HydraNodeLog tx -> LogEntry tx msg
forall tx net. HydraNodeLog tx -> HydraLog tx net
Node Tracer IO (LogEntry tx msg)
tracer) FilePath
persistenceDir Int
numberOfParties

  let reliability :: NetworkComponent
  IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
reliability =
        NetworkComponent
  IO (Authenticated (Heartbeat msg)) (Heartbeat msg) ()
-> NetworkComponent
     IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
forall (m :: * -> *) msg msg1 a.
NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a
-> NetworkComponent m (Heartbeat (Authenticated msg)) msg1 a
withFlipHeartbeats (NetworkComponent
   IO (Authenticated (Heartbeat msg)) (Heartbeat msg) ()
 -> NetworkComponent
      IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ())
-> NetworkComponent
     IO (Authenticated (Heartbeat msg)) (Heartbeat msg) ()
-> NetworkComponent
     IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
forall a b. (a -> b) -> a -> b
$
          Tracer IO ReliabilityLog
-> MessagePersistence IO msg
-> Party
-> [Party]
-> NetworkComponent
     IO
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     ()
-> NetworkComponent
     IO (Authenticated (Heartbeat msg)) (Heartbeat msg) ()
forall (m :: * -> *) msg a.
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
Tracer m ReliabilityLog
-> MessagePersistence m msg
-> Party
-> [Party]
-> NetworkComponent
     m
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     a
-> NetworkComponent
     m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability ((ReliabilityLog -> LogEntry tx msg)
-> Tracer IO (LogEntry tx msg) -> Tracer IO ReliabilityLog
forall a' a. (a' -> a) -> Tracer IO a -> Tracer IO a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap ReliabilityLog -> LogEntry tx msg
forall tx net. ReliabilityLog -> HydraLog tx net
Reliability Tracer IO (LogEntry tx msg)
tracer) MessagePersistence IO msg
messagePersistence Party
me [Party]
otherParties (NetworkComponent
   IO
   (Authenticated (ReliableMsg (Heartbeat msg)))
   (ReliableMsg (Heartbeat msg))
   ()
 -> NetworkComponent
      IO (Authenticated (Heartbeat msg)) (Heartbeat msg) ())
-> NetworkComponent
     IO
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     ()
-> NetworkComponent
     IO (Authenticated (Heartbeat msg)) (Heartbeat msg) ()
forall a b. (a -> b) -> a -> b
$
            Tracer IO AuthLog
-> SigningKey HydraKey
-> [Party]
-> NetworkComponent
     IO
     (Signed (ReliableMsg (Heartbeat msg)))
     (Signed (ReliableMsg (Heartbeat msg)))
     ()
-> NetworkComponent
     IO
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     ()
forall msg (m :: * -> *) a.
(SignableRepresentation msg, ToJSON msg) =>
Tracer m AuthLog
-> SigningKey HydraKey
-> [Party]
-> NetworkComponent m (Signed msg) (Signed msg) a
-> NetworkComponent m (Authenticated msg) msg a
withAuthentication ((AuthLog -> LogEntry tx msg)
-> Tracer IO (LogEntry tx msg) -> Tracer IO AuthLog
forall a' a. (a' -> a) -> Tracer IO a -> Tracer IO a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap AuthLog -> LogEntry tx msg
forall tx net. AuthLog -> HydraLog tx net
Authentication Tracer IO (LogEntry tx msg)
tracer) SigningKey HydraKey
signingKey [Party]
otherParties (NetworkComponent
   IO
   (Signed (ReliableMsg (Heartbeat msg)))
   (Signed (ReliableMsg (Heartbeat msg)))
   ()
 -> NetworkComponent
      IO
      (Authenticated (ReliableMsg (Heartbeat msg)))
      (ReliableMsg (Heartbeat msg))
      ())
-> NetworkComponent
     IO
     (Signed (ReliableMsg (Heartbeat msg)))
     (Signed (ReliableMsg (Heartbeat msg)))
     ()
-> NetworkComponent
     IO
     (Authenticated (ReliableMsg (Heartbeat msg)))
     (ReliableMsg (Heartbeat msg))
     ()
forall a b. (a -> b) -> a -> b
$
              Tracer
  IO
  (WithHost
     (TraceOuroborosNetwork (Signed (ReliableMsg (Heartbeat msg)))))
-> Host
-> [Host]
-> NetworkComponent
     IO
     (Signed (ReliableMsg (Heartbeat msg)))
     (Signed (ReliableMsg (Heartbeat msg)))
     ()
forall msg.
(ToCBOR msg, FromCBOR msg) =>
Tracer IO (WithHost (TraceOuroborosNetwork msg))
-> Host -> [Host] -> NetworkComponent IO msg msg ()
withOuroborosNetwork ((WithHost
   (TraceOuroborosNetwork (Signed (ReliableMsg (Heartbeat msg))))
 -> LogEntry tx msg)
-> Tracer IO (LogEntry tx msg)
-> Tracer
     IO
     (WithHost
        (TraceOuroborosNetwork (Signed (ReliableMsg (Heartbeat msg)))))
forall a' a. (a' -> a) -> Tracer IO a -> Tracer IO a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap WithHost
  (TraceOuroborosNetwork (Signed (ReliableMsg (Heartbeat msg))))
-> LogEntry tx msg
forall tx net. net -> HydraLog tx net
Network Tracer IO (LogEntry tx msg)
tracer) Host
localhost [Host]
peers

  NodeId
-> ConnectionMessages IO
-> NetworkComponent
     IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
-> NetworkComponent IO (Authenticated msg) msg ()
forall (m :: * -> *) msg1 msg a.
(MonadAsync m, MonadDelay m) =>
NodeId
-> ConnectionMessages m
-> NetworkComponent m (Heartbeat msg1) (Heartbeat msg) a
-> NetworkComponent m msg1 msg a
withHeartbeat NodeId
nodeId ConnectionMessages IO
connectionMessages NetworkComponent
  IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
reliability NetworkCallback (Authenticated msg) IO
callback ((Network IO msg -> IO ()) -> IO ())
-> (Network IO msg -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Network IO msg
network ->
    Network IO msg -> IO ()
action Network IO msg
network
 where
  NetworkConfiguration{FilePath
$sel:persistenceDir:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> FilePath
persistenceDir :: FilePath
persistenceDir, SigningKey HydraKey
$sel:signingKey:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> SigningKey HydraKey
signingKey :: SigningKey HydraKey
signingKey, [Party]
$sel:otherParties:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> [Party]
otherParties :: [Party]
otherParties, IP
$sel:host:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> IP
host :: IP
host, PortNumber
$sel:port:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> PortNumber
port :: PortNumber
port, [Host]
$sel:peers:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> [Host]
peers :: [Host]
peers, NodeId
$sel:nodeId:NetworkConfiguration :: forall {k} (m :: k). NetworkConfiguration m -> NodeId
nodeId :: NodeId
nodeId} = NetworkConfiguration IO
configuration

-- | Create `MessagePersistence` handle to be used by `Reliability` network layer.
--
-- This function will `throw` a `ParameterMismatch` exception if:
--
--   * Some state already exists and is loaded,
--   * The number of parties is not the same as the number of acknowledgments saved.
configureMessagePersistence ::
  (MonadIO m, MonadThrow m, FromJSON msg, ToJSON msg, MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
  Tracer m (HydraNodeLog tx) ->
  FilePath ->
  Int ->
  m (MessagePersistence m msg)
configureMessagePersistence :: forall (m :: * -> *) msg tx.
(MonadIO m, MonadThrow m, FromJSON msg, ToJSON msg, MonadSTM m,
 MonadThread m, MonadThrow (STM m)) =>
Tracer m (HydraNodeLog tx)
-> FilePath -> Int -> m (MessagePersistence m msg)
configureMessagePersistence Tracer m (HydraNodeLog tx)
tracer FilePath
persistenceDir Int
numberOfParties = do
  PersistenceIncremental (Heartbeat msg) m
msgPersistence <- FilePath -> m (PersistenceIncremental (Heartbeat msg) m)
forall a (m :: * -> *).
(MonadIO m, MonadThrow m, MonadSTM m, MonadThread m,
 MonadThrow (STM m)) =>
FilePath -> m (PersistenceIncremental a m)
createPersistenceIncremental (FilePath -> m (PersistenceIncremental (Heartbeat msg) m))
-> FilePath -> m (PersistenceIncremental (Heartbeat msg) m)
forall a b. (a -> b) -> a -> b
$ FilePath -> FilePath
storedMessagesFile FilePath
persistenceDir
  ackPersistence :: Persistence (Vector Int) m
ackPersistence@Persistence{FromJSON (Vector Int) => m (Maybe (Vector Int))
load :: FromJSON (Vector Int) => m (Maybe (Vector Int))
$sel:load:Persistence :: forall a (m :: * -> *).
Persistence a m -> FromJSON a => m (Maybe a)
load} <- FilePath -> m (Persistence (Vector Int) m)
forall (m :: * -> *) a.
(MonadIO m, MonadThrow m) =>
FilePath -> m (Persistence a m)
createPersistence (FilePath -> m (Persistence (Vector Int) m))
-> FilePath -> m (Persistence (Vector Int) m)
forall a b. (a -> b) -> a -> b
$ FilePath -> FilePath
acksFile FilePath
persistenceDir
  Maybe (Vector Int)
mAcks <- m (Maybe (Vector Int))
FromJSON (Vector Int) => m (Maybe (Vector Int))
load
  Persistence (Vector Int) m
ackPersistence' <- case (Vector Int -> Bool) -> Maybe (Vector Int) -> Maybe Bool
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (\Vector Int
acks -> Vector Int -> Int
forall a. Vector a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length Vector Int
acks Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
numberOfParties) Maybe (Vector Int)
mAcks of
    Just Bool
False -> do
      let paramsMismatch :: [ParamMismatch]
paramsMismatch = [SavedNetworkPartiesInconsistent{Int
numberOfParties :: Int
$sel:numberOfParties:ContestationPeriodMismatch :: Int
numberOfParties}]
      Tracer m (HydraNodeLog tx) -> HydraNodeLog tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (HydraNodeLog tx)
tracer ([ParamMismatch] -> HydraNodeLog tx
forall tx. [ParamMismatch] -> HydraNodeLog tx
Misconfiguration [ParamMismatch]
paramsMismatch)
      ParameterMismatch -> m (Persistence (Vector Int) m)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (ParameterMismatch -> m (Persistence (Vector Int) m))
-> ParameterMismatch -> m (Persistence (Vector Int) m)
forall a b. (a -> b) -> a -> b
$ [ParamMismatch] -> ParameterMismatch
ParameterMismatch [ParamMismatch]
paramsMismatch
    Maybe Bool
_ -> Persistence (Vector Int) m -> m (Persistence (Vector Int) m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Persistence (Vector Int) m
ackPersistence
  MessagePersistence m msg -> m (MessagePersistence m msg)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MessagePersistence m msg -> m (MessagePersistence m msg))
-> MessagePersistence m msg -> m (MessagePersistence m msg)
forall a b. (a -> b) -> a -> b
$ Int
-> PersistenceIncremental (Heartbeat msg) m
-> Persistence (Vector Int) m
-> MessagePersistence m msg
forall (m :: * -> *) msg.
(MonadThrow m, FromJSON msg, ToJSON msg) =>
Int
-> PersistenceIncremental (Heartbeat msg) m
-> Persistence (Vector Int) m
-> MessagePersistence m msg
mkMessagePersistence Int
numberOfParties PersistenceIncremental (Heartbeat msg) m
msgPersistence Persistence (Vector Int) m
ackPersistence'

withFlipHeartbeats ::
  NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a ->
  NetworkComponent m (Heartbeat (Authenticated msg)) msg1 a
withFlipHeartbeats :: forall (m :: * -> *) msg msg1 a.
NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a
-> NetworkComponent m (Heartbeat (Authenticated msg)) msg1 a
withFlipHeartbeats NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a
withBaseNetwork NetworkCallback (Heartbeat (Authenticated msg)) m
callback =
  NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a
withBaseNetwork Authenticated (Heartbeat msg) -> m ()
unwrapHeartbeats
 where
  unwrapHeartbeats :: Authenticated (Heartbeat msg) -> m ()
unwrapHeartbeats = \case
    Authenticated (Data NodeId
nid msg
msg) Party
party -> NetworkCallback (Heartbeat (Authenticated msg)) m
callback NetworkCallback (Heartbeat (Authenticated msg)) m
-> NetworkCallback (Heartbeat (Authenticated msg)) m
forall a b. (a -> b) -> a -> b
$ NodeId -> Authenticated msg -> Heartbeat (Authenticated msg)
forall msg. NodeId -> msg -> Heartbeat msg
Data NodeId
nid (msg -> Party -> Authenticated msg
forall msg. msg -> Party -> Authenticated msg
Authenticated msg
msg Party
party)
    Authenticated (Ping NodeId
nid) Party
_ -> NetworkCallback (Heartbeat (Authenticated msg)) m
callback NetworkCallback (Heartbeat (Authenticated msg)) m
-> NetworkCallback (Heartbeat (Authenticated msg)) m
forall a b. (a -> b) -> a -> b
$ NodeId -> Heartbeat (Authenticated msg)
forall msg. NodeId -> Heartbeat msg
Ping NodeId
nid

-- | Where are the messages stored, relative to given directory.
storedMessagesFile :: FilePath -> FilePath
storedMessagesFile :: FilePath -> FilePath
storedMessagesFile = (FilePath -> FilePath -> FilePath
</> FilePath
"network-messages")

-- | Where is the acknowledgments vector stored, relative to given directory.
acksFile :: FilePath -> FilePath
acksFile :: FilePath -> FilePath
acksFile = (FilePath -> FilePath -> FilePath
</> FilePath
"acks")