{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE OverloadedStrings #-}
{-# OPTIONS_GHC -Wno-deferred-out-of-scope-variables #-}
module Hydra.Network.Etcd where
import Hydra.Prelude
import Cardano.Binary (decodeFull', serialize')
import Control.Concurrent.Class.MonadSTM (
modifyTVar',
newTBQueueIO,
newTVarIO,
peekTBQueue,
readTBQueue,
swapTVar,
writeTBQueue,
writeTVar,
)
import Control.Exception (IOException)
import Control.Lens ((^.), (^..))
import Data.Aeson (decodeFileStrict', encodeFile)
import Data.Aeson qualified as Aeson
import Data.Aeson.Types (Value)
import Data.ByteString qualified as BS
import Data.List ((\\))
import Data.List qualified as List
import Data.Map qualified as Map
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (
Connectivity (..),
Host (..),
Network (..),
NetworkCallback (..),
NetworkComponent,
NetworkConfiguration (..),
ProtocolVersion,
)
import Network.GRPC.Client (
Address (..),
ConnParams (..),
Connection,
ReconnectPolicy (..),
ReconnectTo (ReconnectToOriginal),
Server (..),
rpc,
withConnection,
)
import Network.GRPC.Client.StreamType.IO (biDiStreaming, nonStreaming)
import Network.GRPC.Common (GrpcError (..), GrpcException (..), HTTP2Settings (..), NextElem (..), def, defaultHTTP2Settings)
import Network.GRPC.Common.NextElem (whileNext_)
import Network.GRPC.Common.Protobuf (Proto (..), Protobuf, defMessage, (.~))
import Network.GRPC.Etcd (
Compare'CompareResult (..),
Compare'CompareTarget (..),
KV,
Lease,
Watch,
)
import System.Directory (createDirectoryIfMissing, listDirectory, removeFile)
import System.Environment.Blank (getEnvironment)
import System.FilePath ((</>))
import System.IO.Error (isDoesNotExistError)
import System.Process (interruptProcessGroupOf)
import System.Process.Typed (
Process,
ProcessConfig,
createPipe,
getStderr,
proc,
setCreateGroup,
setEnv,
setStderr,
startProcess,
stopProcess,
unsafeProcessHandle,
waitExitCode,
)
import UnliftIO (readTVarIO)
withEtcdNetwork ::
(ToCBOR msg, FromCBOR msg, Eq msg) =>
Tracer IO EtcdLog ->
ProtocolVersion ->
NetworkConfiguration ->
NetworkComponent IO msg msg ()
withEtcdNetwork :: forall msg.
(ToCBOR msg, FromCBOR msg, Eq msg) =>
Tracer IO EtcdLog
-> ProtocolVersion
-> NetworkConfiguration
-> NetworkComponent IO msg msg ()
withEtcdNetwork Tracer IO EtcdLog
tracer ProtocolVersion
protocolVersion NetworkConfiguration
config NetworkCallback msg IO
callback Network IO msg -> IO ()
action = do
Map [Char] [Char]
envVars <- [([Char], [Char])] -> Map [Char] [Char]
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([([Char], [Char])] -> Map [Char] [Char])
-> IO [([Char], [Char])] -> IO (Map [Char] [Char])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO [([Char], [Char])]
getEnvironment
ProcessConfig () () Handle
-> (Process () () Handle -> IO ()) -> IO ()
forall (m :: * -> *) stdin stdout stderr a.
(MonadIO m, MonadThrow m) =>
ProcessConfig stdin stdout stderr
-> (Process stdin stdout stderr -> m a) -> m a
withProcessInterrupt (Map [Char] [Char] -> ProcessConfig () () Handle
etcdCmd Map [Char] [Char]
envVars) ((Process () () Handle -> IO ()) -> IO ())
-> (Process () () Handle -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Process () () Handle
p -> do
IO Any -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Process () () Handle -> IO ExitCode
forall (m :: * -> *) stdin stdout stderr.
MonadIO m =>
Process stdin stdout stderr -> m ExitCode
waitExitCode Process () () Handle
p IO ExitCode -> (ExitCode -> IO Any) -> IO Any
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ExitCode
ec -> [Char] -> IO Any
forall a. [Char] -> IO a
forall (m :: * -> *) a. MonadFail m => [Char] -> m a
fail ([Char] -> IO Any) -> [Char] -> IO Any
forall a b. (a -> b) -> a -> b
$ [Char]
"Sub-process etcd exited with: " [Char] -> [Char] -> [Char]
forall a. Semigroup a => a -> a -> a
<> ExitCode -> [Char]
forall b a. (Show a, IsString b) => a -> b
show ExitCode
ec) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IO Any -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Process () () Handle -> IO Any
traceStderr Process () () Handle
p) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TVar Bool
doneVar <- Bool -> IO (TVar IO Bool)
forall a. a -> IO (TVar IO a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Bool
False
ConnParams -> Server -> (Connection -> IO ()) -> IO ()
forall a. ConnParams -> Server -> (Connection -> IO a) -> IO a
withConnection (TVar Bool -> ConnParams
connParams TVar Bool
doneVar) Server
grpcServer ((Connection -> IO ()) -> IO ()) -> (Connection -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
conn ->
IO () -> (Async IO () -> IO ()) -> IO ()
forall a b. IO a -> (Async IO a -> IO b) -> IO b
forall (m :: * -> *) a b.
MonadAsync m =>
m a -> (Async m a -> m b) -> m b
withAsync (Tracer IO EtcdLog
-> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
forall msg.
Tracer IO EtcdLog
-> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
checkVersion Tracer IO EtcdLog
tracer Connection
conn ProtocolVersion
protocolVersion NetworkCallback msg IO
callback) ((Async IO () -> IO ()) -> IO ())
-> (Async IO () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async IO ()
_ -> do
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Tracer IO EtcdLog
-> Connection -> Host -> NetworkCallback msg IO -> IO ()
forall msg.
Tracer IO EtcdLog
-> Connection -> Host -> NetworkCallback msg IO -> IO ()
pollConnectivity Tracer IO EtcdLog
tracer Connection
conn Host
advertise NetworkCallback msg IO
callback) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Tracer IO EtcdLog
-> Connection -> [Char] -> NetworkCallback msg IO -> IO ()
forall msg.
FromCBOR msg =>
Tracer IO EtcdLog
-> Connection -> [Char] -> NetworkCallback msg IO -> IO ()
waitMessages Tracer IO EtcdLog
tracer Connection
conn [Char]
persistenceDir NetworkCallback msg IO
callback) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
PersistentQueue IO msg
queue <- [Char] -> Natural -> IO (PersistentQueue IO msg)
forall (m :: * -> *) a.
(MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) =>
[Char] -> Natural -> m (PersistentQueue m a)
newPersistentQueue ([Char]
persistenceDir [Char] -> [Char] -> [Char]
</> [Char]
"pending-broadcast") Natural
100
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Tracer IO EtcdLog
-> Connection -> Host -> PersistentQueue IO msg -> IO ()
forall msg.
(ToCBOR msg, Eq msg) =>
Tracer IO EtcdLog
-> Connection -> Host -> PersistentQueue IO msg -> IO ()
broadcastMessages Tracer IO EtcdLog
tracer Connection
conn Host
advertise PersistentQueue IO msg
queue) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Network IO msg -> IO ()
action
Network
{ $sel:broadcast:Network :: msg -> IO ()
broadcast = PersistentQueue IO msg -> msg -> IO ()
forall a (m :: * -> *).
(ToCBOR a, MonadSTM m, MonadIO m) =>
PersistentQueue m a -> a -> m ()
writePersistentQueue PersistentQueue IO msg
queue
}
STM IO () -> IO ()
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (TVar IO Bool -> Bool -> STM IO ()
forall a. TVar IO a -> a -> STM IO ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar Bool
TVar IO Bool
doneVar Bool
True)
where
connParams :: TVar Bool -> ConnParams
connParams TVar Bool
doneVar =
ConnParams
forall a. Default a => a
def
{ connReconnectPolicy = reconnectPolicy doneVar
,
connHTTP2Settings = defaultHTTP2Settings{http2OverridePingRateLimit = Just maxBound}
}
reconnectPolicy :: TVar Bool -> ReconnectPolicy
reconnectPolicy TVar Bool
doneVar = ReconnectTo -> IO ReconnectPolicy -> ReconnectPolicy
ReconnectAfter ReconnectTo
ReconnectToOriginal (IO ReconnectPolicy -> ReconnectPolicy)
-> IO ReconnectPolicy -> ReconnectPolicy
forall a b. (a -> b) -> a -> b
$ do
Bool
done <- TVar Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TVar Bool
doneVar
if Bool
done
then ReconnectPolicy -> IO ReconnectPolicy
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ReconnectPolicy
DontReconnect
else do
DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer EtcdLog
Reconnecting
ReconnectPolicy -> IO ReconnectPolicy
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReconnectPolicy -> IO ReconnectPolicy)
-> ReconnectPolicy -> IO ReconnectPolicy
forall a b. (a -> b) -> a -> b
$ TVar Bool -> ReconnectPolicy
reconnectPolicy TVar Bool
doneVar
clientHost :: Host
clientHost = Host{$sel:hostname:Host :: Text
hostname = Text
"127.0.0.1", $sel:port:Host :: PortNumber
port = PortNumber
clientPort}
grpcServer :: Server
grpcServer =
Address -> Server
ServerInsecure (Address -> Server) -> Address -> Server
forall a b. (a -> b) -> a -> b
$
Address
{ addressHost :: [Char]
addressHost = Text -> [Char]
forall a. ToString a => a -> [Char]
toString (Text -> [Char]) -> Text -> [Char]
forall a b. (a -> b) -> a -> b
$ Host -> Text
hostname Host
clientHost
, addressPort :: PortNumber
addressPort = Host -> PortNumber
port Host
clientHost
, addressAuthority :: Maybe [Char]
addressAuthority = Maybe [Char]
forall a. Maybe a
Nothing
}
clientPort :: PortNumber
clientPort = PortNumber
2379 PortNumber -> PortNumber -> PortNumber
forall a. Num a => a -> a -> a
+ Host -> PortNumber
port Host
listen PortNumber -> PortNumber -> PortNumber
forall a. Num a => a -> a -> a
- PortNumber
5001
traceStderr :: Process () () Handle -> IO Any
traceStderr Process () () Handle
p =
IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> IO () -> IO Any
forall a b. (a -> b) -> a -> b
$ do
ByteString
bs <- Handle -> IO ByteString
BS.hGetLine (Process () () Handle -> Handle
forall stdin stdout stderr. Process stdin stdout stderr -> stderr
getStderr Process () () Handle
p)
case ByteString -> Either [Char] Value
forall a. FromJSON a => ByteString -> Either [Char] a
Aeson.eitherDecodeStrict ByteString
bs of
Left [Char]
err -> Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer FailedToDecodeLog{$sel:log:EtcdLog :: Text
log = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 ByteString
bs, $sel:reason:EtcdLog :: Text
reason = [Char] -> Text
forall b a. (Show a, IsString b) => a -> b
show [Char]
err}
Right Value
v -> Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$ EtcdLog{etcd :: Value
etcd = Value
v}
etcdCmd :: Map [Char] [Char] -> ProcessConfig () () Handle
etcdCmd Map [Char] [Char]
envVars =
[([Char], [Char])]
-> ProcessConfig () () Handle -> ProcessConfig () () Handle
forall stdin stdout stderr.
[([Char], [Char])]
-> ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr
setEnv (Map [Char] [Char] -> [([Char], [Char])]
forall k a. Map k a -> [(k, a)]
Map.toList (Map [Char] [Char] -> [([Char], [Char])])
-> Map [Char] [Char] -> [([Char], [Char])]
forall a b. (a -> b) -> a -> b
$ Map [Char] [Char]
envVars Map [Char] [Char] -> Map [Char] [Char] -> Map [Char] [Char]
forall a. Semigroup a => a -> a -> a
<> Map [Char] [Char]
defaultEnv)
(ProcessConfig () () Handle -> ProcessConfig () () Handle)
-> ([[Char]] -> ProcessConfig () () Handle)
-> [[Char]]
-> ProcessConfig () () Handle
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Bool -> ProcessConfig () () Handle -> ProcessConfig () () Handle
forall stdin stdout stderr.
Bool
-> ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr
setCreateGroup Bool
True
(ProcessConfig () () Handle -> ProcessConfig () () Handle)
-> ([[Char]] -> ProcessConfig () () Handle)
-> [[Char]]
-> ProcessConfig () () Handle
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamSpec 'STOutput Handle
-> ProcessConfig () () () -> ProcessConfig () () Handle
forall stderr stdin stdout stderr0.
StreamSpec 'STOutput stderr
-> ProcessConfig stdin stdout stderr0
-> ProcessConfig stdin stdout stderr
setStderr StreamSpec 'STOutput Handle
forall (anyStreamType :: StreamType).
StreamSpec anyStreamType Handle
createPipe
(ProcessConfig () () () -> ProcessConfig () () Handle)
-> ([[Char]] -> ProcessConfig () () ())
-> [[Char]]
-> ProcessConfig () () Handle
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Char] -> [[Char]] -> ProcessConfig () () ()
proc [Char]
"etcd"
([[Char]] -> ProcessConfig () () Handle)
-> [[Char]] -> ProcessConfig () () Handle
forall a b. (a -> b) -> a -> b
$ [[[Char]]] -> [[Char]]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
[
[[Char]
"--name", Host -> [Char]
forall b a. (Show a, IsString b) => a -> b
show Host
advertise]
, [[Char]
"--data-dir", [Char]
persistenceDir [Char] -> [Char] -> [Char]
</> [Char]
"etcd"]
, [[Char]
"--listen-peer-urls", Host -> [Char]
httpUrl Host
listen]
, [[Char]
"--initial-advertise-peer-urls", Host -> [Char]
httpUrl Host
advertise]
, [[Char]
"--listen-client-urls", Host -> [Char]
httpUrl Host
clientHost]
,
[[Char]
"--listen-client-http-urls", [Char]
"http://localhost:0"]
,
[[Char]
"--advertise-client-urls", Host -> [Char]
httpUrl Host
clientHost]
,
[[Char]
"--initial-cluster-token", [Char]
"hydra-network-1"]
, [[Char]
"--initial-cluster", [Char]
clusterPeers]
]
defaultEnv :: Map.Map String String
defaultEnv :: Map [Char] [Char]
defaultEnv =
[([Char], [Char])] -> Map [Char] [Char]
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList
[ ([Char]
"ETCD_AUTO_COMPACTION_MODE", [Char]
"revision")
, ([Char]
"ETCD_AUTO_COMPACTION_RETENTION", [Char]
"1000")
]
clusterPeers :: [Char]
clusterPeers =
[Char] -> [[Char]] -> [Char]
forall a. [a] -> [[a]] -> [a]
intercalate [Char]
","
([[Char]] -> [Char]) -> ([Host] -> [[Char]]) -> [Host] -> [Char]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Host -> [Char]) -> [Host] -> [[Char]]
forall a b. (a -> b) -> [a] -> [b]
map (\Host
h -> Host -> [Char]
forall b a. (Show a, IsString b) => a -> b
show Host
h [Char] -> [Char] -> [Char]
forall a. Semigroup a => a -> a -> a
<> [Char]
"=" [Char] -> [Char] -> [Char]
forall a. Semigroup a => a -> a -> a
<> Host -> [Char]
httpUrl Host
h)
([Host] -> [Char]) -> [Host] -> [Char]
forall a b. (a -> b) -> a -> b
$ (Host
advertise Host -> [Host] -> [Host]
forall a. a -> [a] -> [a]
: [Host]
peers)
httpUrl :: Host -> [Char]
httpUrl (Host Text
h PortNumber
p) = [Char]
"http://" [Char] -> [Char] -> [Char]
forall a. Semigroup a => a -> a -> a
<> Text -> [Char]
forall a. ToString a => a -> [Char]
toString Text
h [Char] -> [Char] -> [Char]
forall a. Semigroup a => a -> a -> a
<> [Char]
":" [Char] -> [Char] -> [Char]
forall a. Semigroup a => a -> a -> a
<> PortNumber -> [Char]
forall b a. (Show a, IsString b) => a -> b
show PortNumber
p
NetworkConfiguration{[Char]
persistenceDir :: [Char]
$sel:persistenceDir:NetworkConfiguration :: NetworkConfiguration -> [Char]
persistenceDir, Host
listen :: Host
$sel:listen:NetworkConfiguration :: NetworkConfiguration -> Host
listen, Host
advertise :: Host
$sel:advertise:NetworkConfiguration :: NetworkConfiguration -> Host
advertise, [Host]
peers :: [Host]
$sel:peers:NetworkConfiguration :: NetworkConfiguration -> [Host]
peers} = NetworkConfiguration
config
checkVersion ::
Tracer IO EtcdLog ->
Connection ->
ProtocolVersion ->
NetworkCallback msg IO ->
IO ()
checkVersion :: forall msg.
Tracer IO EtcdLog
-> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
checkVersion Tracer IO EtcdLog
tracer Connection
conn ProtocolVersion
ourVersion NetworkCallback{Connectivity -> IO ()
onConnectivity :: Connectivity -> IO ()
$sel:onConnectivity:NetworkCallback :: forall msg (m :: * -> *).
NetworkCallback msg m -> Connectivity -> m ()
onConnectivity} = do
Proto TxnResponse
res <-
Connection
-> ClientHandler'
'NonStreaming (ReaderT Connection IO) (Protobuf KV "txn")
-> Input (Protobuf KV "txn")
-> IO (Output (Protobuf KV "txn"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "txn")) (Input (Protobuf KV "txn") -> IO (Output (Protobuf KV "txn")))
-> Input (Protobuf KV "txn") -> IO (Output (Protobuf KV "txn"))
forall a b. (a -> b) -> a -> b
$
Proto TxnRequest
forall msg. Message msg => msg
defMessage
Proto TxnRequest
-> (Proto TxnRequest -> Proto TxnRequest) -> Proto TxnRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto TxnRequest)
(Proto TxnRequest)
[Proto Compare]
[Proto Compare]
#compare ASetter
(Proto TxnRequest)
(Proto TxnRequest)
[Proto Compare]
[Proto Compare]
-> [Proto Compare] -> Proto TxnRequest -> Proto TxnRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Proto Compare
versionExists]
Proto TxnRequest
-> (Proto TxnRequest -> Proto TxnRequest) -> Proto TxnRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto TxnRequest)
(Proto TxnRequest)
[Proto RequestOp]
[Proto RequestOp]
#success ASetter
(Proto TxnRequest)
(Proto TxnRequest)
[Proto RequestOp]
[Proto RequestOp]
-> [Proto RequestOp] -> Proto TxnRequest -> Proto TxnRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Proto RequestOp
getVersion]
Proto TxnRequest
-> (Proto TxnRequest -> Proto TxnRequest) -> Proto TxnRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto TxnRequest)
(Proto TxnRequest)
[Proto RequestOp]
[Proto RequestOp]
#failure ASetter
(Proto TxnRequest)
(Proto TxnRequest)
[Proto RequestOp]
[Proto RequestOp]
-> [Proto RequestOp] -> Proto TxnRequest -> Proto TxnRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Proto RequestOp
putVersion]
if Proto TxnResponse
res Proto TxnResponse -> Getting Bool (Proto TxnResponse) Bool -> Bool
forall s a. s -> Getting a s a -> a
^. Getting Bool (Proto TxnResponse) Bool
#succeeded
then [Proto KeyValue] -> (Proto KeyValue -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Proto TxnResponse
res Proto TxnResponse
-> Getting
(Endo [Proto KeyValue]) (Proto TxnResponse) (Proto KeyValue)
-> [Proto KeyValue]
forall s a. s -> Getting (Endo [a]) s a -> [a]
^.. ([Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> Proto TxnResponse
-> Const (Endo [Proto KeyValue]) (Proto TxnResponse)
#responses (([Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> Proto TxnResponse
-> Const (Endo [Proto KeyValue]) (Proto TxnResponse))
-> ((Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> Getting
(Endo [Proto KeyValue]) (Proto TxnResponse) (Proto KeyValue)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> [Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse ((Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> [Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> ((Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> (Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp)
#responseRange ((Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> ((Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> (Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
#kvs (([Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> ((Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> (Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto KeyValue -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse) ((Proto KeyValue -> IO ()) -> IO ())
-> (Proto KeyValue -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Proto KeyValue
kv ->
case ByteString -> Either DecoderError ProtocolVersion
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' (ByteString -> Either DecoderError ProtocolVersion)
-> ByteString -> Either DecoderError ProtocolVersion
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#value of
Left DecoderError
err -> do
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$
FailedToDecodeValue
{ $sel:key:EtcdLog :: Text
key = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#key
, $sel:value:EtcdLog :: Text
value = ByteString -> Text
encodeBase16 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#value
, $sel:reason:EtcdLog :: Text
reason = DecoderError -> Text
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
}
Connectivity -> IO ()
onConnectivity VersionMismatch{ProtocolVersion
ourVersion :: ProtocolVersion
$sel:ourVersion:PeerConnected :: ProtocolVersion
ourVersion, $sel:theirVersion:PeerConnected :: Maybe ProtocolVersion
theirVersion = Maybe ProtocolVersion
forall a. Maybe a
Nothing}
Right ProtocolVersion
theirVersion ->
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ProtocolVersion
theirVersion ProtocolVersion -> ProtocolVersion -> Bool
forall a. Eq a => a -> a -> Bool
== ProtocolVersion
ourVersion) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
Connectivity -> IO ()
onConnectivity VersionMismatch{ProtocolVersion
ourVersion :: ProtocolVersion
$sel:ourVersion:PeerConnected :: ProtocolVersion
ourVersion, $sel:theirVersion:PeerConnected :: Maybe ProtocolVersion
theirVersion = ProtocolVersion -> Maybe ProtocolVersion
forall a. a -> Maybe a
Just ProtocolVersion
theirVersion}
else
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$ MatchingProtocolVersion{version :: ProtocolVersion
version = ProtocolVersion
ourVersion}
where
versionKey :: ByteString
versionKey = ByteString
"version"
versionExists :: Proto Compare
versionExists =
Proto Compare
forall msg. Message msg => msg
defMessage
Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter
(Proto Compare)
(Proto Compare)
(Proto Compare'CompareResult)
(Proto Compare'CompareResult)
#result ASetter
(Proto Compare)
(Proto Compare)
(Proto Compare'CompareResult)
(Proto Compare'CompareResult)
-> Proto Compare'CompareResult -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Compare'CompareResult -> Proto Compare'CompareResult
forall msg. msg -> Proto msg
Proto Compare'CompareResult
Compare'GREATER
Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter
(Proto Compare)
(Proto Compare)
(Proto Compare'CompareTarget)
(Proto Compare'CompareTarget)
#target ASetter
(Proto Compare)
(Proto Compare)
(Proto Compare'CompareTarget)
(Proto Compare'CompareTarget)
-> Proto Compare'CompareTarget -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Compare'CompareTarget -> Proto Compare'CompareTarget
forall msg. msg -> Proto msg
Proto Compare'CompareTarget
Compare'VERSION
Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter (Proto Compare) (Proto Compare) ByteString ByteString
#key ASetter (Proto Compare) (Proto Compare) ByteString ByteString
-> ByteString -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
versionKey
Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter (Proto Compare) (Proto Compare) Int64 Int64
#version ASetter (Proto Compare) (Proto Compare) Int64 Int64
-> Int64 -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
0
getVersion :: Proto RequestOp
getVersion =
Proto RequestOp
forall msg. Message msg => msg
defMessage Proto RequestOp
-> (Proto RequestOp -> Proto RequestOp) -> Proto RequestOp
forall a b. a -> (a -> b) -> b
& ASetter
(Proto RequestOp)
(Proto RequestOp)
(Proto RangeRequest)
(Proto RangeRequest)
#requestRange ASetter
(Proto RequestOp)
(Proto RequestOp)
(Proto RangeRequest)
(Proto RangeRequest)
-> Proto RangeRequest -> Proto RequestOp -> Proto RequestOp
forall s t a b. ASetter s t a b -> b -> s -> t
.~ (Proto RangeRequest
forall msg. Message msg => msg
defMessage Proto RangeRequest
-> (Proto RangeRequest -> Proto RangeRequest) -> Proto RangeRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
#key ASetter
(Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
-> ByteString -> Proto RangeRequest -> Proto RangeRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
versionKey)
putVersion :: Proto RequestOp
putVersion =
Proto RequestOp
forall msg. Message msg => msg
defMessage
Proto RequestOp
-> (Proto RequestOp -> Proto RequestOp) -> Proto RequestOp
forall a b. a -> (a -> b) -> b
& ASetter
(Proto RequestOp)
(Proto RequestOp)
(Proto PutRequest)
(Proto PutRequest)
#requestPut
ASetter
(Proto RequestOp)
(Proto RequestOp)
(Proto PutRequest)
(Proto PutRequest)
-> Proto PutRequest -> Proto RequestOp -> Proto RequestOp
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ( Proto PutRequest
forall msg. Message msg => msg
defMessage
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#key ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
versionKey
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#value ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ProtocolVersion -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' ProtocolVersion
ourVersion
)
broadcastMessages ::
(ToCBOR msg, Eq msg) =>
Tracer IO EtcdLog ->
Connection ->
Host ->
PersistentQueue IO msg ->
IO ()
broadcastMessages :: forall msg.
(ToCBOR msg, Eq msg) =>
Tracer IO EtcdLog
-> Connection -> Host -> PersistentQueue IO msg -> IO ()
broadcastMessages Tracer IO EtcdLog
tracer Connection
conn Host
ourHost PersistentQueue IO msg
queue =
Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"broadcastMessages" (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
msg
msg <- PersistentQueue IO msg -> IO msg
forall (m :: * -> *) a. MonadSTM m => PersistentQueue m a -> m a
peekPersistentQueue PersistentQueue IO msg
queue
(Connection -> Host -> msg -> IO ()
forall msg. ToCBOR msg => Connection -> Host -> msg -> IO ()
putMessage Connection
conn Host
ourHost msg
msg IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> PersistentQueue IO msg -> msg -> IO ()
forall (m :: * -> *) a.
(MonadSTM m, MonadIO m, Eq a) =>
PersistentQueue m a -> a -> m ()
popPersistentQueue PersistentQueue IO msg
queue msg
msg)
IO () -> (GrpcException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \case
GrpcException{GrpcError
grpcError :: GrpcError
grpcError :: GrpcException -> GrpcError
grpcError, Maybe Text
grpcErrorMessage :: Maybe Text
grpcErrorMessage :: GrpcException -> Maybe Text
grpcErrorMessage}
| GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcUnavailable Bool -> Bool -> Bool
|| GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcDeadlineExceeded -> do
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$ BroadcastFailed{$sel:reason:EtcdLog :: Text
reason = Text -> Maybe Text -> Text
forall a. a -> Maybe a -> a
fromMaybe Text
"unknown" Maybe Text
grpcErrorMessage}
DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
GrpcException
e -> GrpcException -> IO ()
forall e a. Exception e => e -> IO a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO GrpcException
e
putMessage ::
ToCBOR msg =>
Connection ->
Host ->
msg ->
IO ()
putMessage :: forall msg. ToCBOR msg => Connection -> Host -> msg -> IO ()
putMessage Connection
conn Host
ourHost msg
msg =
IO (Proto PutResponse) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Proto PutResponse) -> IO ())
-> IO (Proto PutResponse) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> ClientHandler'
'NonStreaming (ReaderT Connection IO) (Protobuf KV "put")
-> Input (Protobuf KV "put")
-> IO (Output (Protobuf KV "put"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "put")) Input (Protobuf KV "put")
Proto PutRequest
req
where
req :: Proto PutRequest
req =
Proto PutRequest
forall msg. Message msg => msg
defMessage
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#key ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
key
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#value ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ msg -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' msg
msg
key :: ByteString
key = forall a b. ConvertUtf8 a b => a -> b
encodeUtf8 @Text (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ Text
"msg-" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Host -> Text
forall b a. (Show a, IsString b) => a -> b
show Host
ourHost
waitMessages ::
FromCBOR msg =>
Tracer IO EtcdLog ->
Connection ->
FilePath ->
NetworkCallback msg IO ->
IO ()
waitMessages :: forall msg.
FromCBOR msg =>
Tracer IO EtcdLog
-> Connection -> [Char] -> NetworkCallback msg IO -> IO ()
waitMessages Tracer IO EtcdLog
tracer Connection
conn [Char]
directory NetworkCallback{msg -> IO ()
deliver :: msg -> IO ()
$sel:deliver:NetworkCallback :: forall msg (m :: * -> *). NetworkCallback msg m -> msg -> m ()
deliver} = do
Natural
revision <- [Char] -> IO Natural
forall (m :: * -> *). MonadIO m => [Char] -> m Natural
getLastKnownRevision [Char]
directory
Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"waitMessages" (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Connection
-> ClientHandler'
'BiDiStreaming (ReaderT Connection IO) (Protobuf Watch "watch")
-> ((NextElem (Input (Protobuf Watch "watch")) -> IO ())
-> IO (NextElem (Output (Protobuf Watch "watch"))) -> IO ())
-> IO ()
forall {k} (rpc :: k) (m :: * -> *) r.
MonadIO m =>
Connection
-> ClientHandler' 'BiDiStreaming (ReaderT Connection m) rpc
-> ((NextElem (Input rpc) -> m ())
-> m (NextElem (Output rpc)) -> m r)
-> m r
biDiStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf Watch "watch")) (((NextElem (Input (Protobuf Watch "watch")) -> IO ())
-> IO (NextElem (Output (Protobuf Watch "watch"))) -> IO ())
-> IO ())
-> ((NextElem (Input (Protobuf Watch "watch")) -> IO ())
-> IO (NextElem (Output (Protobuf Watch "watch"))) -> IO ())
-> IO ()
forall a b. (a -> b) -> a -> b
$ \NextElem (Input (Protobuf Watch "watch")) -> IO ()
send IO (NextElem (Output (Protobuf Watch "watch")))
recv -> do
let watchRequest :: Proto WatchCreateRequest
watchRequest =
Proto WatchCreateRequest
forall msg. Message msg => msg
defMessage
Proto WatchCreateRequest
-> (Proto WatchCreateRequest -> Proto WatchCreateRequest)
-> Proto WatchCreateRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto WatchCreateRequest)
(Proto WatchCreateRequest)
ByteString
ByteString
#key ASetter
(Proto WatchCreateRequest)
(Proto WatchCreateRequest)
ByteString
ByteString
-> ByteString
-> Proto WatchCreateRequest
-> Proto WatchCreateRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"msg"
Proto WatchCreateRequest
-> (Proto WatchCreateRequest -> Proto WatchCreateRequest)
-> Proto WatchCreateRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto WatchCreateRequest)
(Proto WatchCreateRequest)
ByteString
ByteString
#rangeEnd ASetter
(Proto WatchCreateRequest)
(Proto WatchCreateRequest)
ByteString
ByteString
-> ByteString
-> Proto WatchCreateRequest
-> Proto WatchCreateRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"msh"
Proto WatchCreateRequest
-> (Proto WatchCreateRequest -> Proto WatchCreateRequest)
-> Proto WatchCreateRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto WatchCreateRequest) (Proto WatchCreateRequest) Int64 Int64
#startRevision ASetter
(Proto WatchCreateRequest) (Proto WatchCreateRequest) Int64 Int64
-> Int64 -> Proto WatchCreateRequest -> Proto WatchCreateRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Natural -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Natural
revision Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
+ Natural
1)
NextElem (Input (Protobuf Watch "watch")) -> IO ()
NextElem (Proto WatchRequest) -> IO ()
send (NextElem (Proto WatchRequest) -> IO ())
-> (Proto WatchRequest -> NextElem (Proto WatchRequest))
-> Proto WatchRequest
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Proto WatchRequest -> NextElem (Proto WatchRequest)
forall a. a -> NextElem a
NextElem (Proto WatchRequest -> IO ()) -> Proto WatchRequest -> IO ()
forall a b. (a -> b) -> a -> b
$ Proto WatchRequest
forall msg. Message msg => msg
defMessage Proto WatchRequest
-> (Proto WatchRequest -> Proto WatchRequest) -> Proto WatchRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto WatchRequest)
(Proto WatchRequest)
(Proto WatchCreateRequest)
(Proto WatchCreateRequest)
#createRequest ASetter
(Proto WatchRequest)
(Proto WatchRequest)
(Proto WatchCreateRequest)
(Proto WatchCreateRequest)
-> Proto WatchCreateRequest
-> Proto WatchRequest
-> Proto WatchRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Proto WatchCreateRequest
watchRequest
IO (NextElem (Proto WatchResponse))
-> (Proto WatchResponse -> IO ()) -> IO ()
forall (m :: * -> *) a.
Monad m =>
m (NextElem a) -> (a -> m ()) -> m ()
whileNext_ IO (NextElem (Output (Protobuf Watch "watch")))
IO (NextElem (Proto WatchResponse))
recv Proto WatchResponse -> IO ()
process
DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
where
process :: Proto WatchResponse -> IO ()
process Proto WatchResponse
res = do
let revision :: Natural
revision = Int64 -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Natural) -> Int64 -> Natural
forall a b. (a -> b) -> a -> b
$ Proto WatchResponse
res Proto WatchResponse
-> Getting Int64 (Proto WatchResponse) Int64 -> Int64
forall s a. s -> Getting a s a -> a
^. (Proto ResponseHeader -> Const Int64 (Proto ResponseHeader))
-> Proto WatchResponse -> Const Int64 (Proto WatchResponse)
#header ((Proto ResponseHeader -> Const Int64 (Proto ResponseHeader))
-> Proto WatchResponse -> Const Int64 (Proto WatchResponse))
-> ((Int64 -> Const Int64 Int64)
-> Proto ResponseHeader -> Const Int64 (Proto ResponseHeader))
-> Getting Int64 (Proto WatchResponse) Int64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int64 -> Const Int64 Int64)
-> Proto ResponseHeader -> Const Int64 (Proto ResponseHeader)
#revision
[Char] -> Natural -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> Natural -> m ()
putLastKnownRevision [Char]
directory Natural
revision
[Proto Event] -> (Proto Event -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Proto WatchResponse
res Proto WatchResponse
-> Getting [Proto Event] (Proto WatchResponse) [Proto Event]
-> [Proto Event]
forall s a. s -> Getting a s a -> a
^. Getting [Proto Event] (Proto WatchResponse) [Proto Event]
#events) ((Proto Event -> IO ()) -> IO ())
-> (Proto Event -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Proto Event
event -> do
let value :: ByteString
value = Proto Event
event Proto Event
-> Getting ByteString (Proto Event) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. (Proto KeyValue -> Const ByteString (Proto KeyValue))
-> Proto Event -> Const ByteString (Proto Event)
#kv ((Proto KeyValue -> Const ByteString (Proto KeyValue))
-> Proto Event -> Const ByteString (Proto Event))
-> Getting ByteString (Proto KeyValue) ByteString
-> Getting ByteString (Proto Event) ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting ByteString (Proto KeyValue) ByteString
#value
case ByteString -> Either DecoderError msg
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' ByteString
value of
Left DecoderError
err ->
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith
Tracer IO EtcdLog
tracer
FailedToDecodeValue
{ $sel:key:EtcdLog :: Text
key = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto Event
event Proto Event
-> Getting ByteString (Proto Event) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. (Proto KeyValue -> Const ByteString (Proto KeyValue))
-> Proto Event -> Const ByteString (Proto Event)
#kv ((Proto KeyValue -> Const ByteString (Proto KeyValue))
-> Proto Event -> Const ByteString (Proto Event))
-> Getting ByteString (Proto KeyValue) ByteString
-> Getting ByteString (Proto Event) ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting ByteString (Proto KeyValue) ByteString
#key
, $sel:value:EtcdLog :: Text
value = ByteString -> Text
encodeBase16 ByteString
value
, $sel:reason:EtcdLog :: Text
reason = DecoderError -> Text
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
}
Right msg
msg -> msg -> IO ()
deliver msg
msg
getLastKnownRevision :: MonadIO m => FilePath -> m Natural
getLastKnownRevision :: forall (m :: * -> *). MonadIO m => [Char] -> m Natural
getLastKnownRevision [Char]
directory = do
IO Natural -> m Natural
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Natural -> m Natural) -> IO Natural -> m Natural
forall a b. (a -> b) -> a -> b
$
IO (Maybe Natural) -> IO (Either IOException (Maybe Natural))
forall e a. Exception e => IO a -> IO (Either e a)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try ([Char] -> IO (Maybe Natural)
forall a. FromJSON a => [Char] -> IO (Maybe a)
decodeFileStrict' ([Char] -> IO (Maybe Natural)) -> [Char] -> IO (Maybe Natural)
forall a b. (a -> b) -> a -> b
$ [Char]
directory [Char] -> [Char] -> [Char]
</> [Char]
"last-known-revision") IO (Either IOException (Maybe Natural))
-> (Either IOException (Maybe Natural) -> IO Natural) -> IO Natural
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right Maybe Natural
rev -> do
Natural -> IO Natural
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Natural -> IO Natural) -> Natural -> IO Natural
forall a b. (a -> b) -> a -> b
$ Natural -> Maybe Natural -> Natural
forall a. a -> Maybe a -> a
fromMaybe Natural
1 Maybe Natural
rev
Left (IOException
e :: IOException)
| IOException -> Bool
isDoesNotExistError IOException
e -> Natural -> IO Natural
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
1
| Bool
otherwise -> do
[Char] -> IO Natural
forall a. [Char] -> IO a
forall (m :: * -> *) a. MonadFail m => [Char] -> m a
fail ([Char] -> IO Natural) -> [Char] -> IO Natural
forall a b. (a -> b) -> a -> b
$ [Char]
"Failed to load last known revision: " [Char] -> [Char] -> [Char]
forall a. Semigroup a => a -> a -> a
<> IOException -> [Char]
forall b a. (Show a, IsString b) => a -> b
show IOException
e
putLastKnownRevision :: MonadIO m => FilePath -> Natural -> m ()
putLastKnownRevision :: forall (m :: * -> *). MonadIO m => [Char] -> Natural -> m ()
putLastKnownRevision [Char]
directory Natural
rev = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ [Char] -> Natural -> IO ()
forall a. ToJSON a => [Char] -> a -> IO ()
encodeFile ([Char]
directory [Char] -> [Char] -> [Char]
</> [Char]
"last-known-revision") Natural
rev
pollConnectivity ::
Tracer IO EtcdLog ->
Connection ->
Host ->
NetworkCallback msg IO ->
IO ()
pollConnectivity :: forall msg.
Tracer IO EtcdLog
-> Connection -> Host -> NetworkCallback msg IO -> IO ()
pollConnectivity Tracer IO EtcdLog
tracer Connection
conn Host
advertise NetworkCallback{Connectivity -> IO ()
$sel:onConnectivity:NetworkCallback :: forall msg (m :: * -> *).
NetworkCallback msg m -> Connectivity -> m ()
onConnectivity :: Connectivity -> IO ()
onConnectivity} = do
TVar [Host]
seenAliveVar <- [Host] -> IO (TVar IO [Host])
forall a. a -> IO (TVar IO a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO []
Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"pollConnectivity" (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (GrpcException -> IO ()) -> IO () -> IO ()
forall e a. Exception e => (e -> IO a) -> IO a -> IO a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> m a) -> m a -> m a
handle (TVar [Host] -> GrpcException -> IO ()
onGrpcException TVar [Host]
seenAliveVar) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Int64
leaseId <- IO Int64
createLease
Connectivity -> IO ()
onConnectivity Connectivity
NetworkConnected
Int64 -> IO ()
writeAlive Int64
leaseId
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer CreatedLease{Int64
leaseId :: Int64
$sel:leaseId:EtcdLog :: Int64
leaseId}
Int64 -> (IO DiffTime -> IO Any) -> IO ()
withKeepAlive Int64
leaseId ((IO DiffTime -> IO Any) -> IO ())
-> (IO DiffTime -> IO Any) -> IO ()
forall a b. (a -> b) -> a -> b
$ \IO DiffTime
keepAlive ->
IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> IO () -> IO Any
forall a b. (a -> b) -> a -> b
$ do
DiffTime
ttlRemaining <- IO DiffTime
keepAlive
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (DiffTime
ttlRemaining DiffTime -> DiffTime -> Bool
forall a. Ord a => a -> a -> Bool
< DiffTime
1) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer LowLeaseTTL{DiffTime
ttlRemaining :: DiffTime
$sel:ttlRemaining:EtcdLog :: DiffTime
ttlRemaining}
[Host]
alive <- IO [Host]
getAlive
let othersAlive :: [Host]
othersAlive = [Host]
alive [Host] -> [Host] -> [Host]
forall a. Eq a => [a] -> [a] -> [a]
\\ [Host
advertise]
[Host]
seenAlive <- STM IO [Host] -> IO [Host]
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO [Host] -> IO [Host]) -> STM IO [Host] -> IO [Host]
forall a b. (a -> b) -> a -> b
$ TVar IO [Host] -> [Host] -> STM IO [Host]
forall a. TVar IO a -> a -> STM IO a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m a
swapTVar TVar [Host]
TVar IO [Host]
seenAliveVar [Host]
othersAlive
[Host] -> (Host -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Host]
othersAlive [Host] -> [Host] -> [Host]
forall a. Eq a => [a] -> [a] -> [a]
\\ [Host]
seenAlive) ((Host -> IO ()) -> IO ()) -> (Host -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connectivity -> IO ()
onConnectivity (Connectivity -> IO ()) -> (Host -> Connectivity) -> Host -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Host -> Connectivity
PeerConnected
[Host] -> (Host -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Host]
seenAlive [Host] -> [Host] -> [Host]
forall a. Eq a => [a] -> [a] -> [a]
\\ [Host]
othersAlive) ((Host -> IO ()) -> IO ()) -> (Host -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connectivity -> IO ()
onConnectivity (Connectivity -> IO ()) -> (Host -> Connectivity) -> Host -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Host -> Connectivity
PeerDisconnected
DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay (DiffTime
ttlRemaining DiffTime -> DiffTime -> DiffTime
forall a. Fractional a => a -> a -> a
/ DiffTime
2)
where
onGrpcException :: TVar [Host] -> GrpcException -> IO ()
onGrpcException TVar [Host]
seenAliveVar GrpcException{GrpcError
grpcError :: GrpcException -> GrpcError
grpcError :: GrpcError
grpcError}
| GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcUnavailable Bool -> Bool -> Bool
|| GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcDeadlineExceeded = do
Connectivity -> IO ()
onConnectivity Connectivity
NetworkDisconnected
STM IO () -> IO ()
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO () -> IO ()) -> STM IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar IO [Host] -> [Host] -> STM IO ()
forall a. TVar IO a -> a -> STM IO ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar [Host]
TVar IO [Host]
seenAliveVar []
DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
onGrpcException TVar [Host]
_ GrpcException
e = GrpcException -> IO ()
forall e a. Exception e => e -> IO a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO GrpcException
e
createLease :: IO Int64
createLease = Text -> IO Int64 -> IO Int64
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"createLease" (IO Int64 -> IO Int64) -> IO Int64 -> IO Int64
forall a b. (a -> b) -> a -> b
$ do
Proto LeaseGrantResponse
leaseResponse <-
Connection
-> ClientHandler'
'NonStreaming (ReaderT Connection IO) (Protobuf Lease "leaseGrant")
-> Input (Protobuf Lease "leaseGrant")
-> IO (Output (Protobuf Lease "leaseGrant"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf Lease "leaseGrant")) (Input (Protobuf Lease "leaseGrant")
-> IO (Output (Protobuf Lease "leaseGrant")))
-> Input (Protobuf Lease "leaseGrant")
-> IO (Output (Protobuf Lease "leaseGrant"))
forall a b. (a -> b) -> a -> b
$
Proto LeaseGrantRequest
forall msg. Message msg => msg
defMessage Proto LeaseGrantRequest
-> (Proto LeaseGrantRequest -> Proto LeaseGrantRequest)
-> Proto LeaseGrantRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto LeaseGrantRequest) (Proto LeaseGrantRequest) Int64 Int64
#ttl ASetter
(Proto LeaseGrantRequest) (Proto LeaseGrantRequest) Int64 Int64
-> Int64 -> Proto LeaseGrantRequest -> Proto LeaseGrantRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
3
Int64 -> IO Int64
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int64 -> IO Int64) -> Int64 -> IO Int64
forall a b. (a -> b) -> a -> b
$ Proto LeaseGrantResponse
leaseResponse Proto LeaseGrantResponse
-> Getting Int64 (Proto LeaseGrantResponse) Int64 -> Int64
forall s a. s -> Getting a s a -> a
^. Getting Int64 (Proto LeaseGrantResponse) Int64
#id
withKeepAlive :: Int64 -> (IO DiffTime -> IO Any) -> IO ()
withKeepAlive Int64
leaseId IO DiffTime -> IO Any
action = do
Connection
-> ClientHandler'
'BiDiStreaming
(ReaderT Connection IO)
(Protobuf Lease "leaseKeepAlive")
-> ((NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
-> IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
-> IO ())
-> IO ()
forall {k} (rpc :: k) (m :: * -> *) r.
MonadIO m =>
Connection
-> ClientHandler' 'BiDiStreaming (ReaderT Connection m) rpc
-> ((NextElem (Input rpc) -> m ())
-> m (NextElem (Output rpc)) -> m r)
-> m r
biDiStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf Lease "leaseKeepAlive")) (((NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
-> IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
-> IO ())
-> IO ())
-> ((NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
-> IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
-> IO ())
-> IO ()
forall a b. (a -> b) -> a -> b
$ \NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ()
send IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
recv -> do
IO Any -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Any -> IO ())
-> (IO DiffTime -> IO Any) -> IO DiffTime -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO DiffTime -> IO Any
action (IO DiffTime -> IO ()) -> IO DiffTime -> IO ()
forall a b. (a -> b) -> a -> b
$ do
NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ()
send (NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
-> NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ()
forall a b. (a -> b) -> a -> b
$ Input (Protobuf Lease "leaseKeepAlive")
-> NextElem (Input (Protobuf Lease "leaseKeepAlive"))
forall a. a -> NextElem a
NextElem (Input (Protobuf Lease "leaseKeepAlive")
-> NextElem (Input (Protobuf Lease "leaseKeepAlive")))
-> Input (Protobuf Lease "leaseKeepAlive")
-> NextElem (Input (Protobuf Lease "leaseKeepAlive"))
forall a b. (a -> b) -> a -> b
$ Proto LeaseKeepAliveRequest
forall msg. Message msg => msg
defMessage Proto LeaseKeepAliveRequest
-> (Proto LeaseKeepAliveRequest -> Proto LeaseKeepAliveRequest)
-> Proto LeaseKeepAliveRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto LeaseKeepAliveRequest)
(Proto LeaseKeepAliveRequest)
Int64
Int64
#id ASetter
(Proto LeaseKeepAliveRequest)
(Proto LeaseKeepAliveRequest)
Int64
Int64
-> Int64
-> Proto LeaseKeepAliveRequest
-> Proto LeaseKeepAliveRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
leaseId
IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
IO (NextElem (Proto LeaseKeepAliveResponse))
recv IO (NextElem (Proto LeaseKeepAliveResponse))
-> (NextElem (Proto LeaseKeepAliveResponse) -> IO DiffTime)
-> IO DiffTime
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
NextElem Proto LeaseKeepAliveResponse
res -> DiffTime -> IO DiffTime
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime -> IO DiffTime)
-> (Int64 -> DiffTime) -> Int64 -> IO DiffTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int64 -> DiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> IO DiffTime) -> Int64 -> IO DiffTime
forall a b. (a -> b) -> a -> b
$ Proto LeaseKeepAliveResponse
res Proto LeaseKeepAliveResponse
-> Getting Int64 (Proto LeaseKeepAliveResponse) Int64 -> Int64
forall s a. s -> Getting a s a -> a
^. Getting Int64 (Proto LeaseKeepAliveResponse) Int64
#ttl
NextElem (Proto LeaseKeepAliveResponse)
NoNextElem -> do
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer EtcdLog
NoKeepAliveResponse
DiffTime -> IO DiffTime
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DiffTime
0
writeAlive :: Int64 -> IO ()
writeAlive Int64
leaseId = Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"writeAlive" (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IO (Proto PutResponse) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Proto PutResponse) -> IO ())
-> (Proto PutRequest -> IO (Proto PutResponse))
-> Proto PutRequest
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection
-> ClientHandler'
'NonStreaming (ReaderT Connection IO) (Protobuf KV "put")
-> Input (Protobuf KV "put")
-> IO (Output (Protobuf KV "put"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "put")) (Proto PutRequest -> IO ()) -> Proto PutRequest -> IO ()
forall a b. (a -> b) -> a -> b
$
Proto PutRequest
forall msg. Message msg => msg
defMessage
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#key ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"alive-" ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Host -> ByteString
forall b a. (Show a, IsString b) => a -> b
show Host
advertise
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#value ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Host -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' Host
advertise
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) Int64 Int64
#lease ASetter (Proto PutRequest) (Proto PutRequest) Int64 Int64
-> Int64 -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
leaseId
getAlive :: IO [Host]
getAlive = Text -> IO [Host] -> IO [Host]
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"getAlive" (IO [Host] -> IO [Host]) -> IO [Host] -> IO [Host]
forall a b. (a -> b) -> a -> b
$ do
Proto RangeResponse
res <-
Connection
-> ClientHandler'
'NonStreaming (ReaderT Connection IO) (Protobuf KV "range")
-> Input (Protobuf KV "range")
-> IO (Output (Protobuf KV "range"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "range")) (Input (Protobuf KV "range") -> IO (Output (Protobuf KV "range")))
-> Input (Protobuf KV "range") -> IO (Output (Protobuf KV "range"))
forall a b. (a -> b) -> a -> b
$
Proto RangeRequest
forall msg. Message msg => msg
defMessage
Proto RangeRequest
-> (Proto RangeRequest -> Proto RangeRequest) -> Proto RangeRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
#key ASetter
(Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
-> ByteString -> Proto RangeRequest -> Proto RangeRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"alive"
Proto RangeRequest
-> (Proto RangeRequest -> Proto RangeRequest) -> Proto RangeRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
#rangeEnd ASetter
(Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
-> ByteString -> Proto RangeRequest -> Proto RangeRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"alivf"
((Proto KeyValue -> IO (Maybe Host))
-> [Proto KeyValue] -> IO [Host])
-> [Proto KeyValue]
-> (Proto KeyValue -> IO (Maybe Host))
-> IO [Host]
forall a b c. (a -> b -> c) -> b -> a -> c
flip (Proto KeyValue -> IO (Maybe Host))
-> [Proto KeyValue] -> IO [Host]
forall (m :: * -> *) a b.
Monad m =>
(a -> m (Maybe b)) -> [a] -> m [b]
mapMaybeM (Proto RangeResponse
res Proto RangeResponse
-> ((Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> [Proto KeyValue]
forall s a. s -> Getting (Endo [a]) s a -> [a]
^.. ([Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
#kvs (([Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> ((Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> (Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto KeyValue -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse) ((Proto KeyValue -> IO (Maybe Host)) -> IO [Host])
-> (Proto KeyValue -> IO (Maybe Host)) -> IO [Host]
forall a b. (a -> b) -> a -> b
$ \Proto KeyValue
kv -> do
let value :: ByteString
value = Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#value
case ByteString -> Either DecoderError Host
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' ByteString
value of
Left DecoderError
err -> do
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith
Tracer IO EtcdLog
tracer
FailedToDecodeValue
{ $sel:key:EtcdLog :: Text
key = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#key
, $sel:value:EtcdLog :: Text
value = ByteString -> Text
encodeBase16 ByteString
value
, $sel:reason:EtcdLog :: Text
reason = DecoderError -> Text
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
}
Maybe Host -> IO (Maybe Host)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Host
forall a. Maybe a
Nothing
Right Host
x -> Maybe Host -> IO (Maybe Host)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe Host -> IO (Maybe Host)) -> Maybe Host -> IO (Maybe Host)
forall a b. (a -> b) -> a -> b
$ Host -> Maybe Host
forall a. a -> Maybe a
Just Host
x
withGrpcContext :: MonadCatch m => Text -> m a -> m a
withGrpcContext :: forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
context m a
action =
m a
action m a -> (GrpcException -> m a) -> m a
forall e a. Exception e => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \e :: GrpcException
e@GrpcException{Maybe Text
grpcErrorMessage :: GrpcException -> Maybe Text
grpcErrorMessage :: Maybe Text
grpcErrorMessage} ->
GrpcException -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO
GrpcException
e
{ grpcErrorMessage =
case grpcErrorMessage of
Maybe Text
Nothing -> Text -> Maybe Text
forall a. a -> Maybe a
Just Text
context
Just Text
msg -> Text -> Maybe Text
forall a. a -> Maybe a
Just (Text -> Maybe Text) -> Text -> Maybe Text
forall a b. (a -> b) -> a -> b
$ Text
context Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
": " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
msg
}
withProcessInterrupt ::
(MonadIO m, MonadThrow m) =>
ProcessConfig stdin stdout stderr ->
(Process stdin stdout stderr -> m a) ->
m a
withProcessInterrupt :: forall (m :: * -> *) stdin stdout stderr a.
(MonadIO m, MonadThrow m) =>
ProcessConfig stdin stdout stderr
-> (Process stdin stdout stderr -> m a) -> m a
withProcessInterrupt ProcessConfig stdin stdout stderr
config =
m (Process stdin stdout stderr)
-> (Process stdin stdout stderr -> m ())
-> (Process stdin stdout stderr -> m a)
-> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (ProcessConfig stdin stdout stderr
-> m (Process stdin stdout stderr)
forall (m :: * -> *) stdin stdout stderr.
MonadIO m =>
ProcessConfig stdin stdout stderr
-> m (Process stdin stdout stderr)
startProcess (ProcessConfig stdin stdout stderr
-> m (Process stdin stdout stderr))
-> ProcessConfig stdin stdout stderr
-> m (Process stdin stdout stderr)
forall a b. (a -> b) -> a -> b
$ ProcessConfig stdin stdout stderr
config ProcessConfig stdin stdout stderr
-> (ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr)
-> ProcessConfig stdin stdout stderr
forall a b. a -> (a -> b) -> b
& Bool
-> ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr
forall stdin stdout stderr.
Bool
-> ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr
setCreateGroup Bool
True) Process stdin stdout stderr -> m ()
forall {m :: * -> *} {stdin} {stdout} {stderr}.
MonadIO m =>
Process stdin stdout stderr -> m ()
signalAndStopProcess
where
signalAndStopProcess :: Process stdin stdout stderr -> m ()
signalAndStopProcess Process stdin stdout stderr
p = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
ProcessHandle -> IO ()
interruptProcessGroupOf (Process stdin stdout stderr -> ProcessHandle
forall stdin stdout stderr.
Process stdin stdout stderr -> ProcessHandle
unsafeProcessHandle Process stdin stdout stderr
p)
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_
(IO ExitCode -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ExitCode -> IO ()) -> IO ExitCode -> IO ()
forall a b. (a -> b) -> a -> b
$ Process stdin stdout stderr -> IO ExitCode
forall (m :: * -> *) stdin stdout stderr.
MonadIO m =>
Process stdin stdout stderr -> m ExitCode
waitExitCode Process stdin stdout stderr
p)
(DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
5 IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Process stdin stdout stderr -> IO ()
forall {m :: * -> *} {stdin} {stdout} {stderr}.
MonadIO m =>
Process stdin stdout stderr -> m ()
stopProcess Process stdin stdout stderr
p)
data PersistentQueue m a = PersistentQueue
{ forall (m :: * -> *) a.
PersistentQueue m a -> TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
, forall (m :: * -> *) a. PersistentQueue m a -> TVar m Natural
nextIx :: TVar m Natural
, forall (m :: * -> *) a. PersistentQueue m a -> [Char]
directory :: FilePath
}
newPersistentQueue ::
(MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) =>
FilePath ->
Natural ->
m (PersistentQueue m a)
newPersistentQueue :: forall (m :: * -> *) a.
(MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) =>
[Char] -> Natural -> m (PersistentQueue m a)
newPersistentQueue [Char]
path Natural
capacity = do
TBQueue m (Natural, a)
queue <- Natural -> m (TBQueue m (Natural, a))
forall a. Natural -> m (TBQueue m a)
forall (m :: * -> *) a. MonadSTM m => Natural -> m (TBQueue m a)
newTBQueueIO Natural
capacity
Natural
highestId <-
m Natural -> m (Either IOException Natural)
forall e a. Exception e => m a -> m (Either e a)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (TBQueue m (Natural, a) -> m Natural
loadExisting TBQueue m (Natural, a)
queue) m (Either IOException Natural)
-> (Either IOException Natural -> m Natural) -> m Natural
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left (IOException
_ :: IOException) -> do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Bool -> [Char] -> IO ()
createDirectoryIfMissing Bool
True [Char]
path
Natural -> m Natural
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
0
Right Natural
highest -> Natural -> m Natural
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
highest
TVar m Natural
nextIx <- Natural -> m (TVar m Natural)
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO (Natural -> m (TVar m Natural)) -> Natural -> m (TVar m Natural)
forall a b. (a -> b) -> a -> b
$ Natural
highestId Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
+ Natural
1
PersistentQueue m a -> m (PersistentQueue m a)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure PersistentQueue{TBQueue m (Natural, a)
$sel:queue:PersistentQueue :: TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
queue, TVar m Natural
$sel:nextIx:PersistentQueue :: TVar m Natural
nextIx :: TVar m Natural
nextIx, $sel:directory:PersistentQueue :: [Char]
directory = [Char]
path}
where
loadExisting :: TBQueue m (Natural, a) -> m Natural
loadExisting TBQueue m (Natural, a)
queue = do
[[Char]]
paths <- IO [[Char]] -> m [[Char]]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [[Char]] -> m [[Char]]) -> IO [[Char]] -> m [[Char]]
forall a b. (a -> b) -> a -> b
$ [Char] -> IO [[Char]]
listDirectory [Char]
path
case [Natural] -> [Natural]
forall a. Ord a => [a] -> [a]
sort ([Natural] -> [Natural]) -> [Natural] -> [Natural]
forall a b. (a -> b) -> a -> b
$ ([Char] -> Maybe Natural) -> [[Char]] -> [Natural]
forall a b. (a -> Maybe b) -> [a] -> [b]
mapMaybe [Char] -> Maybe Natural
forall a. Read a => [Char] -> Maybe a
readMaybe [[Char]]
paths of
[] -> Natural -> m Natural
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
0
[Natural]
idxs -> do
[Natural] -> (Natural -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Natural]
idxs ((Natural -> m ()) -> m ()) -> (Natural -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \(Natural
idx :: Natural) -> do
ByteString
bs <- [Char] -> m ByteString
forall (m :: * -> *). MonadIO m => [Char] -> m ByteString
readFileBS ([Char]
path [Char] -> [Char] -> [Char]
</> Natural -> [Char]
forall b a. (Show a, IsString b) => a -> b
show Natural
idx)
case ByteString -> Either DecoderError a
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' ByteString
bs of
Left DecoderError
err ->
[Char] -> m ()
forall a. [Char] -> m a
forall (m :: * -> *) a. MonadFail m => [Char] -> m a
fail ([Char] -> m ()) -> [Char] -> m ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Failed to decode item: " [Char] -> [Char] -> [Char]
forall a. Semigroup a => a -> a -> a
<> DecoderError -> [Char]
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
Right a
item ->
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TBQueue m (Natural, a) -> (Natural, a) -> STM m ()
forall a. TBQueue m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> a -> STM m ()
writeTBQueue TBQueue m (Natural, a)
queue (Natural
idx, a
item)
Natural -> m Natural
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Natural -> m Natural) -> Natural -> m Natural
forall a b. (a -> b) -> a -> b
$ [Natural] -> Natural
forall a. HasCallStack => [a] -> a
List.last [Natural]
idxs
writePersistentQueue :: (ToCBOR a, MonadSTM m, MonadIO m) => PersistentQueue m a -> a -> m ()
writePersistentQueue :: forall a (m :: * -> *).
(ToCBOR a, MonadSTM m, MonadIO m) =>
PersistentQueue m a -> a -> m ()
writePersistentQueue PersistentQueue{TBQueue m (Natural, a)
$sel:queue:PersistentQueue :: forall (m :: * -> *) a.
PersistentQueue m a -> TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
queue, TVar m Natural
$sel:nextIx:PersistentQueue :: forall (m :: * -> *) a. PersistentQueue m a -> TVar m Natural
nextIx :: TVar m Natural
nextIx, [Char]
$sel:directory:PersistentQueue :: forall (m :: * -> *) a. PersistentQueue m a -> [Char]
directory :: [Char]
directory} a
item = do
Natural
next <- STM m Natural -> m Natural
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Natural -> m Natural) -> STM m Natural -> m Natural
forall a b. (a -> b) -> a -> b
$ do
Natural
next <- TVar m Natural -> STM m Natural
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m Natural
nextIx
TVar m Natural -> (Natural -> Natural) -> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m Natural
nextIx (Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
+ Natural
1)
Natural -> STM m Natural
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
next
[Char] -> ByteString -> m ()
forall (m :: * -> *). MonadIO m => [Char] -> ByteString -> m ()
writeFileBS ([Char]
directory [Char] -> [Char] -> [Char]
</> Natural -> [Char]
forall b a. (Show a, IsString b) => a -> b
show Natural
next) (ByteString -> m ()) -> ByteString -> m ()
forall a b. (a -> b) -> a -> b
$ a -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' a
item
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TBQueue m (Natural, a) -> (Natural, a) -> STM m ()
forall a. TBQueue m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> a -> STM m ()
writeTBQueue TBQueue m (Natural, a)
queue (Natural
next, a
item)
peekPersistentQueue :: MonadSTM m => PersistentQueue m a -> m a
peekPersistentQueue :: forall (m :: * -> *) a. MonadSTM m => PersistentQueue m a -> m a
peekPersistentQueue PersistentQueue{TBQueue m (Natural, a)
$sel:queue:PersistentQueue :: forall (m :: * -> *) a.
PersistentQueue m a -> TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
queue} = do
(Natural, a) -> a
forall a b. (a, b) -> b
snd ((Natural, a) -> a) -> m (Natural, a) -> m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM m (Natural, a) -> m (Natural, a)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (TBQueue m (Natural, a) -> STM m (Natural, a)
forall a. TBQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> STM m a
peekTBQueue TBQueue m (Natural, a)
queue)
popPersistentQueue :: (MonadSTM m, MonadIO m, Eq a) => PersistentQueue m a -> a -> m ()
popPersistentQueue :: forall (m :: * -> *) a.
(MonadSTM m, MonadIO m, Eq a) =>
PersistentQueue m a -> a -> m ()
popPersistentQueue PersistentQueue{TBQueue m (Natural, a)
$sel:queue:PersistentQueue :: forall (m :: * -> *) a.
PersistentQueue m a -> TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
queue, [Char]
$sel:directory:PersistentQueue :: forall (m :: * -> *) a. PersistentQueue m a -> [Char]
directory :: [Char]
directory} a
item = do
Maybe Natural
popped <- STM m (Maybe Natural) -> m (Maybe Natural)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Maybe Natural) -> m (Maybe Natural))
-> STM m (Maybe Natural) -> m (Maybe Natural)
forall a b. (a -> b) -> a -> b
$ do
(Natural
ix, a
next) <- TBQueue m (Natural, a) -> STM m (Natural, a)
forall a. TBQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> STM m a
peekTBQueue TBQueue m (Natural, a)
queue
if a
next a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
item
then TBQueue m (Natural, a) -> STM m (Natural, a)
forall a. TBQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> STM m a
readTBQueue TBQueue m (Natural, a)
queue STM m (Natural, a) -> Maybe Natural -> STM m (Maybe Natural)
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Natural -> Maybe Natural
forall a. a -> Maybe a
Just Natural
ix
else Maybe Natural -> STM m (Maybe Natural)
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Natural
forall a. Maybe a
Nothing
case Maybe Natural
popped of
Maybe Natural
Nothing -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just Natural
index -> do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> ([Char] -> IO ()) -> [Char] -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Char] -> IO ()
removeFile ([Char] -> m ()) -> [Char] -> m ()
forall a b. (a -> b) -> a -> b
$ [Char]
directory [Char] -> [Char] -> [Char]
</> Natural -> [Char]
forall b a. (Show a, IsString b) => a -> b
show Natural
index
data EtcdLog
= EtcdLog {EtcdLog -> Value
etcd :: Value}
| Reconnecting
| BroadcastFailed {EtcdLog -> Text
reason :: Text}
| FailedToDecodeLog {EtcdLog -> Text
log :: Text, reason :: Text}
| FailedToDecodeValue {EtcdLog -> Text
key :: Text, EtcdLog -> Text
value :: Text, reason :: Text}
| CreatedLease {EtcdLog -> Int64
leaseId :: Int64}
| LowLeaseTTL {EtcdLog -> DiffTime
ttlRemaining :: DiffTime}
| NoKeepAliveResponse
| MatchingProtocolVersion {EtcdLog -> ProtocolVersion
version :: ProtocolVersion}
deriving stock (EtcdLog -> EtcdLog -> Bool
(EtcdLog -> EtcdLog -> Bool)
-> (EtcdLog -> EtcdLog -> Bool) -> Eq EtcdLog
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: EtcdLog -> EtcdLog -> Bool
== :: EtcdLog -> EtcdLog -> Bool
$c/= :: EtcdLog -> EtcdLog -> Bool
/= :: EtcdLog -> EtcdLog -> Bool
Eq, Int -> EtcdLog -> [Char] -> [Char]
[EtcdLog] -> [Char] -> [Char]
EtcdLog -> [Char]
(Int -> EtcdLog -> [Char] -> [Char])
-> (EtcdLog -> [Char])
-> ([EtcdLog] -> [Char] -> [Char])
-> Show EtcdLog
forall a.
(Int -> a -> [Char] -> [Char])
-> (a -> [Char]) -> ([a] -> [Char] -> [Char]) -> Show a
$cshowsPrec :: Int -> EtcdLog -> [Char] -> [Char]
showsPrec :: Int -> EtcdLog -> [Char] -> [Char]
$cshow :: EtcdLog -> [Char]
show :: EtcdLog -> [Char]
$cshowList :: [EtcdLog] -> [Char] -> [Char]
showList :: [EtcdLog] -> [Char] -> [Char]
Show, (forall x. EtcdLog -> Rep EtcdLog x)
-> (forall x. Rep EtcdLog x -> EtcdLog) -> Generic EtcdLog
forall x. Rep EtcdLog x -> EtcdLog
forall x. EtcdLog -> Rep EtcdLog x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. EtcdLog -> Rep EtcdLog x
from :: forall x. EtcdLog -> Rep EtcdLog x
$cto :: forall x. Rep EtcdLog x -> EtcdLog
to :: forall x. Rep EtcdLog x -> EtcdLog
Generic)
deriving anyclass ([EtcdLog] -> Value
[EtcdLog] -> Encoding
EtcdLog -> Bool
EtcdLog -> Value
EtcdLog -> Encoding
(EtcdLog -> Value)
-> (EtcdLog -> Encoding)
-> ([EtcdLog] -> Value)
-> ([EtcdLog] -> Encoding)
-> (EtcdLog -> Bool)
-> ToJSON EtcdLog
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: EtcdLog -> Value
toJSON :: EtcdLog -> Value
$ctoEncoding :: EtcdLog -> Encoding
toEncoding :: EtcdLog -> Encoding
$ctoJSONList :: [EtcdLog] -> Value
toJSONList :: [EtcdLog] -> Value
$ctoEncodingList :: [EtcdLog] -> Encoding
toEncodingList :: [EtcdLog] -> Encoding
$comitField :: EtcdLog -> Bool
omitField :: EtcdLog -> Bool
ToJSON)
instance Arbitrary EtcdLog where
arbitrary :: Gen EtcdLog
arbitrary = Gen EtcdLog
forall a.
(Generic a, GA UnsizedOpts (Rep a),
UniformWeight (Weights_ (Rep a))) =>
Gen a
genericArbitrary
shrink :: EtcdLog -> [EtcdLog]
shrink = EtcdLog -> [EtcdLog]
forall a.
(Generic a, RecursivelyShrink (Rep a), GSubterms (Rep a) a) =>
a -> [a]
genericShrink