-- | A file-based event source and sink using JSON encoding.
--
-- This is currently used as the main event source and sink in the hydra-node.
module Hydra.Events.FileBased where

import Hydra.Prelude

import Conduit (mapMC, (.|))
import Control.Concurrent.Class.MonadSTM (newTVarIO, writeTVar)
import Hydra.Events (EventSink (..), EventSource (..), HasEventId (..))
import Hydra.Events.Rotation (EventStore (..))
import Hydra.Persistence (PersistenceIncremental (..))
import System.Directory (renameFile)

-- | A basic file based event source and sink defined using a rotated
-- 'PersistenceIncremental' handle.
mkFileBasedEventStore ::
  (ToJSON e, FromJSON e, HasEventId e) =>
  FilePath ->
  PersistenceIncremental e IO ->
  IO (EventStore e IO)
mkFileBasedEventStore :: forall e.
(ToJSON e, FromJSON e, HasEventId e) =>
FilePath -> PersistenceIncremental e IO -> IO (EventStore e IO)
mkFileBasedEventStore FilePath
stateDir PersistenceIncremental e IO
persistence = do
  TVar (Maybe LogId)
eventIdV <- Maybe LogId -> IO (TVar IO (Maybe LogId))
forall a. a -> IO (TVar IO a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Maybe LogId
forall a. Maybe a
Nothing
  let
    getLastSeenEventId :: STM IO (Maybe LogId)
getLastSeenEventId = TVar IO (Maybe LogId) -> STM IO (Maybe LogId)
forall a. TVar IO a -> STM IO a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar (Maybe LogId)
TVar IO (Maybe LogId)
eventIdV

    setLastSeenEventId :: e -> STM IO ()
setLastSeenEventId e
evt = do
      TVar IO (Maybe LogId) -> Maybe LogId -> STM IO ()
forall a. TVar IO a -> a -> STM IO ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar (Maybe LogId)
TVar IO (Maybe LogId)
eventIdV (LogId -> Maybe LogId
forall a. a -> Maybe a
Just (LogId -> Maybe LogId) -> LogId -> Maybe LogId
forall a b. (a -> b) -> a -> b
$ e -> LogId
forall a. HasEventId a => a -> LogId
getEventId e
evt)

    -- Keep track of the last seen event id when loading
    sourceEvents :: ConduitT () e (ResourceT IO) ()
sourceEvents = do
      PersistenceIncremental e IO
-> FromJSON e => ConduitT () e (ResourceT IO) ()
forall a (m :: * -> *).
PersistenceIncremental a m
-> FromJSON a => ConduitT () a (ResourceT m) ()
source PersistenceIncremental e IO
persistence
        ConduitT () e (ResourceT IO) ()
-> ConduitT e e (ResourceT IO) ()
-> ConduitT () e (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| (e -> ResourceT IO e) -> ConduitT e e (ResourceT IO) ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
mapMC
          ( \e
event -> IO e -> ResourceT IO e
forall (m :: * -> *) a. Monad m => m a -> ResourceT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO e -> ResourceT IO e)
-> (STM e -> IO e) -> STM e -> ResourceT IO e
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM e -> IO e
STM IO e -> IO e
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM e -> ResourceT IO e) -> STM e -> ResourceT IO e
forall a b. (a -> b) -> a -> b
$ do
              e -> STM IO ()
setLastSeenEventId e
event
              e -> STM e
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure e
event
          )

    -- Filter events that are already stored
    putEvent :: e -> IO ()
putEvent e
evt = do
      STM IO (Maybe LogId) -> IO (Maybe LogId)
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically STM IO (Maybe LogId)
getLastSeenEventId IO (Maybe LogId) -> (Maybe LogId -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Maybe LogId
Nothing -> e -> IO ()
store e
evt
        Just LogId
lastSeenEventId
          | e -> LogId
forall a. HasEventId a => a -> LogId
getEventId e
evt LogId -> LogId -> Bool
forall a. Ord a => a -> a -> Bool
> LogId
lastSeenEventId -> e -> IO ()
store e
evt
          | Bool
otherwise -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

    store :: e -> IO ()
store e
e = do
      PersistenceIncremental e IO -> ToJSON e => e -> IO ()
forall a (m :: * -> *).
PersistenceIncremental a m -> ToJSON a => a -> m ()
append PersistenceIncremental e IO
persistence e
e
      STM IO () -> IO ()
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO () -> IO ()) -> STM IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ e -> STM IO ()
setLastSeenEventId e
e

    rotate :: LogId -> e -> IO ()
rotate LogId
nextLogId e
checkpointEvent = do
      let rotatedPath :: FilePath
rotatedPath = FilePath
stateDir FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> FilePath
"-" FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> LogId -> FilePath
forall b a. (Show a, IsString b) => a -> b
show LogId
nextLogId
      ( do
          FilePath -> FilePath -> IO ()
renameFile FilePath
stateDir FilePath
rotatedPath
          PersistenceIncremental e IO -> ToJSON e => e -> IO ()
forall a (m :: * -> *).
PersistenceIncremental a m -> ToJSON a => a -> m ()
append PersistenceIncremental e IO
persistence e
checkpointEvent
        )
        IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(SomeException
_ :: SomeException) -> do
          -- Attempt to revert the rename,
          -- ignoring errors during rollback.
          FilePath -> FilePath -> IO ()
renameFile FilePath
rotatedPath FilePath
stateDir

  EventStore e IO -> IO (EventStore e IO)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    EventStore
      { $sel:eventSource:EventStore :: EventSource e IO
eventSource = EventSource{ConduitT () e (ResourceT IO) ()
HasEventId e => ConduitT () e (ResourceT IO) ()
sourceEvents :: ConduitT () e (ResourceT IO) ()
$sel:sourceEvents:EventSource :: HasEventId e => ConduitT () e (ResourceT IO) ()
sourceEvents}
      , $sel:eventSink:EventStore :: EventSink e IO
eventSink = EventSink{e -> IO ()
HasEventId e => e -> IO ()
putEvent :: e -> IO ()
$sel:putEvent:EventSink :: HasEventId e => e -> IO ()
putEvent}
      , LogId -> e -> IO ()
rotate :: LogId -> e -> IO ()
$sel:rotate:EventStore :: LogId -> e -> IO ()
rotate
      }