{-# LANGUAGE UndecidableInstances #-}

module Hydra.API.Server where

import Hydra.Prelude hiding (TVar, readTVar, seq)

import Cardano.Ledger.Core (PParams)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM.TChan (newBroadcastTChanIO, writeTChan)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO)
import Control.Exception (IOException)
import Hydra.API.APIServerLog (APIServerLog (..))
import Hydra.API.ClientInput (ClientInput)
import Hydra.API.HTTPServer (httpApp)
import Hydra.API.Projection (Projection (..), mkProjection)
import Hydra.API.ServerOutput (
  HeadStatus (Idle),
  ServerOutput,
  TimedServerOutput (..),
  projectHeadStatus,
  projectInitializingHeadId,
  projectSnapshotUtxo,
 )
import Hydra.API.WSServer (nextSequenceNumber, wsApp)
import Hydra.Cardano.Api (LedgerEra)
import Hydra.Chain (Chain (..))
import Hydra.Chain.ChainState (IsChainState)
import Hydra.Chain.Direct.State ()
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (IP, PortNumber)
import Hydra.Persistence (PersistenceIncremental (..))
import Hydra.Tx (Party)
import Network.HTTP.Types (status500)
import Network.Wai (responseLBS)
import Network.Wai.Handler.Warp (
  defaultSettings,
  runSettings,
  setBeforeMainLoop,
  setHost,
  setOnException,
  setOnExceptionResponse,
  setPort,
 )
import Network.Wai.Handler.WarpTLS (runTLS, tlsSettings)
import Network.Wai.Handler.WebSockets (websocketsOr)
import Network.Wai.Middleware.Cors (simpleCors)
import Network.WebSockets (
  defaultConnectionOptions,
 )

-- | Handle to provide a means for sending server outputs to clients.
newtype Server tx m = Server
  { sendOutput :: ServerOutput tx -> m ()
  -- ^ Send some output to all connected clients.
  }

-- | Callback for receiving client inputs: an action in @m@ which is given a
-- single 'ClientInput'.
type ServerCallback tx m = ClientInput tx -> m ()

-- | A type tying both receiving input and sending output into a /Component/.
--
-- It takes the 'ServerCallback' used for inputs and a continuation which
-- receives the 'Server' handle used for outputs, and produces an @m a@.
type ServerComponent tx m a = ServerCallback tx m -> (Server tx m -> m a) -> m a

-- | Configuration of the API server as consumed by 'withAPIServer'.
data APIServerConfig = APIServerConfig
  { host :: IP
  -- ^ Address handed to the server settings as host preference.
  , port :: PortNumber
  -- ^ Port handed to the server settings.
  , tlsCertPath :: Maybe FilePath
  -- ^ Path to a TLS certificate. TLS is only used when both this and
  -- 'tlsKeyPath' are given; providing just one of them makes the server exit
  -- with an error.
  , tlsKeyPath :: Maybe FilePath
  -- ^ Path to the TLS key belonging to 'tlsCertPath'.
  }

-- | Run the API server as a 'ServerComponent'.
--
-- Previously persisted 'TimedServerOutput's are loaded to seed the in-memory
-- history as well as the head status, snapshot UTxO and head id projections.
-- The combined websocket ('wsApp') and HTTP ('httpApp') application is then
-- raced against the given action; the action is only started after the server
-- ran its "before main loop" hook, and whichever side finishes first ends the
-- other.
--
-- Any 'IOException' escaping from this is re-thrown as a 'RunServerException'
-- carrying the configured host and port.
withAPIServer ::
  forall tx.
  IsChainState tx =>
  APIServerConfig ->
  Party ->
  PersistenceIncremental (TimedServerOutput tx) IO ->
  Tracer IO APIServerLog ->
  Chain tx IO ->
  PParams LedgerEra ->
  ServerComponent tx IO ()
withAPIServer config party persistence tracer chain pparams callback action =
  handle onIOException $ do
    responseChannel <- newBroadcastTChanIO
    timedOutputEvents <- loadAll

    -- Initialize our read model from stored events
    let storedOutputs = output <$> timedOutputEvents
    headStatusP <- mkProjection Idle storedOutputs projectHeadStatus
    snapshotUtxoP <- mkProjection Nothing storedOutputs projectSnapshotUtxo
    headIdP <- mkProjection Nothing storedOutputs projectInitializingHeadId

    -- NOTE: we need to reverse the list because we store history in a reversed
    -- list in memory but in order on disk
    history <- newTVarIO (reverse timedOutputEvents)
    (notifyServerRunning, waitForServerRunning) <- setupServerNotification

    let serverSettings =
          defaultSettings
            & setHost (fromString $ show host)
            & setPort (fromIntegral port)
            -- Connection level exceptions are only traced, not re-thrown.
            & setOnException (\_ e -> traceWith tracer $ APIConnectionError{reason = show e})
            & setOnExceptionResponse (responseLBS status500 [] . show)
            -- Unblocks 'waitForServerRunning' in the second thread below.
            & setBeforeMainLoop notifyServerRunning
    race_
      ( do
          traceWith tracer (APIServerStarted port)
          startServer serverSettings
            . simpleCors
            $ websocketsOr
              defaultConnectionOptions
              (wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseChannel)
              (httpApp tracer chain pparams (atomically $ getLatest headIdP) (atomically $ getLatest snapshotUtxoP) callback)
      )
      ( do
          waitForServerRunning
          action $
            Server
              { sendOutput = \output -> do
                  timedOutput <- appendToHistory history output
                  -- Update all projections and notify clients in one transaction.
                  atomically $ do
                    update headStatusP output
                    update snapshotUtxoP output
                    update headIdP output
                    writeTChan responseChannel timedOutput
              }
      )
 where
  APIServerConfig{host, port, tlsCertPath, tlsKeyPath} = config

  PersistenceIncremental{loadAll, append} = persistence

  -- Serve the application via TLS when both certificate and key are
  -- configured, in plain text when neither is, and exit otherwise.
  startServer settings app =
    case (tlsCertPath, tlsKeyPath) of
      (Just cert, Just key) ->
        runTLS (tlsSettings cert key) settings app
      -- TODO: better error handling
      (Just _, Nothing) ->
        die "TLS certificate provided without key"
      (Nothing, Just _) ->
        die "TLS key provided without certificate"
      _ ->
        runSettings settings app

  -- Stamp the output with the current time and the next sequence number,
  -- prepend it to the in-memory history and append it to the persistence.
  --
  -- NOTE(review): persisting happens outside of the STM transaction, so
  -- concurrent callers could presumably write to disk in a different order
  -- than their sequence numbers - confirm 'sendOutput' is used from a single
  -- thread only.
  appendToHistory history output = do
    time <- getCurrentTime
    timedOutput <- atomically $ do
      seq <- nextSequenceNumber history
      let timedOutput = TimedServerOutput{output, time, seq}
      modifyTVar' history (timedOutput :)
      pure timedOutput
    append timedOutput
    pure timedOutput

  -- Re-throw with the configured host and port added as context.
  onIOException ioException =
    throwIO
      RunServerException
        { ioException
        , host
        , port
        }

-- | An 'IOException' with more 'IP' and 'PortNumber' added as context.
--
-- Thrown by 'withAPIServer' in place of the original 'IOException'.
data RunServerException = RunServerException
  { ioException :: IOException
  -- ^ The exception which was caught.
  , host :: IP
  -- ^ Host the API server was configured with.
  , port :: PortNumber
  -- ^ Port the API server was configured with.
  }
  deriving stock (Show)

instance Exception RunServerException

-- | Action signalling that the server is running.
type NotifyServerRunning = IO ()

-- | Action blocking until the server was signalled to be running.
type WaitForServer = IO ()

-- | Setup notification and waiter to ensure that something only runs after the
-- server is actually listening.
--
-- Both actions share one initially empty 'MVar': notifying puts @()@ into it
-- and waiting takes it out again, so a wait blocks until a notification
-- happened.
setupServerNotification :: IO (NotifyServerRunning, WaitForServer)
setupServerNotification = do
  mv <- newEmptyMVar
  pure (putMVar mv (), takeMVar mv)