{-# LANGUAGE UndecidableInstances #-}
module Hydra.API.Server where
import Hydra.Prelude hiding (TVar, readTVar, seq)
import Cardano.Ledger.Core (PParams)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM.TChan (newBroadcastTChanIO, writeTChan)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO)
import Control.Exception (IOException)
import Hydra.API.APIServerLog (APIServerLog (..))
import Hydra.API.ClientInput (ClientInput)
import Hydra.API.HTTPServer (httpApp)
import Hydra.API.Projection (Projection (..), mkProjection)
import Hydra.API.ServerOutput (
HeadStatus (Idle),
ServerOutput,
TimedServerOutput (..),
projectHeadStatus,
projectInitializingHeadId,
projectSnapshotUtxo,
)
import Hydra.API.WSServer (nextSequenceNumber, wsApp)
import Hydra.Cardano.Api (LedgerEra)
import Hydra.Chain (Chain (..))
import Hydra.Chain.ChainState (IsChainState)
import Hydra.Chain.Direct.State ()
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (IP, PortNumber)
import Hydra.Persistence (PersistenceIncremental (..))
import Hydra.Tx (Party)
import Network.HTTP.Types (status500)
import Network.Wai (responseLBS)
import Network.Wai.Handler.Warp (
defaultSettings,
runSettings,
setBeforeMainLoop,
setHost,
setOnException,
setOnExceptionResponse,
setPort,
)
import Network.Wai.Handler.WarpTLS (runTLS, tlsSettings)
import Network.Wai.Handler.WebSockets (websocketsOr)
import Network.Wai.Middleware.Cors (simpleCors)
import Network.WebSockets (
defaultConnectionOptions,
)
-- | Handle passed to the action of a 'ServerComponent'. It wraps the single
-- operation the rest of the node uses to talk to API clients.
newtype Server tx m = Server
  { sendOutput :: ServerOutput tx -> m ()
  -- ^ Publish one 'ServerOutput'. The implementation built by 'withAPIServer'
  -- records the output in the history, updates the projections and writes it
  -- to the broadcast channel handed to 'wsApp'.
  }
-- | Function the server invokes with a 'ClientInput', running in @m@.
-- 'withAPIServer' passes the same callback on to both 'wsApp' and 'httpApp'.
type ServerCallback tx m = ClientInput tx -> m ()
-- | Continuation-passing shape of an API server: given a 'ServerCallback' for
-- inputs and an action that receives the 'Server' handle for outputs, produce
-- a result of type @a@ in @m@.
type ServerComponent tx m a = ServerCallback tx m -> (Server tx m -> m a) -> m a
-- | Static configuration consumed by 'withAPIServer'.
data APIServerConfig = APIServerConfig
  { host :: IP
  -- ^ Address handed to warp's 'setHost' (after rendering it with 'show').
  , port :: PortNumber
  -- ^ Port handed to warp's 'setPort'.
  , tlsCertPath :: Maybe FilePath
  -- ^ TLS certificate file. Must be given together with 'tlsKeyPath';
  -- 'withAPIServer' exits with an error if only one of the two is set.
  , tlsKeyPath :: Maybe FilePath
  -- ^ TLS private key file. When both paths are present the server is started
  -- with 'runTLS', when both are absent with plain 'runSettings'.
  }
-- | Run the API server for as long as the given action runs.
--
-- Start-up sequence:
--
--   1. Load all persisted 'TimedServerOutput's via 'loadAll' and replay them
--      into three projections (head status, latest snapshot UTxO and head id).
--   2. Keep the loaded outputs in an in-memory history 'TVar'.
--   3. Start warp (plain or TLS, see 'APIServerConfig') serving 'wsApp' for
--      websocket requests and 'httpApp' otherwise, wrapped in 'simpleCors'.
--   4. Once warp signals that it is about to enter its main loop, run the
--      @action@ with a 'Server' handle.
--
-- The server and the action are run concurrently with 'race_'.
--
-- Any 'IOException' escaping this block is rethrown as a
-- 'RunServerException' carrying the configured host and port.
withAPIServer ::
  forall tx.
  IsChainState tx =>
  APIServerConfig ->
  Party ->
  PersistenceIncremental (TimedServerOutput tx) IO ->
  Tracer IO APIServerLog ->
  Chain tx IO ->
  PParams LedgerEra ->
  ServerComponent tx IO ()
withAPIServer config party persistence tracer chain pparams callback action =
  handle onIOException $ do
    responseChannel <- newBroadcastTChanIO
    timedOutputEvents <- loadAll

    -- Seed the read models by replaying the outputs loaded from persistence.
    let storedOutputs = output <$> timedOutputEvents
    headStatusP <- mkProjection Idle storedOutputs projectHeadStatus
    snapshotUtxoP <- mkProjection Nothing storedOutputs projectSnapshotUtxo
    headIdP <- mkProjection Nothing storedOutputs projectInitializingHeadId

    -- 'appendToHistory' prepends new outputs, so the in-memory history is kept
    -- newest-first; the loaded events are reversed to match that order
    -- (assuming 'loadAll' yields them oldest-first -- TODO confirm).
    history <- newTVarIO (reverse timedOutputEvents)
    (notifyServerRunning, waitForServerRunning) <- setupServerNotification

    let serverSettings =
          defaultSettings
            & setHost (fromString $ show host)
            & setPort (fromIntegral port)
            & setOnException (\_ e -> traceWith tracer $ APIConnectionError{reason = show e})
            & setOnExceptionResponse (responseLBS status500 [] . show)
            -- Unblocks the second branch of 'race_' below.
            & setBeforeMainLoop notifyServerRunning

    race_
      ( do
          traceWith tracer (APIServerStarted port)
          startServer serverSettings
            . simpleCors
            $ websocketsOr
              defaultConnectionOptions
              (wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseChannel)
              ( httpApp
                  tracer
                  chain
                  pparams
                  (atomically $ getLatest headIdP)
                  (atomically $ getLatest snapshotUtxoP)
                  callback
              )
      )
      ( do
          -- Do not hand out the 'Server' before warp is ready.
          waitForServerRunning
          action $
            Server
              { sendOutput = \output -> do
                  timedOutput <- appendToHistory history output
                  atomically $ do
                    update headStatusP output
                    update snapshotUtxoP output
                    update headIdP output
                    writeTChan responseChannel timedOutput
              }
      )
 where
  APIServerConfig{host, port, tlsCertPath, tlsKeyPath} = config

  PersistenceIncremental{loadAll, append} = persistence

  -- Pick plain or TLS warp depending on which TLS paths are configured; a
  -- half-configured TLS setup terminates the process via 'die'.
  startServer settings app =
    case (tlsCertPath, tlsKeyPath) of
      (Just cert, Just key) ->
        runTLS (tlsSettings cert key) settings app
      (Just _, Nothing) ->
        die "TLS certificate provided without key"
      (Nothing, Just _) ->
        die "TLS key provided without certificate"
      (Nothing, Nothing) ->
        runSettings settings app

  -- Stamp an output with the current time and the next sequence number, push
  -- it onto the in-memory history, then append it to persistence. The
  -- sequence number is taken and the history updated in one STM transaction.
  appendToHistory history output = do
    time <- getCurrentTime
    timedOutput <- atomically $ do
      seq <- nextSequenceNumber history
      let timedOutput = TimedServerOutput{output, time, seq}
      modifyTVar' history (timedOutput :)
      pure timedOutput
    append timedOutput
    pure timedOutput

  -- Add host and port as context to an 'IOException' and rethrow it.
  onIOException ioException =
    throwIO
      RunServerException
        { ioException
        , host
        , port
        }
-- | An 'IOException' together with the host and port the API server was
-- configured with. 'withAPIServer' wraps every 'IOException' it catches in
-- this type before rethrowing it.
data RunServerException = RunServerException
  { ioException :: IOException
  -- ^ The exception that was caught.
  , host :: IP
  -- ^ Host taken from the 'APIServerConfig'.
  , port :: PortNumber
  -- ^ Port taken from the 'APIServerConfig'.
  }
  deriving stock (Show)

instance Exception RunServerException
-- | Action that signals the server is running; first component of the pair
-- returned by 'setupServerNotification' (there it is @putMVar mv ()@).
type NotifyServerRunning = IO ()
-- | Action that blocks until the server has been signalled as running; second
-- component of the pair returned by 'setupServerNotification' (there it is
-- @takeMVar mv@).
type WaitForServer = IO ()
-- | Create a one-shot signal backed by a fresh, empty 'MVar'.
--
-- The first action of the returned pair fills the 'MVar' (@putMVar mv ()@),
-- the second one takes from it (@takeMVar mv@) and therefore blocks until the
-- first has been run. 'withAPIServer' installs the notifier with
-- 'setBeforeMainLoop' and runs the waiter before handing out the 'Server'.
setupServerNotification :: IO (NotifyServerRunning, WaitForServer)
setupServerNotification = do
  mv <- newEmptyMVar
  pure (putMVar mv (), takeMVar mv)