-- | A file-based event source and sink using JSON encoding.
--
-- This serves as an example of how to create an 'EventSource' and 'EventSink'.
module Hydra.Events.FileBased where

import Hydra.Prelude

import Control.Concurrent.Class.MonadSTM (newTVarIO, writeTVar)
import Hydra.Chain.ChainState (IsChainState)
import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..))
import Hydra.HeadLogic.Outcome (StateChanged)
import Hydra.Persistence (PersistenceIncremental (..))

-- | A basic file based event source and sink defined using an
-- 'PersistenceIncremental' handle.
--
-- The complexity in this implementation mostly stems from the fact that we want
-- to be backward-compatible with the old, plain format of storing
-- 'StateChanged' items directly to disk using 'PersistenceIncremental'.
--
-- If any 'Legacy StateChanged' items are discovered, a running index is used
-- for the 'eventId', while the 'New StateEvent' values are just stored as is.
--
-- A new implementation for an 'EventSource' with a compatible 'EventSink' could
-- be defined more generically with constraints:
--
-- (ToJSON e, FromJSON e, HasEventId) e => (EventSource e m, EventSink e m)
eventPairFromPersistenceIncremental ::
  (IsChainState tx, MonadSTM m) =>
  PersistenceIncremental (PersistedStateChange tx) m ->
  m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
eventPairFromPersistenceIncremental :: forall tx (m :: * -> *).
(IsChainState tx, MonadSTM m) =>
PersistenceIncremental (PersistedStateChange tx) m
-> m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
eventPairFromPersistenceIncremental PersistenceIncremental{ToJSON (PersistedStateChange tx) => PersistedStateChange tx -> m ()
append :: ToJSON (PersistedStateChange tx) => PersistedStateChange tx -> m ()
$sel:append:PersistenceIncremental :: forall a (m :: * -> *).
PersistenceIncremental a m -> ToJSON a => a -> m ()
append, FromJSON (PersistedStateChange tx) => m [PersistedStateChange tx]
loadAll :: FromJSON (PersistedStateChange tx) => m [PersistedStateChange tx]
$sel:loadAll:PersistenceIncremental :: forall a (m :: * -> *).
PersistenceIncremental a m -> FromJSON a => m [a]
loadAll} = do
  TVar m (Maybe EventId)
eventIdV <- Maybe EventId -> m (TVar m (Maybe EventId))
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Maybe EventId
forall a. Maybe a
Nothing
  let
    getLastSeenEventId :: STM m (Maybe EventId)
getLastSeenEventId = TVar m (Maybe EventId) -> STM m (Maybe EventId)
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m (Maybe EventId)
eventIdV

    setLastSeenEventId :: StateEvent tx -> STM m ()
setLastSeenEventId StateEvent{EventId
eventId :: forall tx. StateEvent tx -> EventId
eventId :: EventId
eventId} = do
      TVar m (Maybe EventId) -> Maybe EventId -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m (Maybe EventId)
eventIdV (EventId -> Maybe EventId
forall a. a -> Maybe a
Just EventId
eventId)

    getNextEventId :: STM m EventId
getNextEventId =
      EventId -> (EventId -> EventId) -> Maybe EventId -> EventId
forall b a. b -> (a -> b) -> Maybe a -> b
maybe EventId
0 (EventId -> EventId -> EventId
forall a. Num a => a -> a -> a
+ EventId
1) (Maybe EventId -> EventId)
-> STM m (Maybe EventId) -> STM m EventId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar m (Maybe EventId) -> STM m (Maybe EventId)
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m (Maybe EventId)
eventIdV

    -- Keep track of the last seen event id when loading
    getEvents :: m [StateEvent tx]
getEvents = do
      [PersistedStateChange tx]
items <- m [PersistedStateChange tx]
FromJSON (PersistedStateChange tx) => m [PersistedStateChange tx]
loadAll
      STM m [StateEvent tx] -> m [StateEvent tx]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m [StateEvent tx] -> m [StateEvent tx])
-> ((PersistedStateChange tx -> STM m (StateEvent tx))
    -> STM m [StateEvent tx])
-> (PersistedStateChange tx -> STM m (StateEvent tx))
-> m [StateEvent tx]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [PersistedStateChange tx]
-> (PersistedStateChange tx -> STM m (StateEvent tx))
-> STM m [StateEvent tx]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [PersistedStateChange tx]
items ((PersistedStateChange tx -> STM m (StateEvent tx))
 -> m [StateEvent tx])
-> (PersistedStateChange tx -> STM m (StateEvent tx))
-> m [StateEvent tx]
forall a b. (a -> b) -> a -> b
$ \PersistedStateChange tx
i -> do
        StateEvent tx
event <- case PersistedStateChange tx
i of
          New StateEvent tx
e -> StateEvent tx -> STM m (StateEvent tx)
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure StateEvent tx
e
          Legacy StateChanged tx
sc -> do
            EventId
eventId <- STM m EventId
getNextEventId
            StateEvent tx -> STM m (StateEvent tx)
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (StateEvent tx -> STM m (StateEvent tx))
-> StateEvent tx -> STM m (StateEvent tx)
forall a b. (a -> b) -> a -> b
$ EventId -> StateChanged tx -> StateEvent tx
forall tx. EventId -> StateChanged tx -> StateEvent tx
StateEvent EventId
eventId StateChanged tx
sc

        StateEvent tx -> STM m ()
setLastSeenEventId StateEvent tx
event
        StateEvent tx -> STM m (StateEvent tx)
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure StateEvent tx
event

    -- Filter events that are already stored
    putEvent :: StateEvent tx -> m ()
putEvent e :: StateEvent tx
e@StateEvent{EventId
eventId :: forall tx. StateEvent tx -> EventId
eventId :: EventId
eventId} = do
      STM m (Maybe EventId) -> m (Maybe EventId)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically STM m (Maybe EventId)
getLastSeenEventId m (Maybe EventId) -> (Maybe EventId -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Maybe EventId
Nothing -> StateEvent tx -> m ()
store StateEvent tx
e
        Just EventId
lastSeenEventId
          | EventId
eventId EventId -> EventId -> Bool
forall a. Ord a => a -> a -> Bool
> EventId
lastSeenEventId -> StateEvent tx -> m ()
store StateEvent tx
e
          | Bool
otherwise -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

    store :: StateEvent tx -> m ()
store StateEvent tx
e = do
      ToJSON (PersistedStateChange tx) => PersistedStateChange tx -> m ()
PersistedStateChange tx -> m ()
append (StateEvent tx -> PersistedStateChange tx
forall tx. StateEvent tx -> PersistedStateChange tx
New StateEvent tx
e)
      STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StateEvent tx -> STM m ()
setLastSeenEventId StateEvent tx
e

  (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
-> m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (EventSource{m [StateEvent tx]
HasEventId (StateEvent tx) => m [StateEvent tx]
getEvents :: m [StateEvent tx]
$sel:getEvents:EventSource :: HasEventId (StateEvent tx) => m [StateEvent tx]
getEvents}, EventSink{StateEvent tx -> m ()
HasEventId (StateEvent tx) => StateEvent tx -> m ()
putEvent :: StateEvent tx -> m ()
$sel:putEvent:EventSink :: HasEventId (StateEvent tx) => StateEvent tx -> m ()
putEvent})

-- | Internal data type used by 'createJSONFileEventSourceAndSink' to be
-- compatible with plain usage of 'PersistenceIncrementa' using plain
-- 'StateChanged' items to the new 'StateEvent' persisted items.
data PersistedStateChange tx
  = Legacy (StateChanged tx)
  | New (StateEvent tx)
  deriving stock ((forall x.
 PersistedStateChange tx -> Rep (PersistedStateChange tx) x)
-> (forall x.
    Rep (PersistedStateChange tx) x -> PersistedStateChange tx)
-> Generic (PersistedStateChange tx)
forall x.
Rep (PersistedStateChange tx) x -> PersistedStateChange tx
forall x.
PersistedStateChange tx -> Rep (PersistedStateChange tx) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall tx x.
Rep (PersistedStateChange tx) x -> PersistedStateChange tx
forall tx x.
PersistedStateChange tx -> Rep (PersistedStateChange tx) x
$cfrom :: forall tx x.
PersistedStateChange tx -> Rep (PersistedStateChange tx) x
from :: forall x.
PersistedStateChange tx -> Rep (PersistedStateChange tx) x
$cto :: forall tx x.
Rep (PersistedStateChange tx) x -> PersistedStateChange tx
to :: forall x.
Rep (PersistedStateChange tx) x -> PersistedStateChange tx
Generic, Int -> PersistedStateChange tx -> ShowS
[PersistedStateChange tx] -> ShowS
PersistedStateChange tx -> String
(Int -> PersistedStateChange tx -> ShowS)
-> (PersistedStateChange tx -> String)
-> ([PersistedStateChange tx] -> ShowS)
-> Show (PersistedStateChange tx)
forall tx.
IsChainState tx =>
Int -> PersistedStateChange tx -> ShowS
forall tx. IsChainState tx => [PersistedStateChange tx] -> ShowS
forall tx. IsChainState tx => PersistedStateChange tx -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall tx.
IsChainState tx =>
Int -> PersistedStateChange tx -> ShowS
showsPrec :: Int -> PersistedStateChange tx -> ShowS
$cshow :: forall tx. IsChainState tx => PersistedStateChange tx -> String
show :: PersistedStateChange tx -> String
$cshowList :: forall tx. IsChainState tx => [PersistedStateChange tx] -> ShowS
showList :: [PersistedStateChange tx] -> ShowS
Show, PersistedStateChange tx -> PersistedStateChange tx -> Bool
(PersistedStateChange tx -> PersistedStateChange tx -> Bool)
-> (PersistedStateChange tx -> PersistedStateChange tx -> Bool)
-> Eq (PersistedStateChange tx)
forall tx.
IsChainState tx =>
PersistedStateChange tx -> PersistedStateChange tx -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: forall tx.
IsChainState tx =>
PersistedStateChange tx -> PersistedStateChange tx -> Bool
== :: PersistedStateChange tx -> PersistedStateChange tx -> Bool
$c/= :: forall tx.
IsChainState tx =>
PersistedStateChange tx -> PersistedStateChange tx -> Bool
/= :: PersistedStateChange tx -> PersistedStateChange tx -> Bool
Eq)

instance IsChainState tx => ToJSON (PersistedStateChange tx) where
  toJSON :: PersistedStateChange tx -> Value
toJSON = \case
    Legacy StateChanged tx
sc -> StateChanged tx -> Value
forall a. ToJSON a => a -> Value
toJSON StateChanged tx
sc
    New StateEvent tx
e -> StateEvent tx -> Value
forall a. ToJSON a => a -> Value
toJSON StateEvent tx
e

instance IsChainState tx => FromJSON (PersistedStateChange tx) where
  parseJSON :: Value -> Parser (PersistedStateChange tx)
parseJSON Value
v =
    StateEvent tx -> PersistedStateChange tx
forall tx. StateEvent tx -> PersistedStateChange tx
New (StateEvent tx -> PersistedStateChange tx)
-> Parser (StateEvent tx) -> Parser (PersistedStateChange tx)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Value -> Parser (StateEvent tx)
forall a. FromJSON a => Value -> Parser a
parseJSON Value
v
      Parser (PersistedStateChange tx)
-> Parser (PersistedStateChange tx)
-> Parser (PersistedStateChange tx)
forall a. Parser a -> Parser a -> Parser a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> StateChanged tx -> PersistedStateChange tx
forall tx. StateChanged tx -> PersistedStateChange tx
Legacy (StateChanged tx -> PersistedStateChange tx)
-> Parser (StateChanged tx) -> Parser (PersistedStateChange tx)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Value -> Parser (StateChanged tx)
forall a. FromJSON a => Value -> Parser a
parseJSON Value
v