-- | Handles to save/load files across the hydra-node. We use a simple JSON
-- encoding and two modes of operation to store things: Full and Incremental.
module Hydra.Persistence where

import Hydra.Prelude

import Conduit (
  ConduitT,
  MonadUnliftIO,
  ResourceT,
  linesUnboundedAsciiC,
  mapMC,
  runResourceT,
  sourceFileBS,
  sourceToList,
  (.|),
 )
import Control.Monad.Trans.Resource (allocate)
import Data.Aeson qualified as Aeson
import Data.ByteString qualified as BS
import System.Directory (createDirectoryIfMissing, doesFileExist)
import System.FilePath (takeDirectory)
import UnliftIO (MVar, newMVar, putMVar, takeMVar, withMVar)
import UnliftIO.IO.File (withBinaryFile, writeBinaryFileDurableAtomic)

-- | Error raised when persisted data cannot be decoded. The wrapped 'String'
-- is the human-readable failure message.
newtype PersistenceException = PersistenceException String
  deriving stock (Eq, Show)

-- | Makes 'PersistenceException' throwable and catchable as an exception.
instance Exception PersistenceException

-- | Handle to save and load a single value of type @a@ to/from disk using
-- JSON encoding, with effects running in @m@.
data Persistence a m = Persistence
  { save :: ToJSON a => a -> m ()
  -- ^ Store the given value.
  , load :: FromJSON a => m (Maybe a)
  -- ^ Retrieve the stored value, if there is one.
  }

-- | Create a 'Persistence' handle for values of type 'a' stored at the given
-- file path. The parent directory of the path is created when missing.
--
-- * 'save' JSON-encodes the value and writes it to the file using
--   'writeBinaryFileDurableAtomic'.
-- * 'load' yields 'Nothing' when the file does not exist or is empty, and
--   throws a 'PersistenceException' carrying the decoder message when the
--   content cannot be decoded.
createPersistence ::
  (MonadIO m, MonadThrow m) =>
  FilePath ->
  m (Persistence a m)
createPersistence fp = do
  liftIO $ createDirectoryIfMissing True (takeDirectory fp)
  pure
    Persistence
      { save = writeBinaryFileDurableAtomic fp . toStrict . Aeson.encode
      , load = do
          exists <- liftIO $ doesFileExist fp
          if exists
            then do
              contents <- readFileBS fp
              -- An empty file is treated the same as a missing one.
              if BS.null contents
                then pure Nothing
                else
                  either
                    (throwIO . PersistenceException)
                    (pure . Just)
                    (Aeson.eitherDecodeStrict' contents)
            else pure Nothing
      }

-- | Handle to incrementally save values of type @a@ and stream them back
-- to/from disk using JSON encoding, with effects running in @m@.
data PersistenceIncremental a m = PersistenceIncremental
  { append :: ToJSON a => a -> m ()
  -- ^ Add one more element to what has been stored so far.
  , source :: FromJSON a => ConduitT () a (ResourceT m) ()
  -- ^ Stream all elements from the file.
  }

-- | Run the handle's 'source' to completion and collect every element it
-- yields into a list.
--
-- XXX: Deprecate this to avoid large memory usage.
loadAll :: (FromJSON a, MonadUnliftIO m) => PersistenceIncremental a m -> m [a]
loadAll PersistenceIncremental{source = stream} =
  runResourceT (sourceToList stream)

-- | Initialize persistence handle for given type 'a' at given file path. The
-- parent directory of the path is created when missing.
--
-- This instance of 'PersistenceIncremental' is "thread-safe" in the sense that
-- it prevents appending while a source is still running (while 'ResourceT' is
-- still not fully unwrapped).
--
-- * 'append' writes the JSON encoding of the element followed by a newline to
--   the end of the file.
-- * 'source' yields nothing when the file does not exist; otherwise it decodes
--   the file line by line and throws a 'PersistenceException' on the first
--   line that fails to decode.
createPersistenceIncremental ::
  forall a m.
  ( MonadUnliftIO m
  , MonadThrow m
  , FromJSON a
  ) =>
  FilePath ->
  m (PersistenceIncremental a m)
createPersistenceIncremental fp = do
  liftIO $ createDirectoryIfMissing True (takeDirectory fp)
  lock <- newMVar ()
  pure
    PersistenceIncremental
      { append = \item ->
          withMVar lock $ \_ ->
            liftIO . withBinaryFile fp AppendMode $ \h ->
              BS.hPut h . toStrict $ Aeson.encode item <> "\n"
      , source = streamFrom lock
      }
 where
  -- Stream the decoded elements of the file while holding the given lock.
  streamFrom :: forall i. MVar () -> ConduitT i a (ResourceT m) ()
  streamFrom lock = do
    fileExists <- liftIO $ doesFileExist fp
    if fileExists
      then do
        -- NOTE: The lock is taken here and only put back when the enclosing
        -- 'ResourceT' is run, for example when calling runConduitRes.
        void $ allocate (takeMVar lock) (putMVar lock)
        -- NOTE: Read, decode and yield values line by line.
        sourceFileBS fp
          .| linesUnboundedAsciiC
          .| mapMC decodeLine
      else pure ()

  -- Decode a single line, failing with the file path, the decoder error and
  -- the offending line in the exception message.
  decodeLine :: BS.ByteString -> ResourceT m a
  decodeLine line =
    case Aeson.eitherDecodeStrict' line of
      Right decoded -> pure decoded
      Left e ->
        lift . throwIO . PersistenceException $
          "Error when decoding from file "
            <> fp
            <> ": "
            <> show e
            <> "\n"
            <> show line