{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE UndecidableInstances #-}

module Hydra.API.WSServer where

import Hydra.Prelude hiding (TVar, filter, readTVar, seq)

import Conduit (ConduitT, ResourceT, mapM_C, runConduitRes, (.|))
import Control.Concurrent.STM (TChan, dupTChan, readTChan)
import Control.Concurrent.STM qualified as STM
import Control.Lens ((.~))
import Data.Aeson qualified as Aeson
import Data.Aeson.Lens (atKey)
import Data.Conduit.Combinators (filter)
import Data.Version (showVersion)
import Hydra.API.APIServerLog (APIServerLog (..))
import Hydra.API.ClientInput (ClientInput)
import Hydra.API.Projection (Projection (..))
import Hydra.API.ServerOutput (
  ClientMessage,
  Greetings (..),
  HeadStatus,
  InvalidInput (..),
  ServerOutputConfig (..),
  TimedServerOutput (..),
  WithAddressedTx (..),
  WithUTxO (..),
  handleUtxoInclusion,
  headStatus,
  me,
  prepareServerOutput,
  removeSnapshotUTxO,
  snapshotUtxo,
 )
import Hydra.API.ServerOutputFilter (
  ServerOutputFilter (..),
 )
import Hydra.Chain.ChainState (
  IsChainState,
 )
import Hydra.Chain.Direct.State ()
import Hydra.HeadLogic (StateChanged)
import Hydra.Logging (Tracer, traceWith)
import Hydra.Options qualified as Options
import Hydra.Tx (Party, UTxOType)
import Hydra.Tx.HeadId (HeadId (..))
import Network.WebSockets (
  PendingConnection (pendingRequest),
  RequestHead (..),
  acceptRequest,
  receiveData,
  sendTextData,
  withPingThread,
 )
import Text.URI hiding (ParseException)
import Text.URI.QQ (queryKey, queryValue)

-- | Serve a single websocket client connection.
--
-- Traces 'NewAPIConnection', reads the query parameters from the request
-- path, accepts the connection and then:
--
--   1. forwards the past server outputs if the client asked for them with
--      @history=yes@,
--   2. sends a 'Greetings' message,
--   3. concurrently receives client inputs and sends outputs until either
--      side terminates, while pinging the client every 30 seconds.
--
-- Recognised query parameters: @history=yes@, @snapshot-utxo=no@ and
-- @address=<value>@.
wsApp ::
  forall tx.
  IsChainState tx =>
  -- | Our own party, reported as 'me' in the 'Greetings' message.
  Party ->
  -- | Tracer for API server log messages.
  Tracer IO APIServerLog ->
  -- | Source of past server outputs, only consumed when history is requested.
  ConduitT () (TimedServerOutput tx) (ResourceT IO) () ->
  -- | Callback invoked with every successfully decoded client input.
  (ClientInput tx -> IO ()) ->
  -- | Read model to enhance 'Greetings' messages with 'HeadStatus'.
  Projection STM.STM (StateChanged tx) HeadStatus ->
  -- | Read model to enhance 'Greetings' messages with 'HeadId'.
  Projection STM.STM (StateChanged tx) (Maybe HeadId) ->
  -- | Read model to enhance 'Greetings' messages with snapshot UTxO.
  Projection STM.STM (StateChanged tx) (Maybe (UTxOType tx)) ->
  -- | Channel of responses; it is duplicated for this connection.
  TChan (Either (TimedServerOutput tx) (ClientMessage tx)) ->
  -- | Provides 'txContainsAddr', used to filter outputs by address.
  ServerOutputFilter tx ->
  -- | The not yet accepted websocket connection.
  PendingConnection ->
  IO ()
wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseChannel ServerOutputFilter{txContainsAddr} pending = do
  traceWith tracer NewAPIConnection
  let path = requestPath $ pendingRequest pending
  -- NOTE: 'mkURIBs' runs in 'IO' here, so a request path which cannot be
  -- parsed raises an exception before the connection is accepted.
  queryParams <- uriQuery <$> mkURIBs path
  con <- acceptRequest pending
  -- Each connection reads from its own duplicate of the response channel.
  chan <- STM.atomically $ dupTChan responseChannel

  let outConfig = mkServerOutputConfig queryParams

  -- api client can decide if they want to see the past history of server outputs
  when (shouldServeHistory queryParams) $
    forwardHistory con outConfig

  forwardGreetingOnly outConfig con

  withPingThread con 30 (pure ()) $
    race_ (receiveInputs con) (sendOutputs chan con outConfig)
 where
  -- NOTE: We will add a 'Greetings' message on each API server start. This is
  -- important to make sure the latest configured 'party' is reaching the
  -- client.
  forwardGreetingOnly config con = do
    headStatus <- atomically getLatestHeadStatus
    hydraHeadId <- atomically getLatestHeadId
    snapshotUtxo <- atomically getLatestSnapshotUtxo
    sendTextData con $
      handleUtxoInclusion config (atKey "snapshotUtxo" .~ Nothing) $
        Aeson.encode
          Greetings
            { me = party
            , headStatus
            , hydraHeadId
            , snapshotUtxo
            , hydraNodeVersion = showVersion Options.hydraNodeVersion
            }

  -- Accessors for the latest value of each read model.
  Projection{getLatest = getLatestHeadStatus} = headStatusP
  Projection{getLatest = getLatestHeadId} = headIdP
  Projection{getLatest = getLatestSnapshotUtxo} = snapshotUtxoP

  -- Derive the per-connection output configuration from the query parameters.
  mkServerOutputConfig qp =
    ServerOutputConfig
      { utxoInSnapshot = decideOnUTxODisplay qp
      , addressInTx = decideOnAddressDisplay qp
      }

  -- 'WithoutUTxO' only if the exact parameter @snapshot-utxo=no@ is present.
  decideOnUTxODisplay qp =
    let k = [queryKey|snapshot-utxo|]
        v = [queryValue|no|]
        queryP = QueryParam k v
     in if queryP `elem` qp then WithoutUTxO else WithUTxO

  -- Use the value of the first @address=<value>@ parameter, if there is one.
  decideOnAddressDisplay qp =
    case find queryByAddress qp of
      Just (QueryParam _ v) -> WithAddressedTx (unRText v)
      _ -> WithoutAddressedTx
   where
    queryByAddress = \case
      (QueryParam key _) | key == [queryKey|address|] -> True
      _other -> False

  -- True if any @history@ parameter carries the value @yes@.
  shouldServeHistory qp =
    flip any qp $ \case
      (QueryParam key val)
        | key == [queryKey|history|] -> val == [queryValue|yes|]
      _other -> False

  -- Forever read responses from the channel and send those passing the
  -- address filter to the client.
  sendOutputs chan con outConfig@ServerOutputConfig{addressInTx} = forever $ do
    response <- STM.atomically $ readTChan chan
    when (isAddressInTx addressInTx response) $
      sendResponse response
   where
    sendResponse = \case
      Left response -> do
        let sentResponse = prepareServerOutput outConfig response
        sendTextData con sentResponse
        traceWith tracer (APIOutputSent $ toJSON response)
      Right response -> do
        sendTextData con (handleUtxoInclusion outConfig removeSnapshotUTxO $ Aeson.encode response)
        traceWith tracer (APIOutputSent $ toJSON response)

  -- Forever receive messages from the client: decoded inputs are traced and
  -- handed to 'callback', anything else is answered with 'InvalidInput'.
  receiveInputs con = forever $ do
    msg <- receiveData con
    case Aeson.eitherDecode msg of
      Right input -> do
        traceWith tracer (APIInputReceived $ toJSON input)
        callback input
      Left e -> do
        -- XXX(AB): toStrict might be problematic as it implies consuming the full
        -- message to memory
        let clientInput = decodeUtf8With lenientDecode $ toStrict msg
        sendTextData con $ Aeson.encode $ InvalidInput e clientInput
        traceWith tracer (APIInvalidInput e clientInput)

  -- Stream the history to the client, applying the same address filter and
  -- output preparation as for live outputs.
  forwardHistory con config@ServerOutputConfig{addressInTx} = do
    runConduitRes $
      history
        .| filter (isAddressInTx addressInTx . Left)
        .| mapM_C (liftIO . sendTextData con . prepareServerOutput config)

  -- Decide whether a response passes the address filter. Client messages
  -- ('Right') always pass; server outputs ('Left') are checked with
  -- 'txContainsAddr' when an address was requested.
  isAddressInTx addressInTx = \case
    Left tx -> checkAddress tx
    Right _ -> True
   where
    checkAddress tx =
      case addressInTx of
        WithAddressedTx addr -> txContainsAddr tx addr
        WithoutAddressedTx -> True