-- | The general input queue from which the Hydra head is fed with inputs.
module Hydra.Node.InputQueue where

import Hydra.Prelude

import Control.Concurrent.Class.MonadSTM (
  MonadLabelledSTM,
  isEmptyTQueue,
  labelTQueueIO,
  labelTVarIO,
  modifyTVar',
  newTQueue,
  newTVarIO,
  readTQueue,
  writeTQueue,
 )
import Control.Monad.Class.MonadAsync (async)

-- | The single, required queue in the system from which a hydra head is "fed".
--
-- This is a record of operations (handle pattern) over some monad @m@ carrying
-- inputs of type @e@. Items leave the queue wrapped in 'Queued', i.e. together
-- with the identifier they were given when first enqueued.
--
-- NOTE(SN): this probably should be bounded and include proper logging
--
-- NOTE(SN): handle pattern, but likely not required as there is no need for an
-- alternative implementation
data InputQueue m e = InputQueue
  { enqueue :: e -> m ()
  -- ^ Add a new input to the queue.
  , reenqueue :: DiffTime -> Queued e -> m ()
  -- ^ Put an already 'Queued' item back into the queue after the given delay.
  , dequeue :: m (Queued e)
  -- ^ Take the next item out of the queue.
  , isEmpty :: m Bool
  -- ^ Whether the queue currently has nothing left to process.
  }

-- | An item of type @a@ as stored in the 'InputQueue', paired with the
-- identifier it was given when it was enqueued.
data Queued a = Queued
  { queuedId :: Word64
  -- ^ Identifier attached to the item when it entered the queue.
  , queuedItem :: a
  -- ^ The queued payload itself.
  }

-- | Create a new, empty 'InputQueue'.
--
-- The queue is backed by a 'TQueue' and two counters:
--
-- * @nextId@ holds the 'queuedId' handed to the next item passed to 'enqueue'.
-- * @numThreads@ counts the delayed 'reenqueue' threads that have been started
--   but have not yet written their item back.
--
-- All three STM primitives are labelled (presumably so they can be told apart
-- in traces and debugging output — confirm against the 'MonadLabelledSTM'
-- instance in use).
createInputQueue ::
  ( MonadDelay m
  , MonadAsync m
  , MonadLabelledSTM m
  ) =>
  m (InputQueue m e)
createInputQueue = do
  numThreads <- newTVarIO (0 :: Integer)
  labelTVarIO numThreads "num-threads"
  nextId <- newTVarIO 0
  labelTVarIO nextId "next-id"
  q <- atomically newTQueue
  labelTQueueIO q "input-queue"
  pure
    InputQueue
      { enqueue = \item ->
          -- Reading the id, writing the item and bumping the id happen in one
          -- transaction, so concurrent callers never observe the same id.
          atomically $ do
            ident <- readTVar nextId
            writeTQueue q Queued{queuedId = ident, queuedItem = item}
            modifyTVar' nextId succ
      , reenqueue = \delay queued -> do
          -- Count the pending thread before it is spawned so that 'isEmpty'
          -- cannot report an empty queue while the item is still in flight.
          atomically $ modifyTVar' numThreads succ
          -- The 'Async' handle is discarded: the caller returns immediately
          -- and the thread writes the item back (keeping its id) on its own.
          void . async $ do
            threadDelay delay
            -- Decrement and write back atomically; otherwise 'isEmpty' could
            -- see zero pending threads and an empty queue in between.
            atomically $ do
              modifyTVar' numThreads pred
              writeTQueue q queued
      , dequeue =
          atomically $ readTQueue q
      , isEmpty =
          -- Empty only if nothing is queued and no delayed re-enqueue is
          -- still pending.
          atomically $ do
            pending <- readTVar numThreads
            queueEmpty <- isEmptyTQueue q
            pure (queueEmpty && pending == 0)
      }