module Hydra.Node.InputQueue where
import Hydra.Prelude
import Control.Concurrent.Class.MonadSTM (
MonadLabelledSTM,
isEmptyTQueue,
labelTQueueIO,
labelTVarIO,
modifyTVar',
newTQueue,
newTVarIO,
readTQueue,
writeTQueue,
)
import Control.Monad.Class.MonadAsync (async)
-- | Handle to a queue of inputs of type @e@ whose operations run in monad @m@.
--
-- The record only fixes the shape of the interface; the behaviour of each
-- field is determined by whoever constructs the value (see
-- 'createInputQueue' in this module for the provided implementation).
data InputQueue m e = InputQueue
  { enqueue :: e -> m ()
  -- ^ Add a new item to the queue.
  , reenqueue :: DiffTime -> Queued e -> m ()
  -- ^ Put an already 'Queued' item back, given a 'DiffTime' delay.
  , dequeue :: m (Queued e)
  -- ^ Take the next item together with its identifier.
  , isEmpty :: m Bool
  -- ^ Report whether the queue currently counts as empty.
  }
-- | An item tagged with a 'Word64' identifier.
--
-- 'createInputQueue' assigns the identifier when the item is first enqueued
-- and leaves it untouched when the item is re-enqueued.
data Queued a = Queued
  { queuedId :: Word64
  -- ^ Identifier attached to the item.
  , queuedItem :: a
  -- ^ The wrapped item itself.
  }
-- | Create an 'InputQueue' backed by a 'TQueue'.
--
-- Two counters are kept next to the queue:
--
-- * @nextId@ supplies the 'queuedId' for each freshly enqueued item; it
--   starts at 0 and is incremented once per 'enqueue'.
-- * @numThreads@ counts the delayed re-enqueue threads that have been started
--   but have not yet written their item back.
--
-- The counters and the queue are labelled (@"num-threads"@, @"next-id"@ and
-- @"input-queue"@).
createInputQueue ::
  ( MonadDelay m
  , MonadAsync m
  , MonadLabelledSTM m
  ) =>
  m (InputQueue m e)
createInputQueue = do
  numThreads <- newTVarIO (0 :: Integer)
  labelTVarIO numThreads "num-threads"
  nextId <- newTVarIO 0
  labelTVarIO nextId "next-id"
  q <- atomically newTQueue
  labelTQueueIO q "input-queue"
  pure
    InputQueue
      { enqueue = \item ->
          -- Reading the id, writing the item and bumping the counter happen
          -- in one transaction, so concurrent callers never share an id.
          atomically $ do
            ident <- readTVar nextId
            writeTQueue q Queued{queuedId = ident, queuedItem = item}
            modifyTVar' nextId succ
      , reenqueue = \delay queued -> do
          -- Count the pending thread before forking it, so 'isEmpty' cannot
          -- report True between this call and the delayed write.
          atomically $ modifyTVar' numThreads succ
          void . async $ do
            threadDelay delay
            -- Decrement and write back atomically: the item is visible in
            -- the queue at the same instant the thread stops being counted.
            atomically $ do
              modifyTVar' numThreads pred
              writeTQueue q queued
      , dequeue =
          -- 'readTQueue' retries while the queue is empty, so this blocks
          -- until an item is available.
          atomically $ readTQueue q
      , isEmpty =
          -- Empty only when nothing is queued and no delayed re-enqueue is
          -- still pending.
          atomically $ do
            pending <- readTVar numThreads
            drained <- isEmptyTQueue q
            pure (drained && pending == 0)
      }