{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE UndecidableInstances #-}

module Hydra.API.WSServer where

import Hydra.Prelude hiding (TVar, readTVar, seq)

import Control.Concurrent.STM (TChan, dupTChan, readTChan)
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.STM.TVar (TVar, readTVar)
import Data.Aeson qualified as Aeson
import Data.Version (showVersion)
import Hydra.API.APIServerLog (APIServerLog (..))
import Hydra.API.ClientInput (ClientInput)
import Hydra.API.Projection (Projection (..))
import Hydra.API.ServerOutput (
  HeadStatus,
  ServerOutput (Greetings, InvalidInput, hydraNodeVersion),
  ServerOutputConfig (..),
  TimedServerOutput (..),
  WithUTxO (..),
  headStatus,
  me,
  prepareServerOutput,
  snapshotUtxo,
 )
import Hydra.Chain (
  IsChainState,
 )
import Hydra.Chain.Direct.State ()
import Hydra.Ledger (UTxOType)
import Hydra.Logging (Tracer, traceWith)
import Hydra.Options qualified as Options
import Hydra.Party (Party)
import Network.WebSockets (
  PendingConnection (pendingRequest),
  RequestHead (..),
  acceptRequest,
  receiveData,
  sendTextData,
  sendTextDatas,
  withPingThread,
 )
import Text.URI hiding (ParseException)
import Text.URI.QQ (queryKey, queryValue)

-- | WebSocket application serving a single API client connection.
--
-- For each pending connection this:
--
--   1. traces 'NewAPIConnection',
--   2. parses the query parameters from the request path,
--   3. accepts the connection and duplicates the response channel so this
--      client gets its own read end,
--   4. forwards the recorded history unless the client passed @history=no@,
--   5. sends a 'Greetings' message, and
--   6. runs the receive and send loops concurrently (under a ping thread)
--      until either of them terminates.
--
-- NOTE(review): 'mkURIBs' runs in 'IO' via 'MonadThrow', so a request path
-- that is not a valid URI raises before the connection is accepted - confirm
-- this is the intended way to reject such requests.
wsApp ::
  forall tx.
  IsChainState tx =>
  -- | Our own party, reported as 'me' in the 'Greetings' message.
  Party ->
  -- | Tracer receiving 'APIServerLog' events of this connection.
  Tracer IO APIServerLog ->
  -- | Recorded server outputs; read to replay history and to derive the next
  -- sequence number.
  TVar [TimedServerOutput tx] ->
  -- | Invoked with every client input that decodes successfully.
  (ClientInput tx -> IO ()) ->
  -- | Read model to enhance 'Greetings' messages with 'HeadStatus'.
  Projection STM.STM (ServerOutput tx) HeadStatus ->
  -- | Read model to enhance 'Greetings' messages with snapshot UTxO.
  Projection STM.STM (ServerOutput tx) (Maybe (UTxOType tx)) ->
  -- | Channel of outgoing server outputs; duplicated per connection.
  TChan (TimedServerOutput tx) ->
  -- | The not-yet-accepted client connection.
  PendingConnection ->
  IO ()
wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel pending = do
  traceWith tracer NewAPIConnection
  let path = requestPath $ pendingRequest pending
  queryParams <- uriQuery <$> mkURIBs path
  con <- acceptRequest pending
  chan <- STM.atomically $ dupTChan responseChannel

  -- api client can decide if they want to see the past history of server outputs
  unless (shouldNotServeHistory queryParams) $
    forwardHistory con

  forwardGreetingOnly con

  let outConfig = mkServerOutputConfig queryParams

  -- Ping interval of 30 is in seconds according to the websockets package;
  -- nothing extra is done after each ping.
  withPingThread con 30 (pure ()) $
    race_ (receiveInputs con) (sendOutputs chan con outConfig)
 where
  -- NOTE: We will add a 'Greetings' message on each API server start. This is
  -- important to make sure the latest configured 'party' is reaching the
  -- client.
  forwardGreetingOnly con = do
    seq <- atomically $ nextSequenceNumber history
    headStatus <- atomically getLatestHeadStatus
    snapshotUtxo <- atomically getLatestSnapshotUtxo
    time <- getCurrentTime

    sendTextData con $
      Aeson.encode
        TimedServerOutput
          { time
          , seq
          , output =
              Greetings
                { me = party
                , headStatus
                , snapshotUtxo
                , hydraNodeVersion = showVersion Options.hydraNodeVersion
                } ::
                ServerOutput tx
          }

  -- Accessors for the latest projected values used in 'Greetings'.
  Projection{getLatest = getLatestHeadStatus} = headStatusP
  Projection{getLatest = getLatestSnapshotUtxo} = snapshotUtxoP

  -- Build the output configuration from the client's query parameters.
  mkServerOutputConfig qp =
    ServerOutputConfig
      { utxoInSnapshot = decideOnUTxODisplay qp
      }

  -- 'WithoutUTxO' only if the exact parameter @snapshot-utxo=no@ is present;
  -- any other value (or absence) yields 'WithUTxO'.
  decideOnUTxODisplay qp =
    let k = [queryKey|snapshot-utxo|]
        v = [queryValue|no|]
        queryP = QueryParam k v
     in if queryP `elem` qp then WithoutUTxO else WithUTxO

  -- True iff some query parameter is exactly @history=no@. Parameters of a
  -- different shape or key fall through to the catch-all and count as False.
  shouldNotServeHistory qp =
    flip any qp $ \case
      (QueryParam key val)
        | key == [queryKey|history|] -> val == [queryValue|no|]
      _other -> False

  -- Forward every output read from this connection's channel to the client,
  -- rendered according to 'outConfig'. The trace carries the JSON of the
  -- unmodified response, not the rendered bytes.
  sendOutputs chan con outConfig = forever $ do
    response <- STM.atomically $ readTChan chan
    let sentResponse =
          prepareServerOutput outConfig response

    sendTextData con sentResponse
    traceWith tracer (APIOutputSent $ toJSON response)

  -- Decode each received message as a 'ClientInput' and hand it to
  -- 'callback'. On decode failure an 'InvalidInput' is sent back on this
  -- connection only; it is neither written to 'history' nor to the response
  -- channel here.
  receiveInputs con = forever $ do
    msg <- receiveData con
    case Aeson.eitherDecode msg of
      Right input -> do
        traceWith tracer (APIInputReceived $ toJSON input)
        callback input
      Left e -> do
        -- XXX(AB): toStrict might be problematic as it implies consuming the full
        -- message to memory
        let clientInput = decodeUtf8With lenientDecode $ toStrict msg
        time <- getCurrentTime
        seq <- atomically $ nextSequenceNumber history
        let timedOutput = TimedServerOutput{output = InvalidInput @tx e clientInput, time, seq}
        sendTextData con $ Aeson.encode timedOutput
        traceWith tracer (APIInvalidInput e clientInput)

  -- Send the whole recorded history in one batch. The left fold with cons
  -- reverses the stored list while encoding, so the client receives entries
  -- in the opposite order to how they are stored (presumably newest-first in
  -- the 'TVar', as 'nextSequenceNumber' reads the head - verify).
  forwardHistory con = do
    hist <- STM.atomically (readTVar history)
    let encodeAndReverse xs serverOutput = Aeson.encode serverOutput : xs
    sendTextDatas con $ foldl' encodeAndReverse [] hist

-- | Compute the sequence number to assign to the next server output.
--
-- Returns @0@ when the history is empty; otherwise the 'seq' of the first
-- element of the list plus one. Only the head is inspected, so this presumes
-- the history is stored newest-first - verify against the code that writes
-- to the 'TVar'.
nextSequenceNumber :: TVar [TimedServerOutput tx] -> STM.STM Natural
nextSequenceNumber historyList =
  STM.readTVar historyList >>= \case
    [] -> pure 0
    (TimedServerOutput{seq} : _) -> pure (seq + 1)