module Hydra.Events.FileBased where
import Hydra.Prelude
import Conduit (mapMC, (.|))
import Control.Concurrent.Class.MonadSTM (newTVarIO, writeTVar)
import Hydra.Events (EventSink (..), EventSource (..), HasEventId (..))
import Hydra.Persistence (PersistenceIncremental (..))
eventPairFromPersistenceIncremental ::
(ToJSON e, FromJSON e, HasEventId e, MonadSTM m) =>
PersistenceIncremental e m ->
m (EventSource e m, EventSink e m)
eventPairFromPersistenceIncremental :: forall e (m :: * -> *).
(ToJSON e, FromJSON e, HasEventId e, MonadSTM m) =>
PersistenceIncremental e m -> m (EventSource e m, EventSink e m)
eventPairFromPersistenceIncremental PersistenceIncremental{ToJSON e => e -> m ()
append :: ToJSON e => e -> m ()
$sel:append:PersistenceIncremental :: forall a (m :: * -> *).
PersistenceIncremental a m -> ToJSON a => a -> m ()
append, FromJSON e => ConduitT () e (ResourceT m) ()
source :: FromJSON e => ConduitT () e (ResourceT m) ()
$sel:source:PersistenceIncremental :: forall a (m :: * -> *).
PersistenceIncremental a m
-> FromJSON a => ConduitT () a (ResourceT m) ()
source} = do
TVar m (Maybe EventId)
eventIdV <- Maybe EventId -> m (TVar m (Maybe EventId))
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Maybe EventId
forall a. Maybe a
Nothing
let
getLastSeenEventId :: STM m (Maybe EventId)
getLastSeenEventId = TVar m (Maybe EventId) -> STM m (Maybe EventId)
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m (Maybe EventId)
eventIdV
setLastSeenEventId :: e -> STM m ()
setLastSeenEventId e
evt = do
TVar m (Maybe EventId) -> Maybe EventId -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m (Maybe EventId)
eventIdV (EventId -> Maybe EventId
forall a. a -> Maybe a
Just (EventId -> Maybe EventId) -> EventId -> Maybe EventId
forall a b. (a -> b) -> a -> b
$ e -> EventId
forall a. HasEventId a => a -> EventId
getEventId e
evt)
sourceEvents :: ConduitT () e (ResourceT m) ()
sourceEvents =
ConduitT () e (ResourceT m) ()
FromJSON e => ConduitT () e (ResourceT m) ()
source
ConduitT () e (ResourceT m) ()
-> ConduitT e e (ResourceT m) () -> ConduitT () e (ResourceT m) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| (e -> ResourceT m e) -> ConduitT e e (ResourceT m) ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
mapMC
( \e
event -> m e -> ResourceT m e
forall (m :: * -> *) a. Monad m => m a -> ResourceT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m e -> ResourceT m e)
-> (STM m e -> m e) -> STM m e -> ResourceT m e
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM m e -> m e
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m e -> ResourceT m e) -> STM m e -> ResourceT m e
forall a b. (a -> b) -> a -> b
$ do
e -> STM m ()
setLastSeenEventId e
event
e -> STM m e
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure e
event
)
putEvent :: e -> m ()
putEvent e
evt = do
STM m (Maybe EventId) -> m (Maybe EventId)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically STM m (Maybe EventId)
getLastSeenEventId m (Maybe EventId) -> (Maybe EventId -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe EventId
Nothing -> e -> m ()
store e
evt
Just EventId
lastSeenEventId
| e -> EventId
forall a. HasEventId a => a -> EventId
getEventId e
evt EventId -> EventId -> Bool
forall a. Ord a => a -> a -> Bool
> EventId
lastSeenEventId -> e -> m ()
store e
evt
| Bool
otherwise -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
store :: e -> m ()
store e
e = do
e -> m ()
ToJSON e => e -> m ()
append e
e
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ e -> STM m ()
setLastSeenEventId e
e
(EventSource e m, EventSink e m)
-> m (EventSource e m, EventSink e m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (EventSource{ConduitT () e (ResourceT m) ()
HasEventId e => ConduitT () e (ResourceT m) ()
sourceEvents :: ConduitT () e (ResourceT m) ()
$sel:sourceEvents:EventSource :: HasEventId e => ConduitT () e (ResourceT m) ()
sourceEvents}, EventSink{e -> m ()
HasEventId e => e -> m ()
putEvent :: e -> m ()
$sel:putEvent:EventSink :: HasEventId e => e -> m ()
putEvent})