module Hydra.Events.FileBased where
import Hydra.Prelude
import Conduit (mapMC, (.|))
import Control.Concurrent.Class.MonadSTM (newTVarIO, writeTVar)
import Hydra.Events (EventSink (..), EventSource (..), HasEventId (..))
import Hydra.Events.Rotation (EventStore (..))
import Hydra.Persistence (PersistenceIncremental (..))
import System.Directory (renameFile)
-- | Build an 'EventStore' on top of an incremental persistence handle.
--
-- The returned store bundles three things:
--
--   * an 'EventSource' whose @sourceEvents@ streams everything produced by
--     'source' and records the id of each event as it flows through,
--   * an 'EventSink' whose @putEvent@ appends an event only when no event has
--     been seen yet or when its id is strictly greater than the last seen id,
--   * a @rotate@ action that renames @stateDir@ to @stateDir-\<nextLogId\>@
--     and then appends the given checkpoint event.
--
-- The last seen event id is kept in a 'TVar' that starts out as 'Nothing' and
-- is overwritten both while sourcing events and after every successful store.
mkFileBasedEventStore ::
  (ToJSON e, FromJSON e, HasEventId e) =>
  -- | Path renamed by @rotate@ (presumably the file backing the given
  -- persistence handle -- confirm at the call site).
  FilePath ->
  -- | Handle used to 'source' existing events and 'append' new ones.
  PersistenceIncremental e IO ->
  IO (EventStore e IO)
mkFileBasedEventStore stateDir persistence = do
  -- Id of the most recent event we have either loaded or stored.
  eventIdV <- newTVarIO Nothing
  let
    getLastSeenEventId = readTVar eventIdV

    setLastSeenEventId evt = do
      writeTVar eventIdV (Just $ getEventId evt)

    -- Pass events through unchanged while keeping track of the last seen
    -- event id.
    sourceEvents = do
      source persistence
        .| mapMC
          ( \event -> lift . atomically $ do
              setLastSeenEventId event
              pure event
          )

    -- Events whose id is not greater than the last seen id are dropped
    -- silently instead of being appended a second time.
    putEvent evt = do
      atomically getLastSeenEventId >>= \case
        Nothing -> store evt
        Just lastSeenEventId
          | getEventId evt > lastSeenEventId -> store evt
          | otherwise -> pure ()

    -- Append first, then remember the id: a failing 'append' leaves the last
    -- seen id untouched.
    store e = do
      append persistence e
      atomically $ setLastSeenEventId e

    rotate nextLogId checkpointEvent = do
      let rotatedPath = stateDir <> "-" <> show nextLogId
      ( do
          renameFile stateDir rotatedPath
          append persistence checkpointEvent
        )
        `catch` \(_ :: SomeException) -> do
          -- Attempt to revert the rename.
          -- NOTE(review): the handler matches every 'SomeException' and
          -- discards it, so a failed 'append' is invisible to the caller once
          -- the rename has been reverted. Consider rethrowing (e.g. via
          -- 'onException') -- confirm what callers of @rotate@ expect first.
          renameFile rotatedPath stateDir

  pure
    EventStore
      { eventSource = EventSource{sourceEvents}
      , eventSink = EventSink{putEvent}
      , rotate
      }