{-# LANGUAGE UndecidableInstances #-}

module Hydra.API.Server where

import Hydra.Prelude hiding (TVar, readTVar, seq)

import Cardano.Ledger.Core (PParams)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM.TChan (newBroadcastTChanIO, writeTChan)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO)
import Control.Exception (IOException)
import Hydra.API.APIServerLog (APIServerLog (..))
import Hydra.API.ClientInput (ClientInput)
import Hydra.API.HTTPServer (httpApp)
import Hydra.API.Projection (Projection (..), mkProjection)
import Hydra.API.ServerOutput (
  HeadStatus (Idle),
  ServerOutput,
  TimedServerOutput (..),
  projectHeadStatus,
  projectInitializingHeadId,
  projectSnapshotUtxo,
 )
import Hydra.API.WSServer (nextSequenceNumber, wsApp)
import Hydra.Cardano.Api (LedgerEra)
import Hydra.Chain (
  Chain (..),
  IsChainState,
 )
import Hydra.Chain.Direct.State ()
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (IP, PortNumber)
import Hydra.Party (Party)
import Hydra.Persistence (PersistenceIncremental (..))
import Network.Wai.Handler.Warp (
  defaultSettings,
  runSettings,
  setBeforeMainLoop,
  setHost,
  setOnException,
  setPort,
 )
import Network.Wai.Handler.WebSockets (websocketsOr)
import Network.WebSockets (
  defaultConnectionOptions,
 )

-- | Handle to provide a means for sending server outputs to clients.
--
-- The handle is parameterised over the transaction type @tx@ and the monad
-- @m@ in which outputs are sent.
newtype Server tx m = Server
  { sendOutput :: ServerOutput tx -> m ()
  -- ^ Send some output to all connected clients.
  }

-- | Callback for receiving client inputs: it is handed one 'ClientInput' at a
-- time and runs in the monad @m@, producing no result.
type ServerCallback tx m = ClientInput tx -> m ()

-- | A type tying both receiving input and sending output into a /Component/.
--
-- The first argument is the 'ServerCallback' used for incoming client inputs.
-- The second argument is a continuation which is given the 'Server' handle
-- (for sending outputs); its result @a@ is the result of the whole component.
type ServerComponent tx m a = ServerCallback tx m -> (Server tx m -> m a) -> m a

-- | Run the API server (websocket and HTTP) next to the given continuation.
--
-- On start-up all persisted 'TimedServerOutput's are loaded through the given
-- 'PersistenceIncremental' and used to seed three projections (head status,
-- latest snapshot UTxO and the initializing head id) as well as the in-memory
-- history.
--
-- The server is then run on the given 'IP' and 'PortNumber', racing against
-- the continuation. The continuation is only started once the server signals
-- (via 'setBeforeMainLoop') that it is about to enter its main loop, and it
-- receives a 'Server' handle whose 'sendOutput':
--
--   1. stamps the output with the current time and next sequence number,
--      prepends it to the in-memory history and appends it to persistence;
--   2. updates all projections and broadcasts the timed output to the response
--      channel in a single STM transaction.
--
-- Any 'IOException' escaping the whole block is re-thrown as a
-- 'RunServerException' carrying the host and port as context.
withAPIServer ::
  forall tx.
  IsChainState tx =>
  IP ->
  PortNumber ->
  Party ->
  PersistenceIncremental (TimedServerOutput tx) IO ->
  Tracer IO APIServerLog ->
  Chain tx IO ->
  PParams LedgerEra ->
  ServerComponent tx IO ()
withAPIServer host port party PersistenceIncremental{loadAll, append} tracer chain pparams callback action =
  -- NOTE(review): 'handle' wraps the server and the continuation alike, so an
  -- 'IOException' escaping from @action@ would presumably be re-wrapped too.
  handle onIOException $ do
    responseChannel <- newBroadcastTChanIO
    timedOutputEvents <- loadAll

    -- Initialize our read model from stored events
    headStatusP <- mkProjection Idle (output <$> timedOutputEvents) projectHeadStatus
    snapshotUtxoP <- mkProjection Nothing (output <$> timedOutputEvents) projectSnapshotUtxo
    headIdP <- mkProjection Nothing (output <$> timedOutputEvents) projectInitializingHeadId

    -- NOTE: we need to reverse the list because we store history in a reversed
    -- list in memory but in order on disk
    history <- newTVarIO (reverse timedOutputEvents)
    (notifyServerRunning, waitForServerRunning) <- setupServerNotification

    let serverSettings =
          defaultSettings
            & setHost (fromString $ show host)
            & setPort (fromIntegral port)
            -- Exceptions reported by the server are only traced here.
            & setOnException (\_ e -> traceWith tracer $ APIConnectionError{reason = show e})
            -- Unblocks the continuation below once the server is about to
            -- enter its main loop.
            & setBeforeMainLoop notifyServerRunning
    race_
      ( do
          traceWith tracer (APIServerStarted port)
          runSettings serverSettings $
            websocketsOr
              defaultConnectionOptions
              (wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel)
              (httpApp tracer chain pparams (getLatest headIdP))
      )
      ( do
          waitForServerRunning
          action $
            Server
              { sendOutput = \output -> do
                  timedOutput <- appendToHistory history output
                  atomically $ do
                    update headStatusP output
                    update snapshotUtxoP output
                    update headIdP output
                    writeTChan responseChannel timedOutput
              }
      )
 where
  -- Stamp an output with the current time and the next sequence number,
  -- prepend it to the in-memory history, then append it to persistence.
  -- Persisting happens after (and outside of) the STM transaction.
  appendToHistory history output = do
    time <- getCurrentTime
    timedOutput <- atomically $ do
      seq <- nextSequenceNumber history
      let timedOutput = TimedServerOutput{output, time, seq}
      modifyTVar' history (timedOutput :)
      pure timedOutput
    append timedOutput
    pure timedOutput

  -- Re-throw an 'IOException' with the host and port added as context.
  onIOException ioException =
    throwIO
      RunServerException
        { ioException
        , host
        , port
        }

-- | An 'IOException' with more 'IP' and 'PortNumber' added as context.
--
-- Thrown by 'withAPIServer' in place of the original 'IOException'.
data RunServerException = RunServerException
  { ioException :: IOException
  -- ^ The underlying exception that was caught.
  , host :: IP
  -- ^ Host the API server was configured with.
  , port :: PortNumber
  -- ^ Port the API server was configured with.
  }
  deriving stock (Show)

instance Exception RunServerException

-- | Action used to signal that the server is running. As produced by
-- 'setupServerNotification' this is the @putMVar@ side of a shared @MVar@.
type NotifyServerRunning = IO ()

-- | Action used to wait for the server to be running. As produced by
-- 'setupServerNotification' this is the @takeMVar@ side of a shared @MVar@.
type WaitForServer = IO ()

-- | Setup notification and waiter to ensure that something only runs after the
-- server is actually listening.
--
-- Both returned actions share one initially empty @MVar@: the first puts @()@
-- into it, the second takes from it and therefore blocks until the first has
-- been run.
--
-- NOTE: per @MVar@ semantics, running the notifier a second time blocks until
-- a waiter has taken the previous value, so the pair is meant for a single
-- notify/wait hand-off.
setupServerNotification :: IO (NotifyServerRunning, WaitForServer)
setupServerNotification = do
  mv <- newEmptyMVar
  pure (putMVar mv (), takeMVar mv)