module Hydra.Events.FileBased where
import Hydra.Prelude
import Control.Concurrent.Class.MonadSTM (newTVarIO, writeTVar)
import Hydra.Chain.ChainState (IsChainState)
import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..))
import Hydra.HeadLogic.Outcome (StateChanged)
import Hydra.Persistence (PersistenceIncremental (..))
eventPairFromPersistenceIncremental ::
(IsChainState tx, MonadSTM m) =>
PersistenceIncremental (PersistedStateChange tx) m ->
m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
eventPairFromPersistenceIncremental :: forall tx (m :: * -> *).
(IsChainState tx, MonadSTM m) =>
PersistenceIncremental (PersistedStateChange tx) m
-> m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
eventPairFromPersistenceIncremental PersistenceIncremental{ToJSON (PersistedStateChange tx) => PersistedStateChange tx -> m ()
append :: ToJSON (PersistedStateChange tx) => PersistedStateChange tx -> m ()
$sel:append:PersistenceIncremental :: forall a (m :: * -> *).
PersistenceIncremental a m -> ToJSON a => a -> m ()
append, FromJSON (PersistedStateChange tx) => m [PersistedStateChange tx]
loadAll :: FromJSON (PersistedStateChange tx) => m [PersistedStateChange tx]
$sel:loadAll:PersistenceIncremental :: forall a (m :: * -> *).
PersistenceIncremental a m -> FromJSON a => m [a]
loadAll} = do
TVar m (Maybe EventId)
eventIdV <- Maybe EventId -> m (TVar m (Maybe EventId))
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Maybe EventId
forall a. Maybe a
Nothing
let
getLastSeenEventId :: STM m (Maybe EventId)
getLastSeenEventId = TVar m (Maybe EventId) -> STM m (Maybe EventId)
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m (Maybe EventId)
eventIdV
setLastSeenEventId :: StateEvent tx -> STM m ()
setLastSeenEventId StateEvent{EventId
eventId :: forall tx. StateEvent tx -> EventId
eventId :: EventId
eventId} = do
TVar m (Maybe EventId) -> Maybe EventId -> STM m ()
forall a. TVar m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar m (Maybe EventId)
eventIdV (EventId -> Maybe EventId
forall a. a -> Maybe a
Just EventId
eventId)
getNextEventId :: STM m EventId
getNextEventId =
EventId -> (EventId -> EventId) -> Maybe EventId -> EventId
forall b a. b -> (a -> b) -> Maybe a -> b
maybe EventId
0 (EventId -> EventId -> EventId
forall a. Num a => a -> a -> a
+ EventId
1) (Maybe EventId -> EventId)
-> STM m (Maybe EventId) -> STM m EventId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar m (Maybe EventId) -> STM m (Maybe EventId)
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m (Maybe EventId)
eventIdV
getEvents :: m [StateEvent tx]
getEvents = do
[PersistedStateChange tx]
items <- m [PersistedStateChange tx]
FromJSON (PersistedStateChange tx) => m [PersistedStateChange tx]
loadAll
STM m [StateEvent tx] -> m [StateEvent tx]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m [StateEvent tx] -> m [StateEvent tx])
-> ((PersistedStateChange tx -> STM m (StateEvent tx))
-> STM m [StateEvent tx])
-> (PersistedStateChange tx -> STM m (StateEvent tx))
-> m [StateEvent tx]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [PersistedStateChange tx]
-> (PersistedStateChange tx -> STM m (StateEvent tx))
-> STM m [StateEvent tx]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [PersistedStateChange tx]
items ((PersistedStateChange tx -> STM m (StateEvent tx))
-> m [StateEvent tx])
-> (PersistedStateChange tx -> STM m (StateEvent tx))
-> m [StateEvent tx]
forall a b. (a -> b) -> a -> b
$ \PersistedStateChange tx
i -> do
StateEvent tx
event <- case PersistedStateChange tx
i of
New StateEvent tx
e -> StateEvent tx -> STM m (StateEvent tx)
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure StateEvent tx
e
Legacy StateChanged tx
sc -> do
EventId
eventId <- STM m EventId
getNextEventId
StateEvent tx -> STM m (StateEvent tx)
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (StateEvent tx -> STM m (StateEvent tx))
-> StateEvent tx -> STM m (StateEvent tx)
forall a b. (a -> b) -> a -> b
$ EventId -> StateChanged tx -> StateEvent tx
forall tx. EventId -> StateChanged tx -> StateEvent tx
StateEvent EventId
eventId StateChanged tx
sc
StateEvent tx -> STM m ()
setLastSeenEventId StateEvent tx
event
StateEvent tx -> STM m (StateEvent tx)
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure StateEvent tx
event
putEvent :: StateEvent tx -> m ()
putEvent e :: StateEvent tx
e@StateEvent{EventId
eventId :: forall tx. StateEvent tx -> EventId
eventId :: EventId
eventId} = do
STM m (Maybe EventId) -> m (Maybe EventId)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically STM m (Maybe EventId)
getLastSeenEventId m (Maybe EventId) -> (Maybe EventId -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe EventId
Nothing -> StateEvent tx -> m ()
store StateEvent tx
e
Just EventId
lastSeenEventId
| EventId
eventId EventId -> EventId -> Bool
forall a. Ord a => a -> a -> Bool
> EventId
lastSeenEventId -> StateEvent tx -> m ()
store StateEvent tx
e
| Bool
otherwise -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
store :: StateEvent tx -> m ()
store StateEvent tx
e = do
ToJSON (PersistedStateChange tx) => PersistedStateChange tx -> m ()
PersistedStateChange tx -> m ()
append (StateEvent tx -> PersistedStateChange tx
forall tx. StateEvent tx -> PersistedStateChange tx
New StateEvent tx
e)
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StateEvent tx -> STM m ()
setLastSeenEventId StateEvent tx
e
(EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
-> m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (EventSource{m [StateEvent tx]
HasEventId (StateEvent tx) => m [StateEvent tx]
getEvents :: m [StateEvent tx]
$sel:getEvents:EventSource :: HasEventId (StateEvent tx) => m [StateEvent tx]
getEvents}, EventSink{StateEvent tx -> m ()
HasEventId (StateEvent tx) => StateEvent tx -> m ()
putEvent :: StateEvent tx -> m ()
$sel:putEvent:EventSink :: HasEventId (StateEvent tx) => StateEvent tx -> m ()
putEvent})
data PersistedStateChange tx
= Legacy (StateChanged tx)
| New (StateEvent tx)
deriving stock ((forall x.
PersistedStateChange tx -> Rep (PersistedStateChange tx) x)
-> (forall x.
Rep (PersistedStateChange tx) x -> PersistedStateChange tx)
-> Generic (PersistedStateChange tx)
forall x.
Rep (PersistedStateChange tx) x -> PersistedStateChange tx
forall x.
PersistedStateChange tx -> Rep (PersistedStateChange tx) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall tx x.
Rep (PersistedStateChange tx) x -> PersistedStateChange tx
forall tx x.
PersistedStateChange tx -> Rep (PersistedStateChange tx) x
$cfrom :: forall tx x.
PersistedStateChange tx -> Rep (PersistedStateChange tx) x
from :: forall x.
PersistedStateChange tx -> Rep (PersistedStateChange tx) x
$cto :: forall tx x.
Rep (PersistedStateChange tx) x -> PersistedStateChange tx
to :: forall x.
Rep (PersistedStateChange tx) x -> PersistedStateChange tx
Generic, Int -> PersistedStateChange tx -> ShowS
[PersistedStateChange tx] -> ShowS
PersistedStateChange tx -> String
(Int -> PersistedStateChange tx -> ShowS)
-> (PersistedStateChange tx -> String)
-> ([PersistedStateChange tx] -> ShowS)
-> Show (PersistedStateChange tx)
forall tx.
IsChainState tx =>
Int -> PersistedStateChange tx -> ShowS
forall tx. IsChainState tx => [PersistedStateChange tx] -> ShowS
forall tx. IsChainState tx => PersistedStateChange tx -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall tx.
IsChainState tx =>
Int -> PersistedStateChange tx -> ShowS
showsPrec :: Int -> PersistedStateChange tx -> ShowS
$cshow :: forall tx. IsChainState tx => PersistedStateChange tx -> String
show :: PersistedStateChange tx -> String
$cshowList :: forall tx. IsChainState tx => [PersistedStateChange tx] -> ShowS
showList :: [PersistedStateChange tx] -> ShowS
Show, PersistedStateChange tx -> PersistedStateChange tx -> Bool
(PersistedStateChange tx -> PersistedStateChange tx -> Bool)
-> (PersistedStateChange tx -> PersistedStateChange tx -> Bool)
-> Eq (PersistedStateChange tx)
forall tx.
IsChainState tx =>
PersistedStateChange tx -> PersistedStateChange tx -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: forall tx.
IsChainState tx =>
PersistedStateChange tx -> PersistedStateChange tx -> Bool
== :: PersistedStateChange tx -> PersistedStateChange tx -> Bool
$c/= :: forall tx.
IsChainState tx =>
PersistedStateChange tx -> PersistedStateChange tx -> Bool
/= :: PersistedStateChange tx -> PersistedStateChange tx -> Bool
Eq)
instance IsChainState tx => ToJSON (PersistedStateChange tx) where
toJSON :: PersistedStateChange tx -> Value
toJSON = \case
Legacy StateChanged tx
sc -> StateChanged tx -> Value
forall a. ToJSON a => a -> Value
toJSON StateChanged tx
sc
New StateEvent tx
e -> StateEvent tx -> Value
forall a. ToJSON a => a -> Value
toJSON StateEvent tx
e
instance IsChainState tx => FromJSON (PersistedStateChange tx) where
parseJSON :: Value -> Parser (PersistedStateChange tx)
parseJSON Value
v =
StateEvent tx -> PersistedStateChange tx
forall tx. StateEvent tx -> PersistedStateChange tx
New (StateEvent tx -> PersistedStateChange tx)
-> Parser (StateEvent tx) -> Parser (PersistedStateChange tx)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Value -> Parser (StateEvent tx)
forall a. FromJSON a => Value -> Parser a
parseJSON Value
v
Parser (PersistedStateChange tx)
-> Parser (PersistedStateChange tx)
-> Parser (PersistedStateChange tx)
forall a. Parser a -> Parser a -> Parser a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> StateChanged tx -> PersistedStateChange tx
forall tx. StateChanged tx -> PersistedStateChange tx
Legacy (StateChanged tx -> PersistedStateChange tx)
-> Parser (StateChanged tx) -> Parser (PersistedStateChange tx)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Value -> Parser (StateChanged tx)
forall a. FromJSON a => Value -> Parser a
parseJSON Value
v