{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE UndecidableInstances #-}
module Hydra.API.WSServer where
import Hydra.Prelude hiding (TVar, readTVar, seq)
import Control.Concurrent.STM (TChan, dupTChan, readTChan)
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.STM.TVar (TVar, readTVar)
import Data.Aeson qualified as Aeson
import Data.Version (showVersion)
import Hydra.API.APIServerLog (APIServerLog (..))
import Hydra.API.ClientInput (ClientInput)
import Hydra.API.Projection (Projection (..))
import Hydra.API.ServerOutput (
HeadStatus,
ServerOutput (Greetings, InvalidInput, hydraHeadId, hydraNodeVersion),
ServerOutputConfig (..),
TimedServerOutput (..),
WithUTxO (..),
headStatus,
me,
prepareServerOutput,
snapshotUtxo,
)
import Hydra.Chain.ChainState (
IsChainState,
)
import Hydra.Chain.Direct.State ()
import Hydra.Logging (Tracer, traceWith)
import Hydra.Options qualified as Options
import Hydra.Tx (Party, UTxOType)
import Hydra.Tx.HeadId (HeadId (..))
import Network.WebSockets (
PendingConnection (pendingRequest),
RequestHead (..),
acceptRequest,
receiveData,
sendTextData,
sendTextDatas,
withPingThread,
)
import Text.URI hiding (ParseException)
import Text.URI.QQ (queryKey, queryValue)
-- | WebSocket application serving a single API client connection.
--
-- For every pending connection this:
--
--   1. traces 'NewAPIConnection',
--   2. parses the request path as a URI to obtain the query parameters
--      ('mkURIBs' runs in 'IO' through @MonadThrow@, so a path that does not
--      parse makes this action fail before the connection is accepted),
--   3. accepts the connection and duplicates the response channel so this
--      client gets its own read cursor,
--   4. replays the stored history unless the client passed @history=no@,
--   5. sends a 'Greetings' message, and
--   6. keeps the connection alive with a ping thread (interval @30@) while
--      concurrently receiving client inputs and sending server outputs, until
--      either of the two loops terminates.
wsApp ::
  forall tx.
  IsChainState tx =>
  -- | Our own party, reported as @me@ in the 'Greetings' message.
  Party ->
  Tracer IO APIServerLog ->
  -- | History of server outputs; the head of the list is the latest output.
  TVar [TimedServerOutput tx] ->
  -- | Invoked with every successfully decoded client input.
  (ClientInput tx -> IO ()) ->
  -- | Read model providing the 'HeadStatus' for the 'Greetings' message.
  Projection STM.STM (ServerOutput tx) HeadStatus ->
  -- | Read model providing the 'HeadId' for the 'Greetings' message.
  Projection STM.STM (ServerOutput tx) (Maybe HeadId) ->
  -- | Read model providing the snapshot UTxO for the 'Greetings' message.
  Projection STM.STM (ServerOutput tx) (Maybe (UTxOType tx)) ->
  -- | Broadcast channel of server outputs; duplicated per connection.
  TChan (TimedServerOutput tx) ->
  PendingConnection ->
  IO ()
wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseChannel pending = do
  traceWith tracer NewAPIConnection
  let path = requestPath $ pendingRequest pending
  queryParams <- uriQuery <$> mkURIBs path
  con <- acceptRequest pending
  chan <- STM.atomically $ dupTChan responseChannel

  -- The API client decides whether it wants to see past server outputs.
  unless (shouldNotServeHistory queryParams) $
    forwardHistory con

  forwardGreetingOnly con

  let outConfig = mkServerOutputConfig queryParams

  withPingThread con 30 (pure ()) $
    race_ (receiveInputs con) (sendOutputs chan con outConfig)
  where
    -- Send a 'Greetings' message on every new connection so the client always
    -- learns the currently configured 'party' and the latest head state.
    --
    -- All projections and the sequence number are read in a single STM
    -- transaction so the greeting reflects one consistent state instead of
    -- values observed at four different points in time.
    forwardGreetingOnly con = do
      (seq, headStatus, hydraHeadId, snapshotUtxo) <-
        atomically $
          (,,,)
            <$> nextSequenceNumber history
            <*> getLatestHeadStatus
            <*> getLatestHeadId
            <*> getLatestSnapshotUtxo
      time <- getCurrentTime
      sendTextData con $
        Aeson.encode
          TimedServerOutput
            { time
            , seq
            , output =
                Greetings
                  { me = party
                  , headStatus
                  , hydraHeadId
                  , snapshotUtxo
                  , hydraNodeVersion = showVersion Options.hydraNodeVersion
                  } ::
                  ServerOutput tx
            }

    Projection{getLatest = getLatestHeadStatus} = headStatusP
    Projection{getLatest = getLatestHeadId} = headIdP
    Projection{getLatest = getLatestSnapshotUtxo} = snapshotUtxoP

    -- Derive the per-connection output configuration from the query string.
    mkServerOutputConfig qp =
      ServerOutputConfig
        { utxoInSnapshot = decideOnUTxODisplay qp
        }

    -- UTxO are included in snapshots unless the client asked for
    -- @snapshot-utxo=no@.
    decideOnUTxODisplay qp =
      let k = [queryKey|snapshot-utxo|]
          v = [queryValue|no|]
          queryP = QueryParam k v
       in if queryP `elem` qp then WithoutUTxO else WithUTxO

    -- 'True' when the query string contains @history=no@.
    shouldNotServeHistory qp =
      flip any qp $ \case
        (QueryParam key val)
          | key == [queryKey|history|] -> val == [queryValue|no|]
        _other -> False

    -- Forward every output arriving on this connection's channel to the client
    -- (rendered according to 'outConfig') and trace what was sent.
    sendOutputs chan con outConfig = forever $ do
      response <- STM.atomically $ readTChan chan
      let sentResponse = prepareServerOutput outConfig response
      sendTextData con sentResponse
      traceWith tracer (APIOutputSent $ toJSON response)

    -- Decode every message received from the client; valid inputs are traced
    -- and handed to 'callback', invalid ones are answered with 'InvalidInput'.
    receiveInputs con = forever $ do
      msg <- receiveData con
      case Aeson.eitherDecode msg of
        Right input -> do
          traceWith tracer (APIInputReceived $ toJSON input)
          callback input
        Left e -> do
          -- NOTE: 'toStrict' materialises the whole lazy message in memory.
          let clientInput = decodeUtf8With lenientDecode $ toStrict msg
          time <- getCurrentTime
          seq <- atomically $ nextSequenceNumber history
          let timedOutput = TimedServerOutput{output = InvalidInput @tx e clientInput, time, seq}
          sendTextData con $ Aeson.encode timedOutput
          traceWith tracer (APIInvalidInput e clientInput)

    -- Send the whole stored history to the client. The left fold conses each
    -- encoded output onto the accumulator, so the messages go out in the
    -- reverse of the stored order.
    forwardHistory con = do
      hist <- STM.atomically (readTVar history)
      let encodeAndReverse xs serverOutput = Aeson.encode serverOutput : xs
      sendTextDatas con $ foldl' encodeAndReverse [] hist
-- | Compute the sequence number for the next server output.
--
-- Returns @0@ when the history is empty; otherwise one more than the @seq@ of
-- the first element of the history list.
nextSequenceNumber :: TVar [TimedServerOutput tx] -> STM.STM Natural
nextSequenceNumber historyList =
  STM.readTVar historyList >>= \case
    [] -> pure 0
    (TimedServerOutput{seq} : _) -> pure (seq + 1)