module Hydra.Events.Rotation where

import Hydra.Prelude

import Conduit (MonadUnliftIO, runConduit, runResourceT, (.|))
import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, readTVarIO, writeTVar)
import Data.Conduit.Combinators qualified as C
import Hydra.Events (EventId, EventSink (..), EventSource (..), HasEventId (..))

newtype RotationConfig = RotateAfter Natural

type LogId = EventId

-- | An EventSource and EventSink combined
data EventStore e m
  = EventStore
  { forall e (m :: * -> *). EventStore e m -> EventSource e m
eventSource :: EventSource e m
  , forall e (m :: * -> *). EventStore e m -> EventSink e m
eventSink :: EventSink e m
  , forall e (m :: * -> *). EventStore e m -> LogId -> e -> m ()
rotate :: LogId -> e -> m ()
  -- ^ Rotate existing events into a given log id and start a new log from given e.
  }

-- | Creates an event store that rotates according to given config and 'StateAggregate'.
newRotatedEventStore ::
  (HasEventId e, MonadSTM m, MonadUnliftIO m, MonadTime m) =>
  RotationConfig ->
  -- | Starting state of aggregate
  s ->
  -- | Update aggregate state
  (s -> e -> s) ->
  -- | Create a checkpoint event
  (s -> EventId -> UTCTime -> e) ->
  EventStore e m ->
  m (EventStore e m)
newRotatedEventStore :: forall e (m :: * -> *) s.
(HasEventId e, MonadSTM m, MonadUnliftIO m, MonadTime m) =>
RotationConfig
-> s
-> (s -> e -> s)
-> (s -> LogId -> UTCTime -> e)
-> EventStore e m
-> m (EventStore e m)
newRotatedEventStore RotationConfig
config s
s0 s -> e -> s
aggregator s -> LogId -> UTCTime -> e
checkpointer EventStore e m
eventStore = do
  (Natural
currentNumberOfEvents, LogId
lastEventId, s
currentAggregateState) <-
    ResourceT m (Natural, LogId, s) -> m (Natural, LogId, s)
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
runResourceT (ResourceT m (Natural, LogId, s) -> m (Natural, LogId, s))
-> (ConduitT () Void (ResourceT m) (Natural, LogId, s)
    -> ResourceT m (Natural, LogId, s))
-> ConduitT () Void (ResourceT m) (Natural, LogId, s)
-> m (Natural, LogId, s)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConduitT () Void (ResourceT m) (Natural, LogId, s)
-> ResourceT m (Natural, LogId, s)
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void (ResourceT m) (Natural, LogId, s)
 -> m (Natural, LogId, s))
-> ConduitT () Void (ResourceT m) (Natural, LogId, s)
-> m (Natural, LogId, s)
forall a b. (a -> b) -> a -> b
$
      EventSource e m -> HasEventId e => ConduitT () e (ResourceT m) ()
forall e (m :: * -> *).
EventSource e m -> HasEventId e => ConduitT () e (ResourceT m) ()
sourceEvents EventSource e m
eventSource ConduitT () e (ResourceT m) ()
-> ConduitT e Void (ResourceT m) (Natural, LogId, s)
-> ConduitT () Void (ResourceT m) (Natural, LogId, s)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ((Natural, LogId, s) -> e -> (Natural, LogId, s))
-> (Natural, LogId, s)
-> ConduitT e Void (ResourceT m) (Natural, LogId, s)
forall (m :: * -> *) a b o.
Monad m =>
(a -> b -> a) -> a -> ConduitT b o m a
C.foldl (Natural, LogId, s) -> e -> (Natural, LogId, s)
aggregateEvents (Natural
0, LogId
0, s
s0)
  TVar m s
aggregateStateV <- s -> m (TVar m s)
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO s
currentAggregateState
  TVar m Natural
numberOfEventsV <- Natural -> m (TVar m Natural)
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Natural
currentNumberOfEvents
  -- check rotation on startup
  m Bool -> m () -> m ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (TVar m Natural -> m Bool
shouldRotate TVar m Natural
numberOfEventsV) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    TVar m Natural -> TVar m s -> LogId -> m ()
rotateEventLog TVar m Natural
numberOfEventsV TVar m s
aggregateStateV LogId
lastEventId
  EventStore e m -> m (EventStore e m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    EventStore
      { EventSource e m
$sel:eventSource:EventStore :: EventSource e m
eventSource :: EventSource e m
eventSource
      , $sel:eventSink:EventStore :: EventSink e m
eventSink =
          EventSink
            { $sel:putEvent:EventSink :: HasEventId e => e -> m ()
putEvent = TVar m Natural -> TVar m s -> e -> m ()
rotatedPutEvent TVar m Natural
numberOfEventsV TVar m s
aggregateStateV
            }
      , -- NOTE: Don't allow rotation on-demand
        $sel:rotate:EventStore :: LogId -> e -> m ()
rotate = (e -> m ()) -> LogId -> e -> m ()
forall a b. a -> b -> a
const ((e -> m ()) -> LogId -> e -> m ())
-> (m () -> e -> m ()) -> m () -> LogId -> e -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> e -> m ()
forall a b. a -> b -> a
const (m () -> LogId -> e -> m ()) -> m () -> LogId -> e -> m ()
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
      }
 where
  RotateAfter Natural
rotateAfterX = RotationConfig
config

  aggregateEvents :: (Natural, LogId, s) -> e -> (Natural, LogId, s)
aggregateEvents (!Natural
n, !LogId
_evId, !s
acc) e
e = (Natural
n Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
+ Natural
1, e -> LogId
forall a. HasEventId a => a -> LogId
getEventId e
e, s -> e -> s
aggregator s
acc e
e)

  shouldRotate :: TVar m Natural -> m Bool
shouldRotate TVar m Natural
numberOfEventsV = do
    Natural
currentNumberOfEvents <- TVar m Natural -> m Natural
forall a. TVar m a -> m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar m Natural
numberOfEventsV
    Bool -> m Bool
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool -> m Bool) -> Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Natural
currentNumberOfEvents Natural -> Natural -> Bool
forall a. Ord a => a -> a -> Bool
>= Natural
rotateAfterX

  rotatedPutEvent :: TVar m Natural -> TVar m s -> e -> m ()
rotatedPutEvent TVar m Natural
numberOfEventsV TVar m s
aggregateStateV e
event = do
    e -> m ()
HasEventId e => e -> m ()
putEvent e
event
    STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
      -- aggregate new state
      TVar m s -> (s -> s) -> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m s
aggregateStateV (s -> e -> s
`aggregator` e
event)
      -- bump numberOfEvents
      Natural
numberOfEvents <- TVar m Natural -> STM m Natural
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m Natural
numberOfEventsV
      let numberOfEvents' :: Natural
numberOfEvents' = Natural
numberOfEvents Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
+ Natural
1
      TVar m Natural -> Natural -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m Natural
numberOfEventsV Natural
numberOfEvents'
    -- check rotation
    m Bool -> m () -> m ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (TVar m Natural -> m Bool
shouldRotate TVar m Natural
numberOfEventsV) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
      let eventId :: LogId
eventId = e -> LogId
forall a. HasEventId a => a -> LogId
getEventId e
event
      TVar m Natural -> TVar m s -> LogId -> m ()
rotateEventLog TVar m Natural
numberOfEventsV TVar m s
aggregateStateV LogId
eventId

  rotateEventLog :: TVar m Natural -> TVar m s -> LogId -> m ()
rotateEventLog TVar m Natural
numberOfEventsV TVar m s
aggregateStateV LogId
lastEventId = do
    -- build checkpoint event
    UTCTime
now <- m UTCTime
forall (m :: * -> *). MonadTime m => m UTCTime
getCurrentTime
    s
aggregateState <- TVar m s -> m s
forall a. TVar m a -> m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> m a
readTVarIO TVar m s
aggregateStateV
    let checkpoint :: e
checkpoint = s -> LogId -> UTCTime -> e
checkpointer s
aggregateState (LogId
lastEventId LogId -> LogId -> LogId
forall a. Num a => a -> a -> a
+ LogId
1) UTCTime
now
    -- rotate with checkpoint
    LogId -> e -> m ()
rotate LogId
lastEventId e
checkpoint
    -- clear numberOfEvents + bump logId
    STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
      TVar m Natural -> Natural -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m Natural
numberOfEventsV Natural
0

  EventStore{EventSource e m
$sel:eventSource:EventStore :: forall e (m :: * -> *). EventStore e m -> EventSource e m
eventSource :: EventSource e m
eventSource, $sel:eventSink:EventStore :: forall e (m :: * -> *). EventStore e m -> EventSink e m
eventSink = EventSink{HasEventId e => e -> m ()
$sel:putEvent:EventSink :: forall e (m :: * -> *). EventSink e m -> HasEventId e => e -> m ()
putEvent :: HasEventId e => e -> m ()
putEvent}, LogId -> e -> m ()
$sel:rotate:EventStore :: forall e (m :: * -> *). EventStore e m -> LogId -> e -> m ()
rotate :: LogId -> e -> m ()
rotate} = EventStore e m
eventStore