{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE OverloadedStrings #-}
{-# OPTIONS_GHC -Wno-deferred-out-of-scope-variables #-}

-- | Implements a Hydra network component using [etcd](https://etcd.io/).
--
-- While implementing a basic broadcast protocol over a distributed key-value
-- store is quite an overkill, the [Raft](https://raft.github.io/) consensus of
-- etcd provides our application with a crash-recovery fault-tolerant "atomic
-- broadcast" out-of-the-box. As a nice side-effect, the network layer becomes
-- very introspectable, while it would also support features like TLS or service
-- discovery.
--
-- The component starts and configures an `etcd` instance and connects to it
-- using a GRPC client. We can only write and read from the cluster while
-- connected to the majority cluster.
--
-- Broadcasting is implemented using @put@ to some well-known key, while message
-- delivery is done by using @watch@ on the same 'msg' prefix. We keep a last known
-- revision, also stored on disk, to start 'watch' with that revision (+1) and
-- only deliver messages that were not seen before. In case we are not connected
-- to our 'etcd' instance or not enough peers (= on a minority cluster), we
-- retry sending, but also store messages to broadcast in a 'PersistentQueue',
-- which makes the node resilient against crashes while sending. TODO: Is this
-- needed? performance limitation?
--
-- Connectivity and compatibility with other nodes on the cluster is tracked
-- using the key-value service as well:
--
--   * network connectivity is determined by being able to fetch the member list
--   * peer connectivity is tracked (best effort, not authorized) using an entry
--     at 'alive-\<advertise\>' keys with individual leases and repeated keep-alives
--   * each node compare-and-swaps its `version` into a key of the same name to
--     check compatibility (not updatable)
--
-- Note that the etcd cluster is configured to compact revisions down to 1000
-- every 5 minutes. This prevents infinite growth of the key-value store, but
-- also limits how long a node can be disconnected without missing out. 1000
-- should be more than enough for our use-case as the Hydra protocol will not
-- advance unless all participants are present.
module Hydra.Network.Etcd where

import Hydra.Prelude

import Cardano.Binary (decodeFull', serialize')
import Control.Concurrent.Class.MonadSTM (
  modifyTVar',
  newTBQueueIO,
  newTVarIO,
  peekTBQueue,
  readTBQueue,
  swapTVar,
  writeTBQueue,
  writeTVar,
 )
import Control.Exception (IOException)
import Control.Lens ((^.), (^..))
import Data.Aeson (decodeFileStrict', encodeFile)
import Data.Aeson qualified as Aeson
import Data.Aeson.Types (Value)
import Data.ByteString qualified as BS
import Data.List ((\\))
import Data.List qualified as List
import Data.Map qualified as Map
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (
  Connectivity (..),
  Host (..),
  Network (..),
  NetworkCallback (..),
  NetworkComponent,
  NetworkConfiguration (..),
  ProtocolVersion,
 )
import Network.GRPC.Client (
  Address (..),
  ConnParams (..),
  Connection,
  ReconnectPolicy (..),
  ReconnectTo (ReconnectToOriginal),
  Server (..),
  rpc,
  withConnection,
 )
import Network.GRPC.Client.StreamType.IO (biDiStreaming, nonStreaming)
import Network.GRPC.Common (GrpcError (..), GrpcException (..), HTTP2Settings (..), NextElem (..), def, defaultHTTP2Settings)
import Network.GRPC.Common.NextElem (whileNext_)
import Network.GRPC.Common.Protobuf (Proto (..), Protobuf, defMessage, (.~))
import Network.GRPC.Etcd (
  Compare'CompareResult (..),
  Compare'CompareTarget (..),
  KV,
  Lease,
  Watch,
 )
import System.Directory (createDirectoryIfMissing, listDirectory, removeFile)
import System.Environment.Blank (getEnvironment)
import System.FilePath ((</>))
import System.IO.Error (isDoesNotExistError)
import System.Process (interruptProcessGroupOf)
import System.Process.Typed (
  Process,
  ProcessConfig,
  createPipe,
  getStderr,
  proc,
  setCreateGroup,
  setEnv,
  setStderr,
  startProcess,
  stopProcess,
  unsafeProcessHandle,
  waitExitCode,
 )
import UnliftIO (readTVarIO)

-- | Concrete network component that broadcasts messages to an etcd cluster and
-- listens for incoming messages.
--
-- The component is assembled from a number of concurrently running pieces, all
-- tied together with 'race_' so that the whole component stops as soon as any
-- one of them finishes or fails:
--
--   * an @etcd@ sub-process started via 'withProcessInterrupt'; if it exits,
--     the component fails with "Sub-process etcd exited with: ..."
--   * a thread forwarding each line of the sub-process' stderr to the tracer
--   * a gRPC connection to the local etcd client port, on which these run:
--
--       * 'checkVersion' (asynchronously, see REVIEW note below)
--       * 'pollConnectivity'
--       * 'waitMessages'
--       * 'broadcastMessages', fed from a 'PersistentQueue' stored under
--         @\<persistenceDir\>/pending-broadcast@
--
-- The given @action@ receives a 'Network' handle whose 'broadcast' only writes
-- the message into that persistent queue. Once @action@ returns, a shared flag
-- is set so that the gRPC reconnect policy stops reconnecting.
withEtcdNetwork ::
  (ToCBOR msg, FromCBOR msg, Eq msg) =>
  Tracer IO EtcdLog ->
  ProtocolVersion ->
  -- TODO: check if all of these needed?
  NetworkConfiguration ->
  NetworkComponent IO msg msg ()
withEtcdNetwork tracer protocolVersion config callback action = do
  -- TODO: fail if cluster config / members do not match --peer
  -- configuration? That would be similar to the 'acks' persistence
  -- bailing out on loading.
  envVars <- Map.fromList <$> getEnvironment
  withProcessInterrupt (etcdCmd envVars) $ \p -> do
    race_ (waitExitCode p >>= \ec -> fail $ "Sub-process etcd exited with: " <> show ec) $ do
      race_ (traceStderr p) $ do
        -- XXX: cleanup reconnecting through policy if other threads fail
        doneVar <- newTVarIO False
        -- NOTE: The connection to the server is set up asynchronously; the
        -- first rpc call will block until the connection has been established.
        withConnection (connParams doneVar) grpcServer $ \conn ->
          -- REVIEW: checkVersion blocks if used on main thread - why?
          withAsync (checkVersion tracer conn protocolVersion callback) $ \_ -> do
            race_ (pollConnectivity tracer conn advertise callback) $
              race_ (waitMessages tracer conn persistenceDir callback) $ do
                queue <- newPersistentQueue (persistenceDir </> "pending-broadcast") 100
                race_ (broadcastMessages tracer conn advertise queue) $ do
                  action
                    Network
                      { broadcast = writePersistentQueue queue
                      }
                  -- Signal the reconnect policy that we are shutting down on
                  -- purpose, so it answers 'DontReconnect' from now on.
                  atomically (writeTVar doneVar True)
 where
  -- gRPC connection parameters: library defaults plus our reconnect policy.
  connParams doneVar =
    def
      { connReconnectPolicy = reconnectPolicy doneVar
      , -- NOTE: Not rate limit pings to our trusted, local etcd node. See
        -- comment on 'http2OverridePingRateLimit'.
        connHTTP2Settings = defaultHTTP2Settings{http2OverridePingRateLimit = Just maxBound}
      }

  -- Keep reconnecting to the original server, waiting and tracing
  -- 'Reconnecting' before each further attempt, until 'doneVar' is set.
  reconnectPolicy doneVar = ReconnectAfter ReconnectToOriginal $ do
    done <- readTVarIO doneVar
    if done
      then pure DontReconnect
      else do
        threadDelay 1
        traceWith tracer Reconnecting
        pure $ reconnectPolicy doneVar

  -- The etcd client endpoint is always bound to the loopback interface.
  clientHost = Host{hostname = "127.0.0.1", port = clientPort}

  grpcServer =
    ServerInsecure $
      Address
        { addressHost = toString $ hostname clientHost
        , addressPort = port clientHost
        , addressAuthority = Nothing
        }

  -- NOTE: Offset client port by the same amount as configured 'port' is offset
  -- from the default '5001'. This will result in the default client port 2379
  -- be used by default still.
  clientPort = 2379 + port listen - 5001

  -- Forward etcd's stderr line by line: lines that parse as JSON are traced as
  -- 'EtcdLog', anything else as 'FailedToDecodeLog' together with the reason.
  traceStderr p =
    forever $ do
      bs <- BS.hGetLine (getStderr p)
      case Aeson.eitherDecodeStrict bs of
        Left err -> traceWith tracer FailedToDecodeLog{log = decodeUtf8 bs, reason = show err}
        Right v -> traceWith tracer $ EtcdLog{etcd = v}

  -- XXX: Could use TLS to secure peer connections
  -- XXX: Could use discovery to simplify configuration
  -- NOTE: Configured using guides: https://etcd.io/docs/v3.5/op-guide
  etcdCmd envVars =
    -- NOTE: (<>) on 'Map' prefers entries of the left operand, so the defaults
    -- are appended last and any variable already present in the inherited
    -- environment takes precedence over them.
    setEnv (Map.toList $ envVars <> defaultEnv)
      . setCreateGroup True -- Prevents interrupt of main process when we send SIGINT to etcd
      . setStderr createPipe
      . proc "etcd"
      $ concat
        [ -- NOTE: Must be used in clusterPeers
          ["--name", show advertise]
        , ["--data-dir", persistenceDir </> "etcd"]
        , ["--listen-peer-urls", httpUrl listen]
        , ["--initial-advertise-peer-urls", httpUrl advertise]
        , ["--listen-client-urls", httpUrl clientHost]
        , -- Pick a random port for http api (and use above only for grpc)
          ["--listen-client-http-urls", "http://localhost:0"]
        , -- Client access only on configured 'host' interface.
          ["--advertise-client-urls", httpUrl clientHost]
        , -- XXX: Could use unique initial-cluster-tokens to isolate clusters
          ["--initial-cluster-token", "hydra-network-1"]
        , ["--initial-cluster", clusterPeers]
        ]

  defaultEnv :: Map.Map String String
  defaultEnv =
    -- Keep up to 1000 revisions. See also:
    -- https://etcd.io/docs/v3.5/op-guide/maintenance/#auto-compaction
    Map.fromList
      [ ("ETCD_AUTO_COMPACTION_MODE", "revision")
      , ("ETCD_AUTO_COMPACTION_RETENTION", "1000")
      ]

  -- NOTE: Building a canonical list of labels from the advertised addresses
  clusterPeers =
    intercalate ","
      . map (\h -> show h <> "=" <> httpUrl h)
      $ (advertise : peers)

  -- Render a 'Host' as a plain (non-TLS) http URL.
  httpUrl (Host h p) = "http://" <> toString h <> ":" <> show p

  NetworkConfiguration{persistenceDir, listen, advertise, peers} = config

-- | Check and write version on etcd cluster. This will retry until we are on a
-- majority cluster and succeed. If the version does not match a corresponding
-- 'Connectivity' message is sent via 'NetworkCallback'.
checkVersion ::
  Tracer IO EtcdLog ->
  Connection ->
  ProtocolVersion ->
  NetworkCallback msg IO ->
  IO ()
checkVersion :: forall msg.
Tracer IO EtcdLog
-> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
checkVersion Tracer IO EtcdLog
tracer Connection
conn ProtocolVersion
ourVersion NetworkCallback{Connectivity -> IO ()
onConnectivity :: Connectivity -> IO ()
$sel:onConnectivity:NetworkCallback :: forall msg (m :: * -> *).
NetworkCallback msg m -> Connectivity -> m ()
onConnectivity} = do
  -- Get or write our version into kv store
  Proto TxnResponse
res <-
    Connection
-> ClientHandler'
     'NonStreaming (ReaderT Connection IO) (Protobuf KV "txn")
-> Input (Protobuf KV "txn")
-> IO (Output (Protobuf KV "txn"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "txn")) (Input (Protobuf KV "txn") -> IO (Output (Protobuf KV "txn")))
-> Input (Protobuf KV "txn") -> IO (Output (Protobuf KV "txn"))
forall a b. (a -> b) -> a -> b
$
      Proto TxnRequest
forall msg. Message msg => msg
defMessage
        Proto TxnRequest
-> (Proto TxnRequest -> Proto TxnRequest) -> Proto TxnRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto TxnRequest)
  (Proto TxnRequest)
  [Proto Compare]
  [Proto Compare]
#compare ASetter
  (Proto TxnRequest)
  (Proto TxnRequest)
  [Proto Compare]
  [Proto Compare]
-> [Proto Compare] -> Proto TxnRequest -> Proto TxnRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Proto Compare
versionExists]
        Proto TxnRequest
-> (Proto TxnRequest -> Proto TxnRequest) -> Proto TxnRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto TxnRequest)
  (Proto TxnRequest)
  [Proto RequestOp]
  [Proto RequestOp]
#success ASetter
  (Proto TxnRequest)
  (Proto TxnRequest)
  [Proto RequestOp]
  [Proto RequestOp]
-> [Proto RequestOp] -> Proto TxnRequest -> Proto TxnRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Proto RequestOp
getVersion]
        Proto TxnRequest
-> (Proto TxnRequest -> Proto TxnRequest) -> Proto TxnRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto TxnRequest)
  (Proto TxnRequest)
  [Proto RequestOp]
  [Proto RequestOp]
#failure ASetter
  (Proto TxnRequest)
  (Proto TxnRequest)
  [Proto RequestOp]
  [Proto RequestOp]
-> [Proto RequestOp] -> Proto TxnRequest -> Proto TxnRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Proto RequestOp
putVersion]

  -- Check version if version was already present
  if Proto TxnResponse
res Proto TxnResponse -> Getting Bool (Proto TxnResponse) Bool -> Bool
forall s a. s -> Getting a s a -> a
^. Getting Bool (Proto TxnResponse) Bool
#succeeded
    then [Proto KeyValue] -> (Proto KeyValue -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Proto TxnResponse
res Proto TxnResponse
-> Getting
     (Endo [Proto KeyValue]) (Proto TxnResponse) (Proto KeyValue)
-> [Proto KeyValue]
forall s a. s -> Getting (Endo [a]) s a -> [a]
^.. ([Proto ResponseOp]
 -> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> Proto TxnResponse
-> Const (Endo [Proto KeyValue]) (Proto TxnResponse)
#responses (([Proto ResponseOp]
  -> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
 -> Proto TxnResponse
 -> Const (Endo [Proto KeyValue]) (Proto TxnResponse))
-> ((Proto KeyValue
     -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
    -> [Proto ResponseOp]
    -> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> Getting
     (Endo [Proto KeyValue]) (Proto TxnResponse) (Proto KeyValue)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto ResponseOp
 -> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> [Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse ((Proto ResponseOp
  -> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
 -> [Proto ResponseOp]
 -> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> ((Proto KeyValue
     -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
    -> Proto ResponseOp
    -> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> (Proto KeyValue
    -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto RangeResponse
 -> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp)
#responseRange ((Proto RangeResponse
  -> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
 -> Proto ResponseOp
 -> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> ((Proto KeyValue
     -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
    -> Proto RangeResponse
    -> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> (Proto KeyValue
    -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([Proto KeyValue]
 -> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
#kvs (([Proto KeyValue]
  -> Const (Endo [Proto KeyValue]) [Proto KeyValue])
 -> Proto RangeResponse
 -> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> ((Proto KeyValue
     -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
    -> [Proto KeyValue]
    -> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> (Proto KeyValue
    -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto KeyValue -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse) ((Proto KeyValue -> IO ()) -> IO ())
-> (Proto KeyValue -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Proto KeyValue
kv ->
      case ByteString -> Either DecoderError ProtocolVersion
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' (ByteString -> Either DecoderError ProtocolVersion)
-> ByteString -> Either DecoderError ProtocolVersion
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#value of
        Left DecoderError
err -> do
          Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$
            FailedToDecodeValue
              { $sel:key:EtcdLog :: Text
key = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#key
              , $sel:value:EtcdLog :: Text
value = ByteString -> Text
encodeBase16 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#value
              , $sel:reason:EtcdLog :: Text
reason = DecoderError -> Text
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
              }
          Connectivity -> IO ()
onConnectivity VersionMismatch{ProtocolVersion
ourVersion :: ProtocolVersion
$sel:ourVersion:PeerConnected :: ProtocolVersion
ourVersion, $sel:theirVersion:PeerConnected :: Maybe ProtocolVersion
theirVersion = Maybe ProtocolVersion
forall a. Maybe a
Nothing}
        Right ProtocolVersion
theirVersion ->
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ProtocolVersion
theirVersion ProtocolVersion -> ProtocolVersion -> Bool
forall a. Eq a => a -> a -> Bool
== ProtocolVersion
ourVersion) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
            Connectivity -> IO ()
onConnectivity VersionMismatch{ProtocolVersion
ourVersion :: ProtocolVersion
$sel:ourVersion:PeerConnected :: ProtocolVersion
ourVersion, $sel:theirVersion:PeerConnected :: Maybe ProtocolVersion
theirVersion = ProtocolVersion -> Maybe ProtocolVersion
forall a. a -> Maybe a
Just ProtocolVersion
theirVersion}
    else
      Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$ MatchingProtocolVersion{version :: ProtocolVersion
version = ProtocolVersion
ourVersion}
 where
  versionKey :: ByteString
versionKey = ByteString
"version"

  -- exists = create_revision of key 'version' > 0
  versionExists :: Proto Compare
versionExists =
    Proto Compare
forall msg. Message msg => msg
defMessage
      Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto Compare)
  (Proto Compare)
  (Proto Compare'CompareResult)
  (Proto Compare'CompareResult)
#result ASetter
  (Proto Compare)
  (Proto Compare)
  (Proto Compare'CompareResult)
  (Proto Compare'CompareResult)
-> Proto Compare'CompareResult -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Compare'CompareResult -> Proto Compare'CompareResult
forall msg. msg -> Proto msg
Proto Compare'CompareResult
Compare'GREATER
      Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto Compare)
  (Proto Compare)
  (Proto Compare'CompareTarget)
  (Proto Compare'CompareTarget)
#target ASetter
  (Proto Compare)
  (Proto Compare)
  (Proto Compare'CompareTarget)
  (Proto Compare'CompareTarget)
-> Proto Compare'CompareTarget -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Compare'CompareTarget -> Proto Compare'CompareTarget
forall msg. msg -> Proto msg
Proto Compare'CompareTarget
Compare'VERSION
      Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter (Proto Compare) (Proto Compare) ByteString ByteString
#key ASetter (Proto Compare) (Proto Compare) ByteString ByteString
-> ByteString -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
versionKey
      Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter (Proto Compare) (Proto Compare) Int64 Int64
#version ASetter (Proto Compare) (Proto Compare) Int64 Int64
-> Int64 -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
0

  getVersion :: Proto RequestOp
getVersion =
    Proto RequestOp
forall msg. Message msg => msg
defMessage Proto RequestOp
-> (Proto RequestOp -> Proto RequestOp) -> Proto RequestOp
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto RequestOp)
  (Proto RequestOp)
  (Proto RangeRequest)
  (Proto RangeRequest)
#requestRange ASetter
  (Proto RequestOp)
  (Proto RequestOp)
  (Proto RangeRequest)
  (Proto RangeRequest)
-> Proto RangeRequest -> Proto RequestOp -> Proto RequestOp
forall s t a b. ASetter s t a b -> b -> s -> t
.~ (Proto RangeRequest
forall msg. Message msg => msg
defMessage Proto RangeRequest
-> (Proto RangeRequest -> Proto RangeRequest) -> Proto RangeRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
#key ASetter
  (Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
-> ByteString -> Proto RangeRequest -> Proto RangeRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
versionKey)

  putVersion :: Proto RequestOp
putVersion =
    Proto RequestOp
forall msg. Message msg => msg
defMessage
      Proto RequestOp
-> (Proto RequestOp -> Proto RequestOp) -> Proto RequestOp
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto RequestOp)
  (Proto RequestOp)
  (Proto PutRequest)
  (Proto PutRequest)
#requestPut
        ASetter
  (Proto RequestOp)
  (Proto RequestOp)
  (Proto PutRequest)
  (Proto PutRequest)
-> Proto PutRequest -> Proto RequestOp -> Proto RequestOp
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ( Proto PutRequest
forall msg. Message msg => msg
defMessage
               Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#key ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
versionKey
               Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#value ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ProtocolVersion -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' ProtocolVersion
ourVersion
           )

-- | Continuously take messages from the given queue and broadcast them to the
-- etcd cluster using 'putMessage'.
--
-- A message is only removed from the queue after it was put successfully. If
-- the put fails with 'GrpcUnavailable' or 'GrpcDeadlineExceeded' (e.g. while we
-- are on a minority cluster), the failure is traced as 'BroadcastFailed' and
-- the same message is tried again after one second. Any other 'GrpcException'
-- is re-thrown.
--
-- TODO: Is retrying on failure even needed?
broadcastMessages ::
  (ToCBOR msg, Eq msg) =>
  Tracer IO EtcdLog ->
  Connection ->
  -- | Used to identify sender.
  Host ->
  PersistentQueue IO msg ->
  IO ()
broadcastMessages tracer conn ourHost queue =
  withGrpcContext "broadcastMessages" $
    forever $ do
      next <- peekPersistentQueue queue
      sendAndDequeue next `catch` onSendFailure
 where
  -- Only drop the message from the queue once it was put successfully.
  sendAndDequeue next =
    putMessage conn ourHost next >> popPersistentQueue queue next

  onSendFailure :: GrpcException -> IO ()
  onSendFailure e@GrpcException{grpcError, grpcErrorMessage}
    | isTransient = do
        traceWith tracer $ BroadcastFailed{reason = fromMaybe "unknown" grpcErrorMessage}
        -- Back off before the surrounding loop peeks the same message again.
        threadDelay 1
    | otherwise = throwIO e
   where
    isTransient =
      grpcError == GrpcUnavailable || grpcError == GrpcDeadlineExceeded

-- | Broadcast a single message to the etcd cluster.
--
-- The CBOR-serialized message is written with a @put@ request to the key
-- @msg-\<host\>@, where the host part is the 'show'n sender host. The response
-- of the request is discarded.
putMessage ::
  ToCBOR msg =>
  Connection ->
  -- | Used to identify sender.
  Host ->
  msg ->
  IO ()
putMessage conn ourHost msg = do
  _ <- nonStreaming conn (rpc @(Protobuf KV "put")) request
  pure ()
 where
  request =
    defMessage
      & #key .~ senderKey
      & #value .~ serialize' msg

  -- Key under the 'msg' prefix identifying the sender.
  senderKey = encodeUtf8 @Text ("msg-" <> show ourHost)

-- | Fetch and wait for messages from the etcd cluster.
--
-- Opens a @watch@ stream on all keys in the range from @msg@ (inclusive) to
-- @msh@ (exclusive), i.e. every key with the 'msg' prefix, starting right
-- after the last known revision stored in the given directory. For each watch
-- response, the header revision is persisted using 'putLastKnownRevision' and
-- the value of every contained event is decoded and passed to 'deliver'.
-- Values that fail to decode are not delivered, but traced as
-- 'FailedToDecodeValue'.
--
-- Whenever the watch stream ends, we wait one second and subscribe again. The
-- last known revision is re-read on every (re-)subscription, so messages that
-- were already processed on a previous stream are not requested (and
-- delivered) a second time.
waitMessages ::
  FromCBOR msg =>
  Tracer IO EtcdLog ->
  Connection ->
  -- | Directory holding the persisted last known revision.
  FilePath ->
  NetworkCallback msg IO ->
  IO ()
waitMessages tracer conn directory NetworkCallback{deliver} =
  withGrpcContext "waitMessages" . forever $ do
    -- NOTE: Read the revision on each iteration. 'process' advances the
    -- persisted revision while a stream is open, and re-using the value from
    -- start-up would replay everything received since then.
    revision <- getLastKnownRevision directory
    -- NOTE: We have not observed the watch (subscription) fail even when peers
    -- leave and we end up on a minority cluster.
    biDiStreaming conn (rpc @(Protobuf Watch "watch")) $ \send recv -> do
      -- NOTE: Request all keys starting with 'msg'. See also section KeyRanges
      -- in https://etcd.io/docs/v3.5/learning/api/#key-value-api
      let watchRequest =
            defMessage
              & #key .~ "msg"
              & #rangeEnd .~ "msh" -- NOTE: g+1 to query prefixes
              & #startRevision .~ fromIntegral (revision + 1)
      send . NextElem $ defMessage & #createRequest .~ watchRequest
      whileNext_ recv process
    -- Wait before re-trying
    threadDelay 1
 where
  process res = do
    -- Remember how far we got, so a restart or re-subscription continues from
    -- here.
    let seenRevision = fromIntegral $ res ^. #header . #revision
    putLastKnownRevision directory seenRevision
    forM_ (res ^. #events) $ \event -> do
      let rawValue = event ^. #kv . #value
      case decodeFull' rawValue of
        Left err ->
          traceWith
            tracer
            FailedToDecodeValue
              { key = decodeUtf8 $ event ^. #kv . #key
              , value = encodeBase16 rawValue
              , reason = show err
              }
        Right msg -> deliver msg

-- | Load the last known revision from the @last-known-revision@ file in the
-- given directory.
--
-- Yields @1@ if the file does not exist or its content cannot be decoded. Any
-- other 'IOException' while reading the file makes this action 'fail'.
getLastKnownRevision :: MonadIO m => FilePath -> m Natural
getLastKnownRevision directory =
  liftIO $ do
    result <- try $ decodeFileStrict' revisionFile
    case result of
      Left (err :: IOException)
        | isDoesNotExistError err -> pure 1
        | otherwise ->
            fail $ "Failed to load last known revision: " <> show err
      Right stored -> pure $ fromMaybe 1 stored
 where
  revisionFile = directory </> "last-known-revision"

-- | Store the given revision JSON-encoded in the @last-known-revision@ file of
-- the given directory.
putLastKnownRevision :: MonadIO m => FilePath -> Natural -> m ()
putLastKnownRevision directory rev =
  liftIO $ encodeFile revisionFile rev
 where
  revisionFile = directory </> "last-known-revision"

-- | Write a well-known key to indicate being alive, keep it alive using a lease
-- and poll other peers entries to yield connectivity events. While doing so,
-- overall network connectivity is determined from the ability to read/write to
-- the cluster.
pollConnectivity ::
  Tracer IO EtcdLog ->
  Connection ->
  -- | Local host
  Host ->
  NetworkCallback msg IO ->
  IO ()
pollConnectivity :: forall msg.
Tracer IO EtcdLog
-> Connection -> Host -> NetworkCallback msg IO -> IO ()
pollConnectivity Tracer IO EtcdLog
tracer Connection
conn Host
advertise NetworkCallback{Connectivity -> IO ()
$sel:onConnectivity:NetworkCallback :: forall msg (m :: * -> *).
NetworkCallback msg m -> Connectivity -> m ()
onConnectivity :: Connectivity -> IO ()
onConnectivity} = do
  TVar [Host]
seenAliveVar <- [Host] -> IO (TVar IO [Host])
forall a. a -> IO (TVar IO a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO []
  Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"pollConnectivity" (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
    IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (GrpcException -> IO ()) -> IO () -> IO ()
forall e a. Exception e => (e -> IO a) -> IO a -> IO a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> m a) -> m a -> m a
handle (TVar [Host] -> GrpcException -> IO ()
onGrpcException TVar [Host]
seenAliveVar) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      Int64
leaseId <- IO Int64
createLease
      -- If we can create a lease, we are connected
      Connectivity -> IO ()
onConnectivity Connectivity
NetworkConnected
      -- Write our alive key using lease
      Int64 -> IO ()
writeAlive Int64
leaseId
      Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer CreatedLease{Int64
leaseId :: Int64
$sel:leaseId:EtcdLog :: Int64
leaseId}
      Int64 -> (IO DiffTime -> IO Any) -> IO ()
withKeepAlive Int64
leaseId ((IO DiffTime -> IO Any) -> IO ())
-> (IO DiffTime -> IO Any) -> IO ()
forall a b. (a -> b) -> a -> b
$ \IO DiffTime
keepAlive ->
        IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> IO () -> IO Any
forall a b. (a -> b) -> a -> b
$ do
          -- Keep our lease alive
          DiffTime
ttlRemaining <- IO DiffTime
keepAlive
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (DiffTime
ttlRemaining DiffTime -> DiffTime -> Bool
forall a. Ord a => a -> a -> Bool
< DiffTime
1) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
            Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer LowLeaseTTL{DiffTime
ttlRemaining :: DiffTime
$sel:ttlRemaining:EtcdLog :: DiffTime
ttlRemaining}
          -- Determine alive peers
          [Host]
alive <- IO [Host]
getAlive
          let othersAlive :: [Host]
othersAlive = [Host]
alive [Host] -> [Host] -> [Host]
forall a. Eq a => [a] -> [a] -> [a]
\\ [Host
advertise]
          [Host]
seenAlive <- STM IO [Host] -> IO [Host]
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO [Host] -> IO [Host]) -> STM IO [Host] -> IO [Host]
forall a b. (a -> b) -> a -> b
$ TVar IO [Host] -> [Host] -> STM IO [Host]
forall a. TVar IO a -> a -> STM IO a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m a
swapTVar TVar [Host]
TVar IO [Host]
seenAliveVar [Host]
othersAlive
          [Host] -> (Host -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Host]
othersAlive [Host] -> [Host] -> [Host]
forall a. Eq a => [a] -> [a] -> [a]
\\ [Host]
seenAlive) ((Host -> IO ()) -> IO ()) -> (Host -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connectivity -> IO ()
onConnectivity (Connectivity -> IO ()) -> (Host -> Connectivity) -> Host -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Host -> Connectivity
PeerConnected
          [Host] -> (Host -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Host]
seenAlive [Host] -> [Host] -> [Host]
forall a. Eq a => [a] -> [a] -> [a]
\\ [Host]
othersAlive) ((Host -> IO ()) -> IO ()) -> (Host -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connectivity -> IO ()
onConnectivity (Connectivity -> IO ()) -> (Host -> Connectivity) -> Host -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Host -> Connectivity
PeerDisconnected
          -- Wait roughly ttl / 2
          DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay (DiffTime
ttlRemaining DiffTime -> DiffTime -> DiffTime
forall a. Fractional a => a -> a -> a
/ DiffTime
2)
 where
  onGrpcException :: TVar [Host] -> GrpcException -> IO ()
onGrpcException TVar [Host]
seenAliveVar GrpcException{GrpcError
grpcError :: GrpcException -> GrpcError
grpcError :: GrpcError
grpcError}
    | GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcUnavailable Bool -> Bool -> Bool
|| GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcDeadlineExceeded = do
        Connectivity -> IO ()
onConnectivity Connectivity
NetworkDisconnected
        STM IO () -> IO ()
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO () -> IO ()) -> STM IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar IO [Host] -> [Host] -> STM IO ()
forall a. TVar IO a -> a -> STM IO ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar [Host]
TVar IO [Host]
seenAliveVar []
        DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
  onGrpcException TVar [Host]
_ GrpcException
e = GrpcException -> IO ()
forall e a. Exception e => e -> IO a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO GrpcException
e

  createLease :: IO Int64
createLease = Text -> IO Int64 -> IO Int64
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"createLease" (IO Int64 -> IO Int64) -> IO Int64 -> IO Int64
forall a b. (a -> b) -> a -> b
$ do
    Proto LeaseGrantResponse
leaseResponse <-
      Connection
-> ClientHandler'
     'NonStreaming (ReaderT Connection IO) (Protobuf Lease "leaseGrant")
-> Input (Protobuf Lease "leaseGrant")
-> IO (Output (Protobuf Lease "leaseGrant"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf Lease "leaseGrant")) (Input (Protobuf Lease "leaseGrant")
 -> IO (Output (Protobuf Lease "leaseGrant")))
-> Input (Protobuf Lease "leaseGrant")
-> IO (Output (Protobuf Lease "leaseGrant"))
forall a b. (a -> b) -> a -> b
$
        Proto LeaseGrantRequest
forall msg. Message msg => msg
defMessage Proto LeaseGrantRequest
-> (Proto LeaseGrantRequest -> Proto LeaseGrantRequest)
-> Proto LeaseGrantRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto LeaseGrantRequest) (Proto LeaseGrantRequest) Int64 Int64
#ttl ASetter
  (Proto LeaseGrantRequest) (Proto LeaseGrantRequest) Int64 Int64
-> Int64 -> Proto LeaseGrantRequest -> Proto LeaseGrantRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
3
    Int64 -> IO Int64
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int64 -> IO Int64) -> Int64 -> IO Int64
forall a b. (a -> b) -> a -> b
$ Proto LeaseGrantResponse
leaseResponse Proto LeaseGrantResponse
-> Getting Int64 (Proto LeaseGrantResponse) Int64 -> Int64
forall s a. s -> Getting a s a -> a
^. Getting Int64 (Proto LeaseGrantResponse) Int64
#id

  withKeepAlive :: Int64 -> (IO DiffTime -> IO Any) -> IO ()
withKeepAlive Int64
leaseId IO DiffTime -> IO Any
action = do
    Connection
-> ClientHandler'
     'BiDiStreaming
     (ReaderT Connection IO)
     (Protobuf Lease "leaseKeepAlive")
-> ((NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
    -> IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
    -> IO ())
-> IO ()
forall {k} (rpc :: k) (m :: * -> *) r.
MonadIO m =>
Connection
-> ClientHandler' 'BiDiStreaming (ReaderT Connection m) rpc
-> ((NextElem (Input rpc) -> m ())
    -> m (NextElem (Output rpc)) -> m r)
-> m r
biDiStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
 SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf Lease "leaseKeepAlive")) (((NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
  -> IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
  -> IO ())
 -> IO ())
-> ((NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
    -> IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
    -> IO ())
-> IO ()
forall a b. (a -> b) -> a -> b
$ \NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ()
send IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
recv -> do
      IO Any -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Any -> IO ())
-> (IO DiffTime -> IO Any) -> IO DiffTime -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO DiffTime -> IO Any
action (IO DiffTime -> IO ()) -> IO DiffTime -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ()
send (NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
-> NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ()
forall a b. (a -> b) -> a -> b
$ Input (Protobuf Lease "leaseKeepAlive")
-> NextElem (Input (Protobuf Lease "leaseKeepAlive"))
forall a. a -> NextElem a
NextElem (Input (Protobuf Lease "leaseKeepAlive")
 -> NextElem (Input (Protobuf Lease "leaseKeepAlive")))
-> Input (Protobuf Lease "leaseKeepAlive")
-> NextElem (Input (Protobuf Lease "leaseKeepAlive"))
forall a b. (a -> b) -> a -> b
$ Proto LeaseKeepAliveRequest
forall msg. Message msg => msg
defMessage Proto LeaseKeepAliveRequest
-> (Proto LeaseKeepAliveRequest -> Proto LeaseKeepAliveRequest)
-> Proto LeaseKeepAliveRequest
forall a b. a -> (a -> b) -> b
& ASetter
  (Proto LeaseKeepAliveRequest)
  (Proto LeaseKeepAliveRequest)
  Int64
  Int64
#id ASetter
  (Proto LeaseKeepAliveRequest)
  (Proto LeaseKeepAliveRequest)
  Int64
  Int64
-> Int64
-> Proto LeaseKeepAliveRequest
-> Proto LeaseKeepAliveRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
leaseId
        IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
IO (NextElem (Proto LeaseKeepAliveResponse))
recv IO (NextElem (Proto LeaseKeepAliveResponse))
-> (NextElem (Proto LeaseKeepAliveResponse) -> IO DiffTime)
-> IO DiffTime
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          NextElem Proto LeaseKeepAliveResponse
res -> DiffTime -> IO DiffTime
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime -> IO DiffTime)
-> (Int64 -> DiffTime) -> Int64 -> IO DiffTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int64 -> DiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> IO DiffTime) -> Int64 -> IO DiffTime
forall a b. (a -> b) -> a -> b
$ Proto LeaseKeepAliveResponse
res Proto LeaseKeepAliveResponse
-> Getting Int64 (Proto LeaseKeepAliveResponse) Int64 -> Int64
forall s a. s -> Getting a s a -> a
^. Getting Int64 (Proto LeaseKeepAliveResponse) Int64
#ttl
          NextElem (Proto LeaseKeepAliveResponse)
NoNextElem -> do
            Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer EtcdLog
NoKeepAliveResponse
            DiffTime -> IO DiffTime
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DiffTime
0

  -- Announce this node as alive: put an entry at the @alive-\<advertise\>@
  -- key, holding the CBOR-serialized advertised host and attached to the
  -- given lease.
  writeAlive :: Int64 -> IO ()
  writeAlive leaseId =
    withGrpcContext "writeAlive" $
      void $
        nonStreaming
          conn
          (rpc @(Protobuf KV "put"))
          ( defMessage
              & #key .~ ("alive-" <> show advertise)
              & #value .~ serialize' advertise
              & #lease .~ leaseId
          )

  -- Fetch all entries under the @alive@ key prefix and decode their values
  -- as hosts. Entries whose value cannot be decoded are traced as
  -- 'FailedToDecodeValue' and left out of the result.
  getAlive :: IO [Host]
  getAlive = withGrpcContext "getAlive" $ do
    response <-
      nonStreaming conn (rpc @(Protobuf KV "range")) $
        defMessage
          & #key .~ "alive"
          & #rangeEnd .~ "alivf" -- NOTE: e+1 to query prefixes
    flip mapMaybeM (response ^.. #kvs . traverse) $ \entry -> do
      let raw = entry ^. #value
      case decodeFull' raw of
        Right host -> pure (Just host)
        Left err ->
          Nothing
            <$ traceWith
              tracer
              FailedToDecodeValue
                { key = decodeUtf8 $ entry ^. #key
                , value = encodeBase16 raw
                , reason = show err
                }

-- | Run an action and re-throw any 'GrpcException' it raises with the given
-- context added to its 'grpcErrorMessage': the context becomes the message if
-- there was none, otherwise it is prepended as @\<context\>: \<message\>@.
withGrpcContext :: MonadCatch m => Text -> m a -> m a
withGrpcContext context action =
  action `catch` \e@GrpcException{grpcErrorMessage = previous} ->
    throwIO e{grpcErrorMessage = Just (annotate previous)}
 where
  annotate = maybe context (\msg -> context <> ": " <> msg)

-- | Like 'withProcessTerm', but on cleanup the process group is interrupted
-- (SIGINT) first and the process is only stopped via 'stopProcess' if it has
-- not exited within 5 seconds.
withProcessInterrupt ::
  (MonadIO m, MonadThrow m) =>
  ProcessConfig stdin stdout stderr ->
  (Process stdin stdout stderr -> m a) ->
  m a
withProcessInterrupt config =
  -- The process gets its own group so the interrupt below can target it.
  bracket (startProcess (setCreateGroup True config)) interruptThenStop
 where
  interruptThenStop process = liftIO $ do
    interruptProcessGroupOf $ unsafeProcessHandle process
    -- Whichever finishes first wins: a regular exit or the grace period
    -- followed by a forced stop.
    race_
      (void (waitExitCode process))
      ( do
          threadDelay 5
          stopProcess process
      )

-- * Persistent queue

-- | A bounded in-memory queue whose items are also stored as individual
-- files in a directory, each named by the item's index.
data PersistentQueue m a = PersistentQueue
  { queue :: TBQueue m (Natural, a)
  -- ^ In-memory items, each paired with the index it is stored under.
  , nextIx :: TVar m Natural
  -- ^ Index to assign to the next item written.
  , directory :: FilePath
  -- ^ Directory holding one file per queued item.
  }

-- | Create a new persistent queue at file path and given capacity.
--
-- Items already persisted in the directory (files whose name parses as a
-- numeric index) are loaded into the in-memory queue in ascending index order,
-- and new items continue numbering after the highest index found. If the
-- directory cannot be listed, it is created and the queue starts empty with
-- index 1.
--
-- The in-memory queue is sized to hold at least all persisted items, even if
-- that exceeds the requested capacity: 'writePersistentQueue' writes an item
-- to disk /before/ it blocks on a full queue, so after a crash there can be
-- more items on disk than @capacity@. Loading those into a queue of only
-- @capacity@ would block forever, as nothing consumes from the queue yet.
--
-- Fails (via 'fail') if a persisted item cannot be decoded. Errors reading or
-- decoding persisted items are propagated to the caller rather than treated
-- like a missing directory, as continuing would restart numbering at 1 and
-- overwrite items that are still on disk.
newPersistentQueue ::
  (MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) =>
  FilePath ->
  Natural ->
  m (PersistentQueue m a)
newPersistentQueue path capacity = do
  -- Only the directory listing is guarded: a failure here means there is no
  -- queue directory yet.
  idxs <-
    try (liftIO $ listDirectory path) >>= \case
      Left (_ :: IOException) -> do
        liftIO $ createDirectoryIfMissing True path
        pure []
      -- Files whose name is not a number are not queue items and are ignored.
      Right paths -> pure . sort $ mapMaybe readMaybe paths
  -- Make sure all persisted items fit, so loading below never blocks.
  queue <- newTBQueueIO $ max capacity (fromIntegral $ length idxs)
  forM_ idxs $ \(idx :: Natural) -> do
    bs <- readFileBS (path </> show idx)
    case decodeFull' bs of
      Left err ->
        fail $ "Failed to decode item: " <> show err
      Right item ->
        atomically $ writeTBQueue queue (idx, item)
  -- 'idxs' is sorted ascending, so the last one is the highest index in use.
  let highestId = if null idxs then 0 else List.last idxs
  nextIx <- newTVarIO $ highestId + 1
  pure PersistentQueue{queue, nextIx, directory = path}

-- | Append a value to the queue. The value is first written to a file named
-- by its freshly allocated index and only then put onto the in-memory queue,
-- which blocks while the queue is full.
writePersistentQueue :: (ToCBOR a, MonadSTM m, MonadIO m) => PersistentQueue m a -> a -> m ()
writePersistentQueue PersistentQueue{queue, nextIx, directory} item = do
  -- Allocate the index atomically: take the current one and bump the counter.
  ix <- atomically $ readTVar nextIx <* modifyTVar' nextIx (+ 1)
  writeFileBS (directory </> show ix) (serialize' item)
  atomically $ writeTBQueue queue (ix, item)

-- | Return the value at the front of the queue and leave it in place. Blocks
-- while the queue is empty.
peekPersistentQueue :: MonadSTM m => PersistentQueue m a -> m a
peekPersistentQueue PersistentQueue{queue} =
  atomically (snd <$> peekTBQueue queue)

-- | Drop the front element of the queue, but only if it equals the given
-- item; its file is then removed from the queue directory. Does nothing when
-- the front element differs. Blocks while the queue is empty, so use
-- 'peekPersistentQueue' to wait for the next item before popping it.
popPersistentQueue :: (MonadSTM m, MonadIO m, Eq a) => PersistentQueue m a -> a -> m ()
popPersistentQueue PersistentQueue{queue, directory} item = do
  removed <- atomically $ do
    (ix, front) <- peekTBQueue queue
    if front /= item
      then pure Nothing
      else Just ix <$ readTBQueue queue
  -- Only delete the persisted file once the item has left the queue.
  forM_ removed $ \ix ->
    liftIO $ removeFile (directory </> show ix)

-- * Tracing

-- | Log messages traced by the etcd network component.
data EtcdLog
  = -- | Wraps a JSON value; presumably a log entry emitted by the @etcd@
    -- process itself (TODO confirm against the use site).
    EtcdLog {etcd :: Value}
  | Reconnecting
  | BroadcastFailed {reason :: Text}
  | FailedToDecodeLog {log :: Text, reason :: Text}
  | -- | A value stored in the key-value store could not be decoded. When
    -- traced while listing alive peers, 'key' is the UTF-8 decoded key,
    -- 'value' the base16 encoded raw value and 'reason' the shown decoder
    -- error.
    FailedToDecodeValue {key :: Text, value :: Text, reason :: Text}
  | CreatedLease {leaseId :: Int64}
  | LowLeaseTTL {ttlRemaining :: DiffTime}
  | -- | The lease keep-alive stream yielded no further response.
    NoKeepAliveResponse
  | MatchingProtocolVersion {version :: ProtocolVersion}
  deriving stock (Eq, Show, Generic)
  deriving anyclass (ToJSON)

-- | Generator and shrinker are both derived generically from the structure
-- of 'EtcdLog'.
instance Arbitrary EtcdLog where
  arbitrary = genericArbitrary
  shrink = genericShrink