{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -Wno-deferred-out-of-scope-variables #-}

-- | Implements a Hydra network component using [etcd](https://etcd.io/).
--
-- While implementing a basic broadcast protocol over a distributed key-value
-- store is quite an overkill, the [Raft](https://raft.github.io/) consensus of
-- etcd provides our application with a crash-recovery fault-tolerant "atomic
-- broadcast" out-of-the-box. As a nice side-effect, the network layer becomes
-- very introspectable, while it would also support features like TLS or service
-- discovery.
--
-- The component installs, starts and configures an `etcd` instance and connects
-- to it using a GRPC client. We can only write and read from the cluster while
-- connected to the majority cluster.
--
-- Broadcasting is implemented using @put@ to some well-known key, while message
-- delivery is done by using @watch@ on the same 'msg' prefix. We keep a last known
-- revision, also stored on disk, to start 'watch' with that revision (+1) and
-- only deliver messages that were not seen before. In case we are not connected
-- to our 'etcd' instance or not enough peers (= on a minority cluster), we
-- retry sending, but also store messages to broadcast in a 'PersistentQueue',
-- which makes the node resilient against crashes while sending. TODO: Is this
-- needed? performance limitation?
--
-- Connectivity and compatibility with other nodes on the cluster is tracked
-- using the key-value service as well:
--
--   * network connectivity is determined by being able to fetch the member list
--   * peer connectivity is tracked (best effort, not authorized) using an entry
--     at 'alive-\<advertise\>' keys with individual leases and repeated keep-alives
--   * each node compare-and-swaps its `version` into a key of the same name to
--     check compatibility (not updatable)
--
-- Note that the etcd cluster is configured to compact revisions down to 1000
-- every 5 minutes. This prevents infinite growth of the key-value store, but
-- also limits how long a node can be disconnected without missing out. 1000
-- should be more than enough for our use-case as the Hydra protocol will not
-- advance unless all participants are present.
module Hydra.Network.Etcd where

import Hydra.Prelude

import Cardano.Binary (decodeFull', serialize')
import Cardano.Crypto.Hash (SHA256, hashToStringAsHex, hashWithSerialiser)
import Control.Concurrent.Class.MonadSTM (
  modifyTVar',
  newTBQueueIO,
  newTVarIO,
  peekTBQueue,
  readTBQueue,
  swapTVar,
  writeTBQueue,
  writeTVar,
 )
import Control.Exception (IOException)
import Control.Lens ((^.), (^..), (^?))
import Data.Aeson (decodeFileStrict', encodeFile)
import Data.Aeson qualified as Aeson
import Data.Aeson.Lens qualified as Aeson
import Data.Aeson.Types (Value)
import Data.Bits ((.|.))
import Data.ByteString qualified as BS
import Data.ByteString.Char8 qualified as BS8
import Data.List ((\\))
import Data.List qualified as List
import Data.Map qualified as Map
import Data.Text qualified as T
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (
  Connectivity (..),
  Host (..),
  Network (..),
  NetworkCallback (..),
  NetworkComponent,
  NetworkConfiguration (..),
  ProtocolVersion,
  WhichEtcd (..),
 )
import Hydra.Node.EmbedTH (embedExecutable)
import Network.GRPC.Client (
  Address (..),
  ConnParams (..),
  Connection,
  ReconnectPolicy (..),
  ReconnectTo (ReconnectToOriginal),
  Server (..),
  rpc,
  withConnection,
 )
import Network.GRPC.Client.StreamType.IO (biDiStreaming, nonStreaming)
import Network.GRPC.Common (GrpcError (..), GrpcException (..), HTTP2Settings (..), NextElem (..), def, defaultHTTP2Settings)
import Network.GRPC.Common.NextElem (whileNext_)
import Network.GRPC.Common.Protobuf (Proto (..), Protobuf, defMessage, (.~))
import Network.GRPC.Etcd (
  Compare'CompareResult (..),
  Compare'CompareTarget (..),
  KV,
  Lease,
  Watch,
 )
import System.Directory (createDirectoryIfMissing, listDirectory, removeFile)
import System.Environment.Blank (getEnvironment)
import System.FilePath (takeDirectory, (</>))
import System.IO.Error (isDoesNotExistError)
import System.Posix (ownerExecuteMode, ownerReadMode, ownerWriteMode, setFileMode)
import System.Process (interruptProcessGroupOf)
import System.Process.Typed (
  Process,
  ProcessConfig,
  createPipe,
  getStderr,
  proc,
  setCreateGroup,
  setEnv,
  setStderr,
  startProcess,
  stopProcess,
  unsafeProcessHandle,
  waitExitCode,
 )
import UnliftIO (readTVarIO)

-- | Concrete network component that broadcasts messages to an etcd cluster and
-- listens for incoming messages.
withEtcdNetwork ::
  (ToCBOR msg, FromCBOR msg, Eq msg) =>
  Tracer IO EtcdLog ->
  ProtocolVersion ->
  -- TODO: check if all of these needed?
  NetworkConfiguration ->
  NetworkComponent IO msg msg ()
withEtcdNetwork :: forall msg.
(ToCBOR msg, FromCBOR msg, Eq msg) =>
Tracer IO EtcdLog
-> ProtocolVersion
-> NetworkConfiguration
-> NetworkComponent IO msg msg ()
withEtcdNetwork Tracer IO EtcdLog
tracer ProtocolVersion
protocolVersion NetworkConfiguration
config NetworkCallback msg IO
callback Network IO msg -> IO ()
action = do
  FilePath
etcdBinPath <- FilePath -> WhichEtcd -> IO FilePath
getEtcdBinary FilePath
persistenceDir WhichEtcd
whichEtcd
  -- TODO: fail if cluster config / members do not match --peer
  -- configuration? That would be similar to the 'acks' persistence
  -- bailing out on loading.
  Map FilePath FilePath
envVars <- [(FilePath, FilePath)] -> Map FilePath FilePath
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([(FilePath, FilePath)] -> Map FilePath FilePath)
-> IO [(FilePath, FilePath)] -> IO (Map FilePath FilePath)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO [(FilePath, FilePath)]
getEnvironment
  ProcessConfig () () Handle
-> (Process () () Handle -> IO ()) -> IO ()
forall (m :: * -> *) stdin stdout stderr a.
(MonadIO m, MonadThrow m) =>
ProcessConfig stdin stdout stderr
-> (Process stdin stdout stderr -> m a) -> m a
withProcessInterrupt (FilePath -> Map FilePath FilePath -> ProcessConfig () () Handle
etcdCmd FilePath
etcdBinPath Map FilePath FilePath
envVars) ((Process () () Handle -> IO ()) -> IO ())
-> (Process () () Handle -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Process () () Handle
p -> do
    IO Any -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Process () () Handle -> IO ExitCode
forall (m :: * -> *) stdin stdout stderr.
MonadIO m =>
Process stdin stdout stderr -> m ExitCode
waitExitCode Process () () Handle
p IO ExitCode -> (ExitCode -> IO Any) -> IO Any
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ExitCode
ec -> FilePath -> IO Any
forall a. FilePath -> IO a
forall (m :: * -> *) a. MonadFail m => FilePath -> m a
fail (FilePath -> IO Any) -> FilePath -> IO Any
forall a b. (a -> b) -> a -> b
$ FilePath
"Sub-process etcd exited with: " FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> ExitCode -> FilePath
forall b a. (Show a, IsString b) => a -> b
show ExitCode
ec) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      IO Any -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Process () () Handle -> NetworkCallback msg IO -> IO Any
traceStderr Process () () Handle
p NetworkCallback msg IO
callback) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        -- XXX: cleanup reconnecting through policy if other threads fail
        TVar Bool
doneVar <- Bool -> IO (TVar IO Bool)
forall a. a -> IO (TVar IO a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Bool
False
        -- NOTE: The connection to the server is set up asynchronously; the
        -- first rpc call will block until the connection has been established.
        ConnParams -> Server -> (Connection -> IO ()) -> IO ()
forall a. ConnParams -> Server -> (Connection -> IO a) -> IO a
withConnection (TVar Bool -> ConnParams
connParams TVar Bool
doneVar) Server
grpcServer ((Connection -> IO ()) -> IO ()) -> (Connection -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> do
          -- REVIEW: checkVersion blocks if used on main thread - why?
          IO () -> (Async IO () -> IO ()) -> IO ()
forall a b. IO a -> (Async IO a -> IO b) -> IO b
forall (m :: * -> *) a b.
MonadAsync m =>
m a -> (Async m a -> m b) -> m b
withAsync (Tracer IO EtcdLog
-> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
forall msg.
Tracer IO EtcdLog
-> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
checkVersion Tracer IO EtcdLog
tracer Connection
conn ProtocolVersion
protocolVersion NetworkCallback msg IO
callback) ((Async IO () -> IO ()) -> IO ())
-> (Async IO () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async IO ()
_ -> do
            IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Tracer IO EtcdLog
-> Connection -> Host -> NetworkCallback msg IO -> IO ()
forall msg.
Tracer IO EtcdLog
-> Connection -> Host -> NetworkCallback msg IO -> IO ()
pollConnectivity Tracer IO EtcdLog
tracer Connection
conn Host
advertise NetworkCallback msg IO
callback) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
              IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Tracer IO EtcdLog
-> Connection -> FilePath -> NetworkCallback msg IO -> IO ()
forall msg.
FromCBOR msg =>
Tracer IO EtcdLog
-> Connection -> FilePath -> NetworkCallback msg IO -> IO ()
waitMessages Tracer IO EtcdLog
tracer Connection
conn FilePath
persistenceDir NetworkCallback msg IO
callback) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                PersistentQueue IO msg
queue <- FilePath -> Natural -> IO (PersistentQueue IO msg)
forall (m :: * -> *) a.
(MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) =>
FilePath -> Natural -> m (PersistentQueue m a)
newPersistentQueue (FilePath
persistenceDir FilePath -> FilePath -> FilePath
</> FilePath
"pending-broadcast") Natural
100
                IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Tracer IO EtcdLog
-> Connection -> Host -> PersistentQueue IO msg -> IO ()
forall msg.
(ToCBOR msg, Eq msg) =>
Tracer IO EtcdLog
-> Connection -> Host -> PersistentQueue IO msg -> IO ()
broadcastMessages Tracer IO EtcdLog
tracer Connection
conn Host
advertise PersistentQueue IO msg
queue) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                  Network IO msg -> IO ()
action
                    Network
                      { $sel:broadcast:Network :: msg -> IO ()
broadcast = PersistentQueue IO msg -> msg -> IO ()
forall a (m :: * -> *).
(ToCBOR a, MonadSTM m, MonadIO m) =>
PersistentQueue m a -> a -> m ()
writePersistentQueue PersistentQueue IO msg
queue
                      }
                  STM IO () -> IO ()
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (TVar IO Bool -> Bool -> STM IO ()
forall a. TVar IO a -> a -> STM IO ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar Bool
TVar IO Bool
doneVar Bool
True)
 where
  connParams :: TVar Bool -> ConnParams
connParams TVar Bool
doneVar =
    ConnParams
forall a. Default a => a
def
      { connReconnectPolicy = reconnectPolicy doneVar
      , -- NOTE: Not rate limit pings to our trusted, local etcd node. See
        -- comment on 'http2OverridePingRateLimit'.
        connHTTP2Settings = defaultHTTP2Settings{http2OverridePingRateLimit = Just maxBound}
      }

  reconnectPolicy :: TVar Bool -> ReconnectPolicy
reconnectPolicy TVar Bool
doneVar = ReconnectTo -> IO ReconnectPolicy -> ReconnectPolicy
ReconnectAfter ReconnectTo
ReconnectToOriginal (IO ReconnectPolicy -> ReconnectPolicy)
-> IO ReconnectPolicy -> ReconnectPolicy
forall a b. (a -> b) -> a -> b
$ do
    Bool
done <- TVar Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TVar Bool
doneVar
    if Bool
done
      then ReconnectPolicy -> IO ReconnectPolicy
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ReconnectPolicy
DontReconnect
      else do
        DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
        Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer EtcdLog
Reconnecting
        ReconnectPolicy -> IO ReconnectPolicy
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReconnectPolicy -> IO ReconnectPolicy)
-> ReconnectPolicy -> IO ReconnectPolicy
forall a b. (a -> b) -> a -> b
$ TVar Bool -> ReconnectPolicy
reconnectPolicy TVar Bool
doneVar

  clientHost :: Host
clientHost = Host{$sel:hostname:Host :: Text
hostname = Text
"127.0.0.1", $sel:port:Host :: PortNumber
port = PortNumber
clientPort}

  grpcServer :: Server
grpcServer =
    Address -> Server
ServerInsecure (Address -> Server) -> Address -> Server
forall a b. (a -> b) -> a -> b
$
      Address
        { addressHost :: FilePath
addressHost = Text -> FilePath
forall a. ToString a => a -> FilePath
toString (Text -> FilePath) -> Text -> FilePath
forall a b. (a -> b) -> a -> b
$ Host -> Text
hostname Host
clientHost
        , addressPort :: PortNumber
addressPort = Host -> PortNumber
port Host
clientHost
        , addressAuthority :: Maybe FilePath
addressAuthority = Maybe FilePath
forall a. Maybe a
Nothing
        }

  -- NOTE: Offset client port by the same amount as configured 'port' is offset
  -- from the default '5001'. This will result in the default client port 2379
  -- be used by default still.
  clientPort :: PortNumber
clientPort = PortNumber
2379 PortNumber -> PortNumber -> PortNumber
forall a. Num a => a -> a -> a
+ Host -> PortNumber
port Host
listen PortNumber -> PortNumber -> PortNumber
forall a. Num a => a -> a -> a
- PortNumber
5001

  traceStderr :: Process () () Handle -> NetworkCallback msg IO -> IO Any
traceStderr Process () () Handle
p NetworkCallback{Connectivity -> IO ()
onConnectivity :: Connectivity -> IO ()
$sel:onConnectivity:NetworkCallback :: forall msg (m :: * -> *).
NetworkCallback msg m -> Connectivity -> m ()
onConnectivity} =
    IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> IO () -> IO Any
forall a b. (a -> b) -> a -> b
$ do
      ByteString
bs <- Handle -> IO ByteString
BS.hGetLine (Process () () Handle -> Handle
forall stdin stdout stderr. Process stdin stdout stderr -> stderr
getStderr Process () () Handle
p)
      case ByteString -> Either FilePath Value
forall a. FromJSON a => ByteString -> Either FilePath a
Aeson.eitherDecodeStrict ByteString
bs of
        Left FilePath
err -> Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer FailedToDecodeLog{$sel:log:EtcdLog :: Text
log = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 ByteString
bs, $sel:reason:EtcdLog :: Text
reason = FilePath -> Text
forall b a. (Show a, IsString b) => a -> b
show FilePath
err}
        Right Value
v -> do
          let expectedClusterMismatch :: Maybe (Value, Value)
expectedClusterMismatch = do
                Value
level' <- ByteString
bs ByteString -> Getting (First Value) ByteString Value -> Maybe Value
forall s a. s -> Getting (First a) s a -> Maybe a
^? Key -> Traversal' ByteString Value
forall t. AsValue t => Key -> Traversal' t Value
Aeson.key Key
"level" Getting (First Value) ByteString Value
-> ((Value -> Const (First Value) Value)
    -> Value -> Const (First Value) Value)
-> Getting (First Value) ByteString Value
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Value -> Const (First Value) Value)
-> Value -> Const (First Value) Value
Prism' Value Value
Aeson.nonNull
                Value
msg' <- ByteString
bs ByteString -> Getting (First Value) ByteString Value -> Maybe Value
forall s a. s -> Getting (First a) s a -> Maybe a
^? Key -> Traversal' ByteString Value
forall t. AsValue t => Key -> Traversal' t Value
Aeson.key Key
"msg" Getting (First Value) ByteString Value
-> ((Value -> Const (First Value) Value)
    -> Value -> Const (First Value) Value)
-> Getting (First Value) ByteString Value
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Value -> Const (First Value) Value)
-> Value -> Const (First Value) Value
Prism' Value Value
Aeson.nonNull
                (Value, Value) -> Maybe (Value, Value)
forall a. a -> Maybe a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Value
level', Value
msg')
          case Maybe (Value, Value)
expectedClusterMismatch of
            Just (Aeson.String Text
"error", Aeson.String Text
"request sent was ignored due to cluster ID mismatch") ->
              Connectivity -> IO ()
onConnectivity ClusterIDMismatch{$sel:clusterPeers:PeerConnected :: Text
clusterPeers = FilePath -> Text
T.pack FilePath
clusterPeers}
            Maybe (Value, Value)
_ -> Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$ EtcdLog{etcd :: Value
etcd = Value
v}

  -- XXX: Could use TLS to secure peer connections
  -- XXX: Could use discovery to simplify configuration
  -- NOTE: Configured using guides: https://etcd.io/docs/v3.5/op-guide
  etcdCmd :: FilePath -> Map FilePath FilePath -> ProcessConfig () () Handle
etcdCmd FilePath
etcdBinPath Map FilePath FilePath
envVars =
    -- NOTE: We map prefers the left; so we need to mappend default at the end.
    [(FilePath, FilePath)]
-> ProcessConfig () () Handle -> ProcessConfig () () Handle
forall stdin stdout stderr.
[(FilePath, FilePath)]
-> ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr
setEnv (Map FilePath FilePath -> [(FilePath, FilePath)]
forall k a. Map k a -> [(k, a)]
Map.toList (Map FilePath FilePath -> [(FilePath, FilePath)])
-> Map FilePath FilePath -> [(FilePath, FilePath)]
forall a b. (a -> b) -> a -> b
$ Map FilePath FilePath
envVars Map FilePath FilePath
-> Map FilePath FilePath -> Map FilePath FilePath
forall a. Semigroup a => a -> a -> a
<> Map FilePath FilePath
defaultEnv)
      (ProcessConfig () () Handle -> ProcessConfig () () Handle)
-> ([FilePath] -> ProcessConfig () () Handle)
-> [FilePath]
-> ProcessConfig () () Handle
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Bool -> ProcessConfig () () Handle -> ProcessConfig () () Handle
forall stdin stdout stderr.
Bool
-> ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr
setCreateGroup Bool
True -- Prevents interrupt of main process when we send SIGINT to etcd
      (ProcessConfig () () Handle -> ProcessConfig () () Handle)
-> ([FilePath] -> ProcessConfig () () Handle)
-> [FilePath]
-> ProcessConfig () () Handle
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamSpec 'STOutput Handle
-> ProcessConfig () () () -> ProcessConfig () () Handle
forall stderr stdin stdout stderr0.
StreamSpec 'STOutput stderr
-> ProcessConfig stdin stdout stderr0
-> ProcessConfig stdin stdout stderr
setStderr StreamSpec 'STOutput Handle
forall (anyStreamType :: StreamType).
StreamSpec anyStreamType Handle
createPipe
      (ProcessConfig () () () -> ProcessConfig () () Handle)
-> ([FilePath] -> ProcessConfig () () ())
-> [FilePath]
-> ProcessConfig () () Handle
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePath -> [FilePath] -> ProcessConfig () () ()
proc FilePath
etcdBinPath
      ([FilePath] -> ProcessConfig () () Handle)
-> [FilePath] -> ProcessConfig () () Handle
forall a b. (a -> b) -> a -> b
$ [[FilePath]] -> [FilePath]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
        [ -- NOTE: Must be used in clusterPeers
          [FilePath
"--name", Host -> FilePath
forall b a. (Show a, IsString b) => a -> b
show Host
advertise]
        , [FilePath
"--data-dir", FilePath
persistenceDir FilePath -> FilePath -> FilePath
</> FilePath
"etcd" FilePath -> FilePath -> FilePath
</> Hash SHA256 ByteString -> FilePath
forall h a. Hash h a -> FilePath
hashToStringAsHex (forall h a. HashAlgorithm h => (a -> Encoding) -> a -> Hash h a
hashWithSerialiser @SHA256 ByteString -> Encoding
forall a. ToCBOR a => a -> Encoding
toCBOR (ByteString -> Hash SHA256 ByteString)
-> ByteString -> Hash SHA256 ByteString
forall a b. (a -> b) -> a -> b
$ FilePath -> ByteString
BS8.pack FilePath
clusterPeers)]
        , [FilePath
"--listen-peer-urls", Host -> FilePath
httpUrl Host
listen]
        , [FilePath
"--initial-advertise-peer-urls", Host -> FilePath
httpUrl Host
advertise]
        , [FilePath
"--listen-client-urls", Host -> FilePath
httpUrl Host
clientHost]
        , -- Pick a random port for http api (and use above only for grpc)
          [FilePath
"--listen-client-http-urls", FilePath
"http://localhost:0"]
        , -- Client access only on configured 'host' interface.
          [FilePath
"--advertise-client-urls", Host -> FilePath
httpUrl Host
clientHost]
        , -- XXX: Could use unique initial-cluster-tokens to isolate clusters
          [FilePath
"--initial-cluster-token", FilePath
"hydra-network-1"]
        , [FilePath
"--initial-cluster", FilePath
clusterPeers]
        ]

  defaultEnv :: Map.Map String String
  defaultEnv :: Map FilePath FilePath
defaultEnv =
    -- Keep up to 1000 revisions. See also:
    -- https://etcd.io/docs/v3.5/op-guide/maintenance/#auto-compaction
    [(FilePath, FilePath)] -> Map FilePath FilePath
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList
      [ (FilePath
"ETCD_AUTO_COMPACTION_MODE", FilePath
"revision")
      , (FilePath
"ETCD_AUTO_COMPACTION_RETENTION", FilePath
"1000")
      ]

  -- NOTE: Building a canonical list of labels from the advertised addresses
  clusterPeers :: FilePath
clusterPeers =
    FilePath -> [FilePath] -> FilePath
forall a. [a] -> [[a]] -> [a]
intercalate FilePath
","
      ([FilePath] -> FilePath)
-> ([Host] -> [FilePath]) -> [Host] -> FilePath
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Host -> FilePath) -> [Host] -> [FilePath]
forall a b. (a -> b) -> [a] -> [b]
map (\Host
h -> Host -> FilePath
forall b a. (Show a, IsString b) => a -> b
show Host
h FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> FilePath
"=" FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> Host -> FilePath
httpUrl Host
h)
      ([Host] -> FilePath) -> [Host] -> FilePath
forall a b. (a -> b) -> a -> b
$ (Host
advertise Host -> [Host] -> [Host]
forall a. a -> [a] -> [a]
: [Host]
peers)

  httpUrl :: Host -> FilePath
httpUrl (Host Text
h PortNumber
p) = FilePath
"http://" FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> Text -> FilePath
forall a. ToString a => a -> FilePath
toString Text
h FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> FilePath
":" FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> PortNumber -> FilePath
forall b a. (Show a, IsString b) => a -> b
show PortNumber
p

  NetworkConfiguration{FilePath
persistenceDir :: FilePath
$sel:persistenceDir:NetworkConfiguration :: NetworkConfiguration -> FilePath
persistenceDir, Host
listen :: Host
$sel:listen:NetworkConfiguration :: NetworkConfiguration -> Host
listen, Host
advertise :: Host
$sel:advertise:NetworkConfiguration :: NetworkConfiguration -> Host
advertise, [Host]
peers :: [Host]
$sel:peers:NetworkConfiguration :: NetworkConfiguration -> [Host]
peers, WhichEtcd
whichEtcd :: WhichEtcd
$sel:whichEtcd:NetworkConfiguration :: NetworkConfiguration -> WhichEtcd
whichEtcd} = NetworkConfiguration
config

-- | Return the path of the etcd binary. Will either install it first, or just
-- assume there is one available on the system path.
getEtcdBinary :: FilePath -> WhichEtcd -> IO FilePath
getEtcdBinary :: FilePath -> WhichEtcd -> IO FilePath
getEtcdBinary FilePath
_ WhichEtcd
SystemEtcd = FilePath -> IO FilePath
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure FilePath
"etcd"
getEtcdBinary FilePath
persistenceDir WhichEtcd
EmbeddedEtcd =
  let path :: FilePath
path = FilePath
persistenceDir FilePath -> FilePath -> FilePath
</> FilePath
"bin" FilePath -> FilePath -> FilePath
</> FilePath
"etcd"
   in FilePath -> IO ()
installEtcd FilePath
path IO () -> IO FilePath -> IO FilePath
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> FilePath -> IO FilePath
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure FilePath
path

-- | Install the embedded 'etcd' binary to given file path.
installEtcd :: FilePath -> IO ()
installEtcd :: FilePath -> IO ()
installEtcd FilePath
fp = do
  Bool -> FilePath -> IO ()
createDirectoryIfMissing Bool
True (FilePath -> FilePath
takeDirectory FilePath
fp)
  FilePath -> ByteString -> IO ()
BS.writeFile FilePath
fp $(embedExecutable "etcd")
  FilePath -> FileMode -> IO ()
setFileMode FilePath
fp (FileMode
ownerReadMode FileMode -> FileMode -> FileMode
forall a. Bits a => a -> a -> a
.|. FileMode
ownerWriteMode FileMode -> FileMode -> FileMode
forall a. Bits a => a -> a -> a
.|. FileMode
ownerExecuteMode)

-- | Check and write version on etcd cluster. This will retry until we are on a
-- majority cluster and succeed. If the version does not match a corresponding
-- 'Connectivity' message is sent via 'NetworkCallback'.
checkVersion ::
  Tracer IO EtcdLog ->
  Connection ->
  ProtocolVersion ->
  NetworkCallback msg IO ->
  IO ()
checkVersion :: forall msg.
Tracer IO EtcdLog
-> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
checkVersion Tracer IO EtcdLog
tracer Connection
conn ProtocolVersion
ourVersion NetworkCallback{Connectivity -> IO ()
$sel:onConnectivity:NetworkCallback :: forall msg (m :: * -> *).
NetworkCallback msg m -> Connectivity -> m ()
onConnectivity :: Connectivity -> IO ()
onConnectivity} = do
  -- Get or write our version into kv store
  Proto TxnResponse
res <-
    Connection
-> ClientHandler'
     'NonStreaming (ReaderT Connection IO) (Protobuf KV "txn")
-> Input (Protobuf KV "txn")
-> IO (Output (Protobuf KV "txn"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "txn")) (Input (Protobuf KV "txn") -> IO (Output (Protobuf KV "txn")))
-> Input (Protobuf KV "txn") -> IO (Output (Protobuf KV "txn"))
forall a b. (a -> b) -> a -> b
$
      Proto TxnRequest
forall msg. Message msg => msg
defMessage
        Proto TxnRequest
-> (Proto TxnRequest -> Proto TxnRequest) -> Proto TxnRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto TxnRequest)
  (Proto TxnRequest)
  [Proto Compare]
  [Proto Compare]
#compare ASetter
  (Proto TxnRequest)
  (Proto TxnRequest)
  [Proto Compare]
  [Proto Compare]
-> [Proto Compare] -> Proto TxnRequest -> Proto TxnRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Proto Compare
versionExists]
        Proto TxnRequest
-> (Proto TxnRequest -> Proto TxnRequest) -> Proto TxnRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto TxnRequest)
  (Proto TxnRequest)
  [Proto RequestOp]
  [Proto RequestOp]
#success ASetter
  (Proto TxnRequest)
  (Proto TxnRequest)
  [Proto RequestOp]
  [Proto RequestOp]
-> [Proto RequestOp] -> Proto TxnRequest -> Proto TxnRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Proto RequestOp
getVersion]
        Proto TxnRequest
-> (Proto TxnRequest -> Proto TxnRequest) -> Proto TxnRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto TxnRequest)
  (Proto TxnRequest)
  [Proto RequestOp]
  [Proto RequestOp]
#failure ASetter
  (Proto TxnRequest)
  (Proto TxnRequest)
  [Proto RequestOp]
  [Proto RequestOp]
-> [Proto RequestOp] -> Proto TxnRequest -> Proto TxnRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Proto RequestOp
putVersion]

  -- Check version if version was already present
  if Proto TxnResponse
res Proto TxnResponse -> Getting Bool (Proto TxnResponse) Bool -> Bool
forall s a. s -> Getting a s a -> a
^. Getting Bool (Proto TxnResponse) Bool
#succeeded
    then [Proto KeyValue] -> (Proto KeyValue -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Proto TxnResponse
res Proto TxnResponse
-> Getting
     (Endo [Proto KeyValue]) (Proto TxnResponse) (Proto KeyValue)
-> [Proto KeyValue]
forall s a. s -> Getting (Endo [a]) s a -> [a]
^.. ([Proto ResponseOp]
 -> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> Proto TxnResponse
-> Const (Endo [Proto KeyValue]) (Proto TxnResponse)
#responses (([Proto ResponseOp]
  -> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
 -> Proto TxnResponse
 -> Const (Endo [Proto KeyValue]) (Proto TxnResponse))
-> ((Proto KeyValue
     -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
    -> [Proto ResponseOp]
    -> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> Getting
     (Endo [Proto KeyValue]) (Proto TxnResponse) (Proto KeyValue)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto ResponseOp
 -> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> [Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse ((Proto ResponseOp
  -> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
 -> [Proto ResponseOp]
 -> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> ((Proto KeyValue
     -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
    -> Proto ResponseOp
    -> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> (Proto KeyValue
    -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto RangeResponse
 -> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp)
#responseRange ((Proto RangeResponse
  -> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
 -> Proto ResponseOp
 -> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> ((Proto KeyValue
     -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
    -> Proto RangeResponse
    -> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> (Proto KeyValue
    -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([Proto KeyValue]
 -> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
#kvs (([Proto KeyValue]
  -> Const (Endo [Proto KeyValue]) [Proto KeyValue])
 -> Proto RangeResponse
 -> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> ((Proto KeyValue
     -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
    -> [Proto KeyValue]
    -> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> (Proto KeyValue
    -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto KeyValue -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse) ((Proto KeyValue -> IO ()) -> IO ())
-> (Proto KeyValue -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Proto KeyValue
kv ->
      case ByteString -> Either DecoderError ProtocolVersion
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' (ByteString -> Either DecoderError ProtocolVersion)
-> ByteString -> Either DecoderError ProtocolVersion
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#value of
        Left DecoderError
err -> do
          Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$
            FailedToDecodeValue
              { $sel:key:EtcdLog :: Text
key = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#key
              , $sel:value:EtcdLog :: Text
value = ByteString -> Text
encodeBase16 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#value
              , $sel:reason:EtcdLog :: Text
reason = DecoderError -> Text
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
              }
          Connectivity -> IO ()
onConnectivity VersionMismatch{ProtocolVersion
ourVersion :: ProtocolVersion
$sel:ourVersion:PeerConnected :: ProtocolVersion
ourVersion, $sel:theirVersion:PeerConnected :: Maybe ProtocolVersion
theirVersion = Maybe ProtocolVersion
forall a. Maybe a
Nothing}
        Right ProtocolVersion
theirVersion ->
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ProtocolVersion
theirVersion ProtocolVersion -> ProtocolVersion -> Bool
forall a. Eq a => a -> a -> Bool
== ProtocolVersion
ourVersion) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
            Connectivity -> IO ()
onConnectivity VersionMismatch{ProtocolVersion
ourVersion :: ProtocolVersion
$sel:ourVersion:PeerConnected :: ProtocolVersion
ourVersion, $sel:theirVersion:PeerConnected :: Maybe ProtocolVersion
theirVersion = ProtocolVersion -> Maybe ProtocolVersion
forall a. a -> Maybe a
Just ProtocolVersion
theirVersion}
    else
      Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$ MatchingProtocolVersion{version :: ProtocolVersion
version = ProtocolVersion
ourVersion}
 where
  versionKey :: ByteString
versionKey = ByteString
"version"

  -- exists = create_revision of key 'version' > 0
  versionExists :: Proto Compare
versionExists =
    Proto Compare
forall msg. Message msg => msg
defMessage
      Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto Compare)
  (Proto Compare)
  (Proto Compare'CompareResult)
  (Proto Compare'CompareResult)
#result ASetter
  (Proto Compare)
  (Proto Compare)
  (Proto Compare'CompareResult)
  (Proto Compare'CompareResult)
-> Proto Compare'CompareResult -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Compare'CompareResult -> Proto Compare'CompareResult
forall msg. msg -> Proto msg
Proto Compare'CompareResult
Compare'GREATER
      Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto Compare)
  (Proto Compare)
  (Proto Compare'CompareTarget)
  (Proto Compare'CompareTarget)
#target ASetter
  (Proto Compare)
  (Proto Compare)
  (Proto Compare'CompareTarget)
  (Proto Compare'CompareTarget)
-> Proto Compare'CompareTarget -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Compare'CompareTarget -> Proto Compare'CompareTarget
forall msg. msg -> Proto msg
Proto Compare'CompareTarget
Compare'VERSION
      Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter (Proto Compare) (Proto Compare) ByteString ByteString
#key ASetter (Proto Compare) (Proto Compare) ByteString ByteString
-> ByteString -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
versionKey
      Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter (Proto Compare) (Proto Compare) Int64 Int64
#version ASetter (Proto Compare) (Proto Compare) Int64 Int64
-> Int64 -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
0

  getVersion :: Proto RequestOp
getVersion =
    Proto RequestOp
forall msg. Message msg => msg
defMessage Proto RequestOp
-> (Proto RequestOp -> Proto RequestOp) -> Proto RequestOp
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto RequestOp)
  (Proto RequestOp)
  (Proto RangeRequest)
  (Proto RangeRequest)
#requestRange ASetter
  (Proto RequestOp)
  (Proto RequestOp)
  (Proto RangeRequest)
  (Proto RangeRequest)
-> Proto RangeRequest -> Proto RequestOp -> Proto RequestOp
forall s t a b. ASetter s t a b -> b -> s -> t
.~ (Proto RangeRequest
forall msg. Message msg => msg
defMessage Proto RangeRequest
-> (Proto RangeRequest -> Proto RangeRequest) -> Proto RangeRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
#key ASetter
  (Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
-> ByteString -> Proto RangeRequest -> Proto RangeRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
versionKey)

  putVersion :: Proto RequestOp
putVersion =
    Proto RequestOp
forall msg. Message msg => msg
defMessage
      Proto RequestOp
-> (Proto RequestOp -> Proto RequestOp) -> Proto RequestOp
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto RequestOp)
  (Proto RequestOp)
  (Proto PutRequest)
  (Proto PutRequest)
#requestPut
        ASetter
  (Proto RequestOp)
  (Proto RequestOp)
  (Proto PutRequest)
  (Proto PutRequest)
-> Proto PutRequest -> Proto RequestOp -> Proto RequestOp
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ( Proto PutRequest
forall msg. Message msg => msg
defMessage
              Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#key ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
versionKey
              Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#value ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ProtocolVersion -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' ProtocolVersion
ourVersion
           )

-- | Broadcast messages from a queue to the etcd cluster.
--
-- TODO: retrying on failure even needed?
-- Retries on failure to 'putMessage' in case we are on a minority cluster.
broadcastMessages ::
  (ToCBOR msg, Eq msg) =>
  Tracer IO EtcdLog ->
  Connection ->
  -- | Used to identify sender.
  Host ->
  PersistentQueue IO msg ->
  IO ()
broadcastMessages :: forall msg.
(ToCBOR msg, Eq msg) =>
Tracer IO EtcdLog
-> Connection -> Host -> PersistentQueue IO msg -> IO ()
broadcastMessages Tracer IO EtcdLog
tracer Connection
conn Host
ourHost PersistentQueue IO msg
queue =
  Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"broadcastMessages" (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    msg
msg <- PersistentQueue IO msg -> IO msg
forall (m :: * -> *) a. MonadSTM m => PersistentQueue m a -> m a
peekPersistentQueue PersistentQueue IO msg
queue
    (Connection -> Host -> msg -> IO ()
forall msg. ToCBOR msg => Connection -> Host -> msg -> IO ()
putMessage Connection
conn Host
ourHost msg
msg IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> PersistentQueue IO msg -> msg -> IO ()
forall (m :: * -> *) a.
(MonadSTM m, MonadIO m, Eq a) =>
PersistentQueue m a -> a -> m ()
popPersistentQueue PersistentQueue IO msg
queue msg
msg)
      IO () -> (GrpcException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \case
        GrpcException{GrpcError
grpcError :: GrpcError
grpcError :: GrpcException -> GrpcError
grpcError, Maybe Text
grpcErrorMessage :: Maybe Text
grpcErrorMessage :: GrpcException -> Maybe Text
grpcErrorMessage}
          | GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcUnavailable Bool -> Bool -> Bool
|| GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcDeadlineExceeded -> do
              Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$ BroadcastFailed{$sel:reason:EtcdLog :: Text
reason = Text -> Maybe Text -> Text
forall a. a -> Maybe a -> a
fromMaybe Text
"unknown" Maybe Text
grpcErrorMessage}
              DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
        GrpcException
e -> GrpcException -> IO ()
forall e a. Exception e => e -> IO a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO GrpcException
e

-- | Broadcast a message to the etcd cluster.
putMessage ::
  ToCBOR msg =>
  Connection ->
  -- | Used to identify sender.
  Host ->
  msg ->
  IO ()
putMessage :: forall msg. ToCBOR msg => Connection -> Host -> msg -> IO ()
putMessage Connection
conn Host
ourHost msg
msg =
  IO (Proto PutResponse) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Proto PutResponse) -> IO ())
-> IO (Proto PutResponse) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> ClientHandler'
     'NonStreaming (ReaderT Connection IO) (Protobuf KV "put")
-> Input (Protobuf KV "put")
-> IO (Output (Protobuf KV "put"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "put")) Input (Protobuf KV "put")
Proto PutRequest
req
 where
  req :: Proto PutRequest
req =
    Proto PutRequest
forall msg. Message msg => msg
defMessage
      Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#key ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
key
      Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#value ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ msg -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' msg
msg

  key :: ByteString
key = forall a b. ConvertUtf8 a b => a -> b
encodeUtf8 @Text (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ Text
"msg-" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Host -> Text
forall b a. (Show a, IsString b) => a -> b
show Host
ourHost

-- | Fetch and wait for messages from the etcd cluster.
waitMessages ::
  FromCBOR msg =>
  Tracer IO EtcdLog ->
  Connection ->
  FilePath ->
  NetworkCallback msg IO ->
  IO ()
waitMessages :: forall msg.
FromCBOR msg =>
Tracer IO EtcdLog
-> Connection -> FilePath -> NetworkCallback msg IO -> IO ()
waitMessages Tracer IO EtcdLog
tracer Connection
conn FilePath
directory NetworkCallback{msg -> IO ()
deliver :: msg -> IO ()
$sel:deliver:NetworkCallback :: forall msg (m :: * -> *). NetworkCallback msg m -> msg -> m ()
deliver} = do
  Natural
revision <- FilePath -> IO Natural
forall (m :: * -> *). MonadIO m => FilePath -> m Natural
getLastKnownRevision FilePath
directory
  Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"waitMessages" (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    -- NOTE: We have not observed the watch (subscription) fail even when peers
    -- leave and we end up on a minority cluster.
    Connection
-> ClientHandler'
     'BiDiStreaming (ReaderT Connection IO) (Protobuf Watch "watch")
-> ((NextElem (Input (Protobuf Watch "watch")) -> IO ())
    -> IO (NextElem (Output (Protobuf Watch "watch"))) -> IO ())
-> IO ()
forall {k} (rpc :: k) (m :: * -> *) r.
MonadIO m =>
Connection
-> ClientHandler' 'BiDiStreaming (ReaderT Connection m) rpc
-> ((NextElem (Input rpc) -> m ())
    -> m (NextElem (Output rpc)) -> m r)
-> m r
biDiStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf Watch "watch")) (((NextElem (Input (Protobuf Watch "watch")) -> IO ())
  -> IO (NextElem (Output (Protobuf Watch "watch"))) -> IO ())
 -> IO ())
-> ((NextElem (Input (Protobuf Watch "watch")) -> IO ())
    -> IO (NextElem (Output (Protobuf Watch "watch"))) -> IO ())
-> IO ()
forall a b. (a -> b) -> a -> b
$ \NextElem (Input (Protobuf Watch "watch")) -> IO ()
send IO (NextElem (Output (Protobuf Watch "watch")))
recv -> do
      -- NOTE: Request all keys starting with 'msg'. See also section KeyRanges
      -- in https://etcd.io/docs/v3.5/learning/api/#key-value-api
      let watchRequest :: Proto WatchCreateRequest
watchRequest =
            Proto WatchCreateRequest
forall msg. Message msg => msg
defMessage
              Proto WatchCreateRequest
-> (Proto WatchCreateRequest -> Proto WatchCreateRequest)
-> Proto WatchCreateRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto WatchCreateRequest)
  (Proto WatchCreateRequest)
  ByteString
  ByteString
#key ASetter
  (Proto WatchCreateRequest)
  (Proto WatchCreateRequest)
  ByteString
  ByteString
-> ByteString
-> Proto WatchCreateRequest
-> Proto WatchCreateRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"msg"
              Proto WatchCreateRequest
-> (Proto WatchCreateRequest -> Proto WatchCreateRequest)
-> Proto WatchCreateRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto WatchCreateRequest)
  (Proto WatchCreateRequest)
  ByteString
  ByteString
#rangeEnd ASetter
  (Proto WatchCreateRequest)
  (Proto WatchCreateRequest)
  ByteString
  ByteString
-> ByteString
-> Proto WatchCreateRequest
-> Proto WatchCreateRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"msh" -- NOTE: g+1 to query prefixes
              Proto WatchCreateRequest
-> (Proto WatchCreateRequest -> Proto WatchCreateRequest)
-> Proto WatchCreateRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto WatchCreateRequest) (Proto WatchCreateRequest) Int64 Int64
#startRevision ASetter
  (Proto WatchCreateRequest) (Proto WatchCreateRequest) Int64 Int64
-> Int64 -> Proto WatchCreateRequest -> Proto WatchCreateRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Natural -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Natural
revision Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
+ Natural
1)
      NextElem (Input (Protobuf Watch "watch")) -> IO ()
NextElem (Proto WatchRequest) -> IO ()
send (NextElem (Proto WatchRequest) -> IO ())
-> (Proto WatchRequest -> NextElem (Proto WatchRequest))
-> Proto WatchRequest
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Proto WatchRequest -> NextElem (Proto WatchRequest)
forall a. a -> NextElem a
NextElem (Proto WatchRequest -> IO ()) -> Proto WatchRequest -> IO ()
forall a b. (a -> b) -> a -> b
$ Proto WatchRequest
forall msg. Message msg => msg
defMessage Proto WatchRequest
-> (Proto WatchRequest -> Proto WatchRequest) -> Proto WatchRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto WatchRequest)
  (Proto WatchRequest)
  (Proto WatchCreateRequest)
  (Proto WatchCreateRequest)
#createRequest ASetter
  (Proto WatchRequest)
  (Proto WatchRequest)
  (Proto WatchCreateRequest)
  (Proto WatchCreateRequest)
-> Proto WatchCreateRequest
-> Proto WatchRequest
-> Proto WatchRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Proto WatchCreateRequest
watchRequest
      IO (NextElem (Proto WatchResponse))
-> (Proto WatchResponse -> IO ()) -> IO ()
forall (m :: * -> *) a.
Monad m =>
m (NextElem a) -> (a -> m ()) -> m ()
whileNext_ IO (NextElem (Output (Protobuf Watch "watch")))
IO (NextElem (Proto WatchResponse))
recv Proto WatchResponse -> IO ()
process
    -- Wait before re-trying
    DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
 where
  process :: Proto WatchResponse -> IO ()
process Proto WatchResponse
res = do
    let revision :: Natural
revision = Int64 -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Natural) -> Int64 -> Natural
forall a b. (a -> b) -> a -> b
$ Proto WatchResponse
res Proto WatchResponse
-> Getting Int64 (Proto WatchResponse) Int64 -> Int64
forall s a. s -> Getting a s a -> a
^. (Proto ResponseHeader -> Const Int64 (Proto ResponseHeader))
-> Proto WatchResponse -> Const Int64 (Proto WatchResponse)
#header ((Proto ResponseHeader -> Const Int64 (Proto ResponseHeader))
 -> Proto WatchResponse -> Const Int64 (Proto WatchResponse))
-> ((Int64 -> Const Int64 Int64)
    -> Proto ResponseHeader -> Const Int64 (Proto ResponseHeader))
-> Getting Int64 (Proto WatchResponse) Int64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int64 -> Const Int64 Int64)
-> Proto ResponseHeader -> Const Int64 (Proto ResponseHeader)
#revision
    FilePath -> Natural -> IO ()
forall (m :: * -> *). MonadIO m => FilePath -> Natural -> m ()
putLastKnownRevision FilePath
directory Natural
revision
    [Proto Event] -> (Proto Event -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Proto WatchResponse
res Proto WatchResponse
-> Getting [Proto Event] (Proto WatchResponse) [Proto Event]
-> [Proto Event]
forall s a. s -> Getting a s a -> a
^. Getting [Proto Event] (Proto WatchResponse) [Proto Event]
#events) ((Proto Event -> IO ()) -> IO ())
-> (Proto Event -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Proto Event
event -> do
      let value :: ByteString
value = Proto Event
event Proto Event
-> Getting ByteString (Proto Event) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. (Proto KeyValue -> Const ByteString (Proto KeyValue))
-> Proto Event -> Const ByteString (Proto Event)
#kv ((Proto KeyValue -> Const ByteString (Proto KeyValue))
 -> Proto Event -> Const ByteString (Proto Event))
-> Getting ByteString (Proto KeyValue) ByteString
-> Getting ByteString (Proto Event) ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting ByteString (Proto KeyValue) ByteString
#value
      case ByteString -> Either DecoderError msg
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' ByteString
value of
        Left DecoderError
err ->
          Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith
            Tracer IO EtcdLog
tracer
            FailedToDecodeValue
              { $sel:key:EtcdLog :: Text
key = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto Event
event Proto Event
-> Getting ByteString (Proto Event) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. (Proto KeyValue -> Const ByteString (Proto KeyValue))
-> Proto Event -> Const ByteString (Proto Event)
#kv ((Proto KeyValue -> Const ByteString (Proto KeyValue))
 -> Proto Event -> Const ByteString (Proto Event))
-> Getting ByteString (Proto KeyValue) ByteString
-> Getting ByteString (Proto Event) ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting ByteString (Proto KeyValue) ByteString
#key
              , $sel:value:EtcdLog :: Text
value = ByteString -> Text
encodeBase16 ByteString
value
              , $sel:reason:EtcdLog :: Text
reason = DecoderError -> Text
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
              }
        Right msg
msg -> msg -> IO ()
deliver msg
msg

getLastKnownRevision :: MonadIO m => FilePath -> m Natural
getLastKnownRevision :: forall (m :: * -> *). MonadIO m => FilePath -> m Natural
getLastKnownRevision FilePath
directory = do
  IO Natural -> m Natural
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Natural -> m Natural) -> IO Natural -> m Natural
forall a b. (a -> b) -> a -> b
$
    IO (Maybe Natural) -> IO (Either IOException (Maybe Natural))
forall e a. Exception e => IO a -> IO (Either e a)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (FilePath -> IO (Maybe Natural)
forall a. FromJSON a => FilePath -> IO (Maybe a)
decodeFileStrict' (FilePath -> IO (Maybe Natural)) -> FilePath -> IO (Maybe Natural)
forall a b. (a -> b) -> a -> b
$ FilePath
directory FilePath -> FilePath -> FilePath
</> FilePath
"last-known-revision") IO (Either IOException (Maybe Natural))
-> (Either IOException (Maybe Natural) -> IO Natural) -> IO Natural
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Right Maybe Natural
rev -> do
        Natural -> IO Natural
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Natural -> IO Natural) -> Natural -> IO Natural
forall a b. (a -> b) -> a -> b
$ Natural -> Maybe Natural -> Natural
forall a. a -> Maybe a -> a
fromMaybe Natural
1 Maybe Natural
rev
      Left (IOException
e :: IOException)
        | IOException -> Bool
isDoesNotExistError IOException
e -> Natural -> IO Natural
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
1
        | Bool
otherwise -> do
            FilePath -> IO Natural
forall a. FilePath -> IO a
forall (m :: * -> *) a. MonadFail m => FilePath -> m a
fail (FilePath -> IO Natural) -> FilePath -> IO Natural
forall a b. (a -> b) -> a -> b
$ FilePath
"Failed to load last known revision: " FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> IOException -> FilePath
forall b a. (Show a, IsString b) => a -> b
show IOException
e

putLastKnownRevision :: MonadIO m => FilePath -> Natural -> m ()
putLastKnownRevision :: forall (m :: * -> *). MonadIO m => FilePath -> Natural -> m ()
putLastKnownRevision FilePath
directory Natural
rev = do
  IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ FilePath -> Natural -> IO ()
forall a. ToJSON a => FilePath -> a -> IO ()
encodeFile (FilePath
directory FilePath -> FilePath -> FilePath
</> FilePath
"last-known-revision") Natural
rev

-- | Write a well-known key to indicate being alive, keep it alive using a lease
-- and poll other peers entries to yield connectivity events. While doing so,
-- overall network connectivity is determined from the ability to read/write to
-- the cluster.
pollConnectivity ::
  Tracer IO EtcdLog ->
  Connection ->
  -- | Local host
  Host ->
  NetworkCallback msg IO ->
  IO ()
pollConnectivity :: forall msg.
Tracer IO EtcdLog
-> Connection -> Host -> NetworkCallback msg IO -> IO ()
pollConnectivity Tracer IO EtcdLog
tracer Connection
conn Host
advertise NetworkCallback{Connectivity -> IO ()
$sel:onConnectivity:NetworkCallback :: forall msg (m :: * -> *).
NetworkCallback msg m -> Connectivity -> m ()
onConnectivity :: Connectivity -> IO ()
onConnectivity} = do
  TVar [Host]
seenAliveVar <- [Host] -> IO (TVar IO [Host])
forall a. a -> IO (TVar IO a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO []
  Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"pollConnectivity" (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
    IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (GrpcException -> IO ()) -> IO () -> IO ()
forall e a. Exception e => (e -> IO a) -> IO a -> IO a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> m a) -> m a -> m a
handle (TVar [Host] -> GrpcException -> IO ()
onGrpcException TVar [Host]
seenAliveVar) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      Int64
leaseId <- IO Int64
createLease
      -- If we can create a lease, we are connected
      Connectivity -> IO ()
onConnectivity Connectivity
NetworkConnected
      -- Write our alive key using lease
      Int64 -> IO ()
writeAlive Int64
leaseId
      Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer CreatedLease{Int64
leaseId :: Int64
$sel:leaseId:EtcdLog :: Int64
leaseId}
      Int64 -> (IO DiffTime -> IO Any) -> IO ()
withKeepAlive Int64
leaseId ((IO DiffTime -> IO Any) -> IO ())
-> (IO DiffTime -> IO Any) -> IO ()
forall a b. (a -> b) -> a -> b
$ \IO DiffTime
keepAlive ->
        IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> IO () -> IO Any
forall a b. (a -> b) -> a -> b
$ do
          -- Keep our lease alive
          DiffTime
ttlRemaining <- IO DiffTime
keepAlive
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (DiffTime
ttlRemaining DiffTime -> DiffTime -> Bool
forall a. Ord a => a -> a -> Bool
< DiffTime
1) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
            Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer LowLeaseTTL{DiffTime
ttlRemaining :: DiffTime
$sel:ttlRemaining:EtcdLog :: DiffTime
ttlRemaining}
          -- Determine alive peers
          [Host]
alive <- IO [Host]
getAlive
          let othersAlive :: [Host]
othersAlive = [Host]
alive [Host] -> [Host] -> [Host]
forall a. Eq a => [a] -> [a] -> [a]
\\ [Host
advertise]
          [Host]
seenAlive <- STM IO [Host] -> IO [Host]
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO [Host] -> IO [Host]) -> STM IO [Host] -> IO [Host]
forall a b. (a -> b) -> a -> b
$ TVar IO [Host] -> [Host] -> STM IO [Host]
forall a. TVar IO a -> a -> STM IO a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m a
swapTVar TVar [Host]
TVar IO [Host]
seenAliveVar [Host]
othersAlive
          [Host] -> (Host -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Host]
othersAlive [Host] -> [Host] -> [Host]
forall a. Eq a => [a] -> [a] -> [a]
\\ [Host]
seenAlive) ((Host -> IO ()) -> IO ()) -> (Host -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connectivity -> IO ()
onConnectivity (Connectivity -> IO ()) -> (Host -> Connectivity) -> Host -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Host -> Connectivity
PeerConnected
          [Host] -> (Host -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Host]
seenAlive [Host] -> [Host] -> [Host]
forall a. Eq a => [a] -> [a] -> [a]
\\ [Host]
othersAlive) ((Host -> IO ()) -> IO ()) -> (Host -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connectivity -> IO ()
onConnectivity (Connectivity -> IO ()) -> (Host -> Connectivity) -> Host -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Host -> Connectivity
PeerDisconnected
          -- Wait roughly ttl / 2
          DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay (DiffTime
ttlRemaining DiffTime -> DiffTime -> DiffTime
forall a. Fractional a => a -> a -> a
/ DiffTime
2)
 where
  onGrpcException :: TVar [Host] -> GrpcException -> IO ()
onGrpcException TVar [Host]
seenAliveVar GrpcException{GrpcError
grpcError :: GrpcException -> GrpcError
grpcError :: GrpcError
grpcError}
    | GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcUnavailable Bool -> Bool -> Bool
|| GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcDeadlineExceeded = do
        Connectivity -> IO ()
onConnectivity Connectivity
NetworkDisconnected
        STM IO () -> IO ()
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO () -> IO ()) -> STM IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar IO [Host] -> [Host] -> STM IO ()
forall a. TVar IO a -> a -> STM IO ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar [Host]
TVar IO [Host]
seenAliveVar []
        DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
  onGrpcException TVar [Host]
_ GrpcException
e = GrpcException -> IO ()
forall e a. Exception e => e -> IO a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO GrpcException
e

  createLease :: IO Int64
createLease = Text -> IO Int64 -> IO Int64
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"createLease" (IO Int64 -> IO Int64) -> IO Int64 -> IO Int64
forall a b. (a -> b) -> a -> b
$ do
    Proto LeaseGrantResponse
leaseResponse <-
      Connection
-> ClientHandler'
     'NonStreaming (ReaderT Connection IO) (Protobuf Lease "leaseGrant")
-> Input (Protobuf Lease "leaseGrant")
-> IO (Output (Protobuf Lease "leaseGrant"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf Lease "leaseGrant")) (Input (Protobuf Lease "leaseGrant")
 -> IO (Output (Protobuf Lease "leaseGrant")))
-> Input (Protobuf Lease "leaseGrant")
-> IO (Output (Protobuf Lease "leaseGrant"))
forall a b. (a -> b) -> a -> b
$
        Proto LeaseGrantRequest
forall msg. Message msg => msg
defMessage Proto LeaseGrantRequest
-> (Proto LeaseGrantRequest -> Proto LeaseGrantRequest)
-> Proto LeaseGrantRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto LeaseGrantRequest) (Proto LeaseGrantRequest) Int64 Int64
#ttl ASetter
  (Proto LeaseGrantRequest) (Proto LeaseGrantRequest) Int64 Int64
-> Int64 -> Proto LeaseGrantRequest -> Proto LeaseGrantRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
3
    Int64 -> IO Int64
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int64 -> IO Int64) -> Int64 -> IO Int64
forall a b. (a -> b) -> a -> b
$ Proto LeaseGrantResponse
leaseResponse Proto LeaseGrantResponse
-> Getting Int64 (Proto LeaseGrantResponse) Int64 -> Int64
forall s a. s -> Getting a s a -> a
^. Getting Int64 (Proto LeaseGrantResponse) Int64
#id

  withKeepAlive :: Int64 -> (IO DiffTime -> IO Any) -> IO ()
withKeepAlive Int64
leaseId IO DiffTime -> IO Any
action = do
    Connection
-> ClientHandler'
     'BiDiStreaming
     (ReaderT Connection IO)
     (Protobuf Lease "leaseKeepAlive")
-> ((NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
    -> IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
    -> IO ())
-> IO ()
forall {k} (rpc :: k) (m :: * -> *) r.
MonadIO m =>
Connection
-> ClientHandler' 'BiDiStreaming (ReaderT Connection m) rpc
-> ((NextElem (Input rpc) -> m ())
    -> m (NextElem (Output rpc)) -> m r)
-> m r
biDiStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf Lease "leaseKeepAlive")) (((NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
  -> IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
  -> IO ())
 -> IO ())
-> ((NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
    -> IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
    -> IO ())
-> IO ()
forall a b. (a -> b) -> a -> b
$ \NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ()
send IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
recv -> do
      IO Any -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Any -> IO ())
-> (IO DiffTime -> IO Any) -> IO DiffTime -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO DiffTime -> IO Any
action (IO DiffTime -> IO ()) -> IO DiffTime -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ()
send (NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
-> NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ()
forall a b. (a -> b) -> a -> b
$ Input (Protobuf Lease "leaseKeepAlive")
-> NextElem (Input (Protobuf Lease "leaseKeepAlive"))
forall a. a -> NextElem a
NextElem (Input (Protobuf Lease "leaseKeepAlive")
 -> NextElem (Input (Protobuf Lease "leaseKeepAlive")))
-> Input (Protobuf Lease "leaseKeepAlive")
-> NextElem (Input (Protobuf Lease "leaseKeepAlive"))
forall a b. (a -> b) -> a -> b
$ Proto LeaseKeepAliveRequest
forall msg. Message msg => msg
defMessage Proto LeaseKeepAliveRequest
-> (Proto LeaseKeepAliveRequest -> Proto LeaseKeepAliveRequest)
-> Proto LeaseKeepAliveRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto LeaseKeepAliveRequest)
  (Proto LeaseKeepAliveRequest)
  Int64
  Int64
#id ASetter
  (Proto LeaseKeepAliveRequest)
  (Proto LeaseKeepAliveRequest)
  Int64
  Int64
-> Int64
-> Proto LeaseKeepAliveRequest
-> Proto LeaseKeepAliveRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
leaseId
        IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
IO (NextElem (Proto LeaseKeepAliveResponse))
recv IO (NextElem (Proto LeaseKeepAliveResponse))
-> (NextElem (Proto LeaseKeepAliveResponse) -> IO DiffTime)
-> IO DiffTime
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          NextElem Proto LeaseKeepAliveResponse
res -> DiffTime -> IO DiffTime
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime -> IO DiffTime)
-> (Int64 -> DiffTime) -> Int64 -> IO DiffTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int64 -> DiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> IO DiffTime) -> Int64 -> IO DiffTime
forall a b. (a -> b) -> a -> b
$ Proto LeaseKeepAliveResponse
res Proto LeaseKeepAliveResponse
-> Getting Int64 (Proto LeaseKeepAliveResponse) Int64 -> Int64
forall s a. s -> Getting a s a -> a
^. Getting Int64 (Proto LeaseKeepAliveResponse) Int64
#ttl
          NextElem (Proto LeaseKeepAliveResponse)
NoNextElem -> do
            Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer EtcdLog
NoKeepAliveResponse
            DiffTime -> IO DiffTime
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DiffTime
0

  writeAlive :: Int64 -> IO ()
writeAlive Int64
leaseId = Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"writeAlive" (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    IO (Proto PutResponse) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Proto PutResponse) -> IO ())
-> (Proto PutRequest -> IO (Proto PutResponse))
-> Proto PutRequest
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection
-> ClientHandler'
     'NonStreaming (ReaderT Connection IO) (Protobuf KV "put")
-> Input (Protobuf KV "put")
-> IO (Output (Protobuf KV "put"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "put")) (Proto PutRequest -> IO ()) -> Proto PutRequest -> IO ()
forall a b. (a -> b) -> a -> b
$
      Proto PutRequest
forall msg. Message msg => msg
defMessage
        Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#key ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"alive-" ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Host -> ByteString
forall b a. (Show a, IsString b) => a -> b
show Host
advertise
        Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#value ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Host -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' Host
advertise
        Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) Int64 Int64
#lease ASetter (Proto PutRequest) (Proto PutRequest) Int64 Int64
-> Int64 -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
leaseId

  getAlive :: IO [Host]
getAlive = Text -> IO [Host] -> IO [Host]
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"getAlive" (IO [Host] -> IO [Host]) -> IO [Host] -> IO [Host]
forall a b. (a -> b) -> a -> b
$ do
    Proto RangeResponse
res <-
      Connection
-> ClientHandler'
     'NonStreaming (ReaderT Connection IO) (Protobuf KV "range")
-> Input (Protobuf KV "range")
-> IO (Output (Protobuf KV "range"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "range")) (Input (Protobuf KV "range") -> IO (Output (Protobuf KV "range")))
-> Input (Protobuf KV "range") -> IO (Output (Protobuf KV "range"))
forall a b. (a -> b) -> a -> b
$
        Proto RangeRequest
forall msg. Message msg => msg
defMessage
          Proto RangeRequest
-> (Proto RangeRequest -> Proto RangeRequest) -> Proto RangeRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
#key ASetter
  (Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
-> ByteString -> Proto RangeRequest -> Proto RangeRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"alive"
          Proto RangeRequest
-> (Proto RangeRequest -> Proto RangeRequest) -> Proto RangeRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
#rangeEnd ASetter
  (Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
-> ByteString -> Proto RangeRequest -> Proto RangeRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"alivf" -- NOTE: e+1 to query prefixes
    ((Proto KeyValue -> IO (Maybe Host))
 -> [Proto KeyValue] -> IO [Host])
-> [Proto KeyValue]
-> (Proto KeyValue -> IO (Maybe Host))
-> IO [Host]
forall a b c. (a -> b -> c) -> b -> a -> c
flip (Proto KeyValue -> IO (Maybe Host))
-> [Proto KeyValue] -> IO [Host]
forall (m :: * -> *) a b.
Monad m =>
(a -> m (Maybe b)) -> [a] -> m [b]
mapMaybeM (Proto RangeResponse
res Proto RangeResponse
-> ((Proto KeyValue
     -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
    -> Proto RangeResponse
    -> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> [Proto KeyValue]
forall s a. s -> Getting (Endo [a]) s a -> [a]
^.. ([Proto KeyValue]
 -> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
#kvs (([Proto KeyValue]
  -> Const (Endo [Proto KeyValue]) [Proto KeyValue])
 -> Proto RangeResponse
 -> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> ((Proto KeyValue
     -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
    -> [Proto KeyValue]
    -> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> (Proto KeyValue
    -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto KeyValue -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse) ((Proto KeyValue -> IO (Maybe Host)) -> IO [Host])
-> (Proto KeyValue -> IO (Maybe Host)) -> IO [Host]
forall a b. (a -> b) -> a -> b
$ \Proto KeyValue
kv -> do
      let value :: ByteString
value = Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#value
      case ByteString -> Either DecoderError Host
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' ByteString
value of
        Left DecoderError
err -> do
          Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith
            Tracer IO EtcdLog
tracer
            FailedToDecodeValue
              { $sel:key:EtcdLog :: Text
key = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#key
              , $sel:value:EtcdLog :: Text
value = ByteString -> Text
encodeBase16 ByteString
value
              , $sel:reason:EtcdLog :: Text
reason = DecoderError -> Text
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
              }
          Maybe Host -> IO (Maybe Host)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Host
forall a. Maybe a
Nothing
        Right Host
x -> Maybe Host -> IO (Maybe Host)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe Host -> IO (Maybe Host)) -> Maybe Host -> IO (Maybe Host)
forall a b. (a -> b) -> a -> b
$ Host -> Maybe Host
forall a. a -> Maybe a
Just Host
x

-- | Add context to the 'grpcErrorMessage' of any 'GrpcException' raised.
withGrpcContext :: MonadCatch m => Text -> m a -> m a
withGrpcContext :: forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
context m a
action =
  m a
action m a -> (GrpcException -> m a) -> m a
forall e a. Exception e => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \e :: GrpcException
e@GrpcException{Maybe Text
grpcErrorMessage :: GrpcException -> Maybe Text
grpcErrorMessage :: Maybe Text
grpcErrorMessage} ->
    GrpcException -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO
      GrpcException
e
        { grpcErrorMessage =
            case grpcErrorMessage of
              Maybe Text
Nothing -> Text -> Maybe Text
forall a. a -> Maybe a
Just Text
context
              Just Text
msg -> Text -> Maybe Text
forall a. a -> Maybe a
Just (Text -> Maybe Text) -> Text -> Maybe Text
forall a b. (a -> b) -> a -> b
$ Text
context Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
": " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
msg
        }

-- | Like 'withProcessTerm', but sends first SIGINT and only SIGTERM if not
-- stopped within 5 seconds.
withProcessInterrupt ::
  (MonadIO m, MonadThrow m) =>
  ProcessConfig stdin stdout stderr ->
  (Process stdin stdout stderr -> m a) ->
  m a
withProcessInterrupt :: forall (m :: * -> *) stdin stdout stderr a.
(MonadIO m, MonadThrow m) =>
ProcessConfig stdin stdout stderr
-> (Process stdin stdout stderr -> m a) -> m a
withProcessInterrupt ProcessConfig stdin stdout stderr
config =
  m (Process stdin stdout stderr)
-> (Process stdin stdout stderr -> m ())
-> (Process stdin stdout stderr -> m a)
-> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (ProcessConfig stdin stdout stderr
-> m (Process stdin stdout stderr)
forall (m :: * -> *) stdin stdout stderr.
MonadIO m =>
ProcessConfig stdin stdout stderr
-> m (Process stdin stdout stderr)
startProcess (ProcessConfig stdin stdout stderr
 -> m (Process stdin stdout stderr))
-> ProcessConfig stdin stdout stderr
-> m (Process stdin stdout stderr)
forall a b. (a -> b) -> a -> b
$ ProcessConfig stdin stdout stderr
config ProcessConfig stdin stdout stderr
-> (ProcessConfig stdin stdout stderr
    -> ProcessConfig stdin stdout stderr)
-> ProcessConfig stdin stdout stderr
forall a b. a -> (a -> b) -> b
& Bool
-> ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr
forall stdin stdout stderr.
Bool
-> ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr
setCreateGroup Bool
True) Process stdin stdout stderr -> m ()
forall {m :: * -> *} {stdin} {stdout} {stderr}.
MonadIO m =>
Process stdin stdout stderr -> m ()
signalAndStopProcess
 where
  signalAndStopProcess :: Process stdin stdout stderr -> m ()
signalAndStopProcess Process stdin stdout stderr
p = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    ProcessHandle -> IO ()
interruptProcessGroupOf (Process stdin stdout stderr -> ProcessHandle
forall stdin stdout stderr.
Process stdin stdout stderr -> ProcessHandle
unsafeProcessHandle Process stdin stdout stderr
p)
    IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_
      (IO ExitCode -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ExitCode -> IO ()) -> IO ExitCode -> IO ()
forall a b. (a -> b) -> a -> b
$ Process stdin stdout stderr -> IO ExitCode
forall (m :: * -> *) stdin stdout stderr.
MonadIO m =>
Process stdin stdout stderr -> m ExitCode
waitExitCode Process stdin stdout stderr
p)
      (DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
5 IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Process stdin stdout stderr -> IO ()
forall {m :: * -> *} {stdin} {stdout} {stderr}.
MonadIO m =>
Process stdin stdout stderr -> m ()
stopProcess Process stdin stdout stderr
p)

-- * Persistent queue

data PersistentQueue m a = PersistentQueue
  { forall (m :: * -> *) a.
PersistentQueue m a -> TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
  , forall (m :: * -> *) a. PersistentQueue m a -> TVar m Natural
nextIx :: TVar m Natural
  , forall (m :: * -> *) a. PersistentQueue m a -> FilePath
directory :: FilePath
  }

-- | Create a new persistent queue at file path and given capacity.
newPersistentQueue ::
  (MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) =>
  FilePath ->
  Natural ->
  m (PersistentQueue m a)
newPersistentQueue :: forall (m :: * -> *) a.
(MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) =>
FilePath -> Natural -> m (PersistentQueue m a)
newPersistentQueue FilePath
path Natural
capacity = do
  TBQueue m (Natural, a)
queue <- Natural -> m (TBQueue m (Natural, a))
forall a. Natural -> m (TBQueue m a)
forall (m :: * -> *) a. MonadSTM m => Natural -> m (TBQueue m a)
newTBQueueIO Natural
capacity
  Natural
highestId <-
    m Natural -> m (Either IOException Natural)
forall e a. Exception e => m a -> m (Either e a)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (TBQueue m (Natural, a) -> m Natural
loadExisting TBQueue m (Natural, a)
queue) m (Either IOException Natural)
-> (Either IOException Natural -> m Natural) -> m Natural
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Left (IOException
_ :: IOException) -> do
        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Bool -> FilePath -> IO ()
createDirectoryIfMissing Bool
True FilePath
path
        Natural -> m Natural
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
0
      Right Natural
highest -> Natural -> m Natural
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
highest
  TVar m Natural
nextIx <- Natural -> m (TVar m Natural)
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO (Natural -> m (TVar m Natural)) -> Natural -> m (TVar m Natural)
forall a b. (a -> b) -> a -> b
$ Natural
highestId Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
+ Natural
1
  PersistentQueue m a -> m (PersistentQueue m a)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure PersistentQueue{TBQueue m (Natural, a)
$sel:queue:PersistentQueue :: TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
queue, TVar m Natural
$sel:nextIx:PersistentQueue :: TVar m Natural
nextIx :: TVar m Natural
nextIx, $sel:directory:PersistentQueue :: FilePath
directory = FilePath
path}
 where
  loadExisting :: TBQueue m (Natural, a) -> m Natural
loadExisting TBQueue m (Natural, a)
queue = do
    [FilePath]
paths <- IO [FilePath] -> m [FilePath]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [FilePath] -> m [FilePath]) -> IO [FilePath] -> m [FilePath]
forall a b. (a -> b) -> a -> b
$ FilePath -> IO [FilePath]
listDirectory FilePath
path
    case [Natural] -> [Natural]
forall a. Ord a => [a] -> [a]
sort ([Natural] -> [Natural]) -> [Natural] -> [Natural]
forall a b. (a -> b) -> a -> b
$ (FilePath -> Maybe Natural) -> [FilePath] -> [Natural]
forall a b. (a -> Maybe b) -> [a] -> [b]
mapMaybe FilePath -> Maybe Natural
forall a. Read a => FilePath -> Maybe a
readMaybe [FilePath]
paths of
      [] -> Natural -> m Natural
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
0
      [Natural]
idxs -> do
        [Natural] -> (Natural -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Natural]
idxs ((Natural -> m ()) -> m ()) -> (Natural -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \(Natural
idx :: Natural) -> do
          ByteString
bs <- FilePath -> m ByteString
forall (m :: * -> *). MonadIO m => FilePath -> m ByteString
readFileBS (FilePath
path FilePath -> FilePath -> FilePath
</> Natural -> FilePath
forall b a. (Show a, IsString b) => a -> b
show Natural
idx)
          case ByteString -> Either DecoderError a
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' ByteString
bs of
            Left DecoderError
err ->
              FilePath -> m ()
forall a. FilePath -> m a
forall (m :: * -> *) a. MonadFail m => FilePath -> m a
fail (FilePath -> m ()) -> FilePath -> m ()
forall a b. (a -> b) -> a -> b
$ FilePath
"Failed to decode item: " FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> DecoderError -> FilePath
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
            Right a
item ->
              STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TBQueue m (Natural, a) -> (Natural, a) -> STM m ()
forall a. TBQueue m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> a -> STM m ()
writeTBQueue TBQueue m (Natural, a)
queue (Natural
idx, a
item)
        Natural -> m Natural
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Natural -> m Natural) -> Natural -> m Natural
forall a b. (a -> b) -> a -> b
$ [Natural] -> Natural
forall a. HasCallStack => [a] -> a
List.last [Natural]
idxs

-- | Write a value to the queue, blocking if the queue is full.
writePersistentQueue :: (ToCBOR a, MonadSTM m, MonadIO m) => PersistentQueue m a -> a -> m ()
writePersistentQueue :: forall a (m :: * -> *).
(ToCBOR a, MonadSTM m, MonadIO m) =>
PersistentQueue m a -> a -> m ()
writePersistentQueue PersistentQueue{TBQueue m (Natural, a)
$sel:queue:PersistentQueue :: forall (m :: * -> *) a.
PersistentQueue m a -> TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
queue, TVar m Natural
$sel:nextIx:PersistentQueue :: forall (m :: * -> *) a. PersistentQueue m a -> TVar m Natural
nextIx :: TVar m Natural
nextIx, FilePath
$sel:directory:PersistentQueue :: forall (m :: * -> *) a. PersistentQueue m a -> FilePath
directory :: FilePath
directory} a
item = do
  Natural
next <- STM m Natural -> m Natural
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Natural -> m Natural) -> STM m Natural -> m Natural
forall a b. (a -> b) -> a -> b
$ do
    Natural
next <- TVar m Natural -> STM m Natural
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m Natural
nextIx
    TVar m Natural -> (Natural -> Natural) -> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m Natural
nextIx (Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
+ Natural
1)
    Natural -> STM m Natural
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
next
  FilePath -> ByteString -> m ()
forall (m :: * -> *). MonadIO m => FilePath -> ByteString -> m ()
writeFileBS (FilePath
directory FilePath -> FilePath -> FilePath
</> Natural -> FilePath
forall b a. (Show a, IsString b) => a -> b
show Natural
next) (ByteString -> m ()) -> ByteString -> m ()
forall a b. (a -> b) -> a -> b
$ a -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' a
item
  STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TBQueue m (Natural, a) -> (Natural, a) -> STM m ()
forall a. TBQueue m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> a -> STM m ()
writeTBQueue TBQueue m (Natural, a)
queue (Natural
next, a
item)

-- | Get the next value from the queue without removing it, blocking if the
-- queue is empty.
peekPersistentQueue :: MonadSTM m => PersistentQueue m a -> m a
peekPersistentQueue :: forall (m :: * -> *) a. MonadSTM m => PersistentQueue m a -> m a
peekPersistentQueue PersistentQueue{TBQueue m (Natural, a)
$sel:queue:PersistentQueue :: forall (m :: * -> *) a.
PersistentQueue m a -> TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
queue} = do
  (Natural, a) -> a
forall a b. (a, b) -> b
snd ((Natural, a) -> a) -> m (Natural, a) -> m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM m (Natural, a) -> m (Natural, a)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (TBQueue m (Natural, a) -> STM m (Natural, a)
forall a. TBQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> STM m a
peekTBQueue TBQueue m (Natural, a)
queue)

-- | Remove an element from the queue if it matches the given item. Use
-- 'peekPersistentQueue' to wait for next items before popping it.
popPersistentQueue :: (MonadSTM m, MonadIO m, Eq a) => PersistentQueue m a -> a -> m ()
popPersistentQueue :: forall (m :: * -> *) a.
(MonadSTM m, MonadIO m, Eq a) =>
PersistentQueue m a -> a -> m ()
popPersistentQueue PersistentQueue{TBQueue m (Natural, a)
$sel:queue:PersistentQueue :: forall (m :: * -> *) a.
PersistentQueue m a -> TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
queue, FilePath
$sel:directory:PersistentQueue :: forall (m :: * -> *) a. PersistentQueue m a -> FilePath
directory :: FilePath
directory} a
item = do
  Maybe Natural
popped <- STM m (Maybe Natural) -> m (Maybe Natural)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Maybe Natural) -> m (Maybe Natural))
-> STM m (Maybe Natural) -> m (Maybe Natural)
forall a b. (a -> b) -> a -> b
$ do
    (Natural
ix, a
next) <- TBQueue m (Natural, a) -> STM m (Natural, a)
forall a. TBQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> STM m a
peekTBQueue TBQueue m (Natural, a)
queue
    if a
next a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
item
      then TBQueue m (Natural, a) -> STM m (Natural, a)
forall a. TBQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> STM m a
readTBQueue TBQueue m (Natural, a)
queue STM m (Natural, a) -> Maybe Natural -> STM m (Maybe Natural)
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Natural -> Maybe Natural
forall a. a -> Maybe a
Just Natural
ix
      else Maybe Natural -> STM m (Maybe Natural)
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Natural
forall a. Maybe a
Nothing
  case Maybe Natural
popped of
    Maybe Natural
Nothing -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Just Natural
index -> do
      IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (FilePath -> IO ()) -> FilePath -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePath -> IO ()
removeFile (FilePath -> m ()) -> FilePath -> m ()
forall a b. (a -> b) -> a -> b
$ FilePath
directory FilePath -> FilePath -> FilePath
</> Natural -> FilePath
forall b a. (Show a, IsString b) => a -> b
show Natural
index

-- * Tracing

data EtcdLog
  = EtcdLog {EtcdLog -> Value
etcd :: Value}
  | Reconnecting
  | BroadcastFailed {EtcdLog -> Text
reason :: Text}
  | FailedToDecodeLog {EtcdLog -> Text
log :: Text, reason :: Text}
  | FailedToDecodeValue {EtcdLog -> Text
key :: Text, EtcdLog -> Text
value :: Text, reason :: Text}
  | CreatedLease {EtcdLog -> Int64
leaseId :: Int64}
  | LowLeaseTTL {EtcdLog -> DiffTime
ttlRemaining :: DiffTime}
  | NoKeepAliveResponse
  | MatchingProtocolVersion {EtcdLog -> ProtocolVersion
version :: ProtocolVersion}
  deriving stock (EtcdLog -> EtcdLog -> Bool
(EtcdLog -> EtcdLog -> Bool)
-> (EtcdLog -> EtcdLog -> Bool) -> Eq EtcdLog
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: EtcdLog -> EtcdLog -> Bool
== :: EtcdLog -> EtcdLog -> Bool
$c/= :: EtcdLog -> EtcdLog -> Bool
/= :: EtcdLog -> EtcdLog -> Bool
Eq, Int -> EtcdLog -> FilePath -> FilePath
[EtcdLog] -> FilePath -> FilePath
EtcdLog -> FilePath
(Int -> EtcdLog -> FilePath -> FilePath)
-> (EtcdLog -> FilePath)
-> ([EtcdLog] -> FilePath -> FilePath)
-> Show EtcdLog
forall a.
(Int -> a -> FilePath -> FilePath)
-> (a -> FilePath) -> ([a] -> FilePath -> FilePath) -> Show a
$cshowsPrec :: Int -> EtcdLog -> FilePath -> FilePath
showsPrec :: Int -> EtcdLog -> FilePath -> FilePath
$cshow :: EtcdLog -> FilePath
show :: EtcdLog -> FilePath
$cshowList :: [EtcdLog] -> FilePath -> FilePath
showList :: [EtcdLog] -> FilePath -> FilePath
Show, (forall x. EtcdLog -> Rep EtcdLog x)
-> (forall x. Rep EtcdLog x -> EtcdLog) -> Generic EtcdLog
forall x. Rep EtcdLog x -> EtcdLog
forall x. EtcdLog -> Rep EtcdLog x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. EtcdLog -> Rep EtcdLog x
from :: forall x. EtcdLog -> Rep EtcdLog x
$cto :: forall x. Rep EtcdLog x -> EtcdLog
to :: forall x. Rep EtcdLog x -> EtcdLog
Generic)
  deriving anyclass ([EtcdLog] -> Value
[EtcdLog] -> Encoding
EtcdLog -> Bool
EtcdLog -> Value
EtcdLog -> Encoding
(EtcdLog -> Value)
-> (EtcdLog -> Encoding)
-> ([EtcdLog] -> Value)
-> ([EtcdLog] -> Encoding)
-> (EtcdLog -> Bool)
-> ToJSON EtcdLog
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: EtcdLog -> Value
toJSON :: EtcdLog -> Value
$ctoEncoding :: EtcdLog -> Encoding
toEncoding :: EtcdLog -> Encoding
$ctoJSONList :: [EtcdLog] -> Value
toJSONList :: [EtcdLog] -> Value
$ctoEncodingList :: [EtcdLog] -> Encoding
toEncodingList :: [EtcdLog] -> Encoding
$comitField :: EtcdLog -> Bool
omitField :: EtcdLog -> Bool
ToJSON)