module Hydra.Events.Rotation where
import Hydra.Prelude
import Conduit (MonadUnliftIO, runConduit, runResourceT, (.|))
import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, readTVarIO, writeTVar)
import Data.Conduit.Combinators qualified as C
import Hydra.Events (EventId, EventSink (..), EventSource (..), HasEventId (..))
-- | Rotation policy for an event store. With @RotateAfter n@,
-- 'newRotatedEventStore' triggers a rotation once the number of events it has
-- counted is greater than or equal to @n@.
newtype RotationConfig = RotateAfter Natural
-- | Identifier accepted by the 'rotate' field of 'EventStore'; an alias of
-- 'EventId'.
type LogId = EventId
-- | An 'EventSource' and an 'EventSink' bundled together with a 'rotate'
-- action.
data EventStore e m
  = EventStore
  { eventSource :: EventSource e m
  -- ^ Where stored events are read from.
  , eventSink :: EventSink e m
  -- ^ Where new events are written to.
  , rotate :: LogId -> e -> m ()
  -- ^ Rotation action. 'newRotatedEventStore' invokes it with the id of the
  -- last event and a freshly built checkpoint event; its actual effect is
  -- defined by whoever constructs the store.
  }
-- | Wrap an 'EventStore' so that it is rotated once the number of stored
-- events reaches the threshold given by the 'RotationConfig'.
--
-- On creation, all events of the underlying 'eventSource' are folded to
-- recover the number of events, the id of the last event and the aggregate
-- state (starting from the given initial state). If the recovered count
-- already meets the threshold, a rotation is performed right away.
--
-- The returned store re-uses the underlying 'eventSource', replaces
-- 'putEvent' with a version that counts and aggregates every event and rotates
-- when the threshold is reached, and exposes a 'rotate' that does nothing.
newRotatedEventStore ::
  (HasEventId e, MonadSTM m, MonadUnliftIO m, MonadTime m) =>
  RotationConfig ->
  -- | Initial aggregate state, used as the seed when folding stored events.
  s ->
  -- | Fold one event into the aggregate state.
  (s -> e -> s) ->
  -- | Build a checkpoint event from the aggregate state, an event id and the
  -- current time.
  (s -> EventId -> UTCTime -> e) ->
  -- | Underlying store that is read from, written to and rotated.
  EventStore e m ->
  m (EventStore e m)
newRotatedEventStore config s0 aggregator checkpointer eventStore = do
  -- Replay the existing log to recover count, last event id and aggregate.
  (currentNumberOfEvents, lastEventId, currentAggregateState) <-
    runResourceT . runConduit $
      sourceEvents eventSource .| C.foldl aggregateEvents (0, 0, s0)
  aggregateStateV <- newTVarIO currentAggregateState
  numberOfEventsV <- newTVarIO currentNumberOfEvents
  -- The replayed log may already be at or past the threshold.
  whenM (shouldRotate numberOfEventsV) $
    rotateEventLog numberOfEventsV aggregateStateV lastEventId
  pure
    EventStore
      { eventSource
      , eventSink =
          EventSink
            { putEvent = rotatedPutEvent numberOfEventsV aggregateStateV
            }
      , -- The wrapped store ignores explicit rotation requests; rotation only
        -- happens through the threshold check in this module.
        rotate = \_ _ -> pure ()
      }
 where
  RotateAfter rotateAfterX = config

  -- Fold step: count the event, remember its id, update the aggregate.
  aggregateEvents (!n, !_evId, !acc) e = (n + 1, getEventId e, aggregator acc e)

  -- Uses (>=) rather than (==), so a count beyond the threshold also rotates.
  shouldRotate numberOfEventsV =
    (>= rotateAfterX) <$> readTVarIO numberOfEventsV

  -- Store the event via the underlying sink, then update the bookkeeping and
  -- rotate if the threshold has been reached.
  rotatedPutEvent numberOfEventsV aggregateStateV event = do
    putEvent event
    atomically $ do
      modifyTVar' aggregateStateV (`aggregator` event)
      modifyTVar' numberOfEventsV (+ 1)
    whenM (shouldRotate numberOfEventsV) $
      rotateEventLog numberOfEventsV aggregateStateV (getEventId event)

  -- Build a checkpoint from the current aggregate, hand it to the underlying
  -- store's 'rotate' and reset the event counter.
  rotateEventLog numberOfEventsV aggregateStateV lastEventId = do
    now <- getCurrentTime
    aggregateState <- readTVarIO aggregateStateV
    -- The checkpoint is given the id following the last event.
    let checkpoint = checkpointer aggregateState (lastEventId + 1) now
    rotate lastEventId checkpoint
    atomically $ writeTVar numberOfEventsV 0

  -- Fields of the underlying store; these local bindings are what the helpers
  -- above refer to as 'eventSource', 'putEvent' and 'rotate'.
  EventStore{eventSource, eventSink = EventSink{putEvent}, rotate} = eventStore