{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -Wno-deferred-out-of-scope-variables #-}
module Hydra.Network.Etcd where
import Hydra.Prelude
import Cardano.Binary (decodeFull', serialize')
import Cardano.Crypto.Hash (SHA256, hashToStringAsHex, hashWithSerialiser)
import Control.Concurrent.Class.MonadSTM (
modifyTVar',
newTBQueueIO,
newTVarIO,
peekTBQueue,
readTBQueue,
swapTVar,
writeTBQueue,
writeTVar,
)
import Control.Exception (IOException)
import Control.Lens ((^.), (^..), (^?))
import Data.Aeson (decodeFileStrict', encodeFile)
import Data.Aeson qualified as Aeson
import Data.Aeson.Lens qualified as Aeson
import Data.Aeson.Types (Value)
import Data.Bits ((.|.))
import Data.ByteString qualified as BS
import Data.ByteString.Char8 qualified as BS8
import Data.List ((\\))
import Data.List qualified as List
import Data.Map qualified as Map
import Data.Text qualified as T
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (
Connectivity (..),
Host (..),
Network (..),
NetworkCallback (..),
NetworkComponent,
NetworkConfiguration (..),
ProtocolVersion,
WhichEtcd (..),
)
import Hydra.Node.EmbedTH (embedExecutable)
import Network.GRPC.Client (
Address (..),
ConnParams (..),
Connection,
ReconnectPolicy (..),
ReconnectTo (ReconnectToOriginal),
Server (..),
rpc,
withConnection,
)
import Network.GRPC.Client.StreamType.IO (biDiStreaming, nonStreaming)
import Network.GRPC.Common (GrpcError (..), GrpcException (..), HTTP2Settings (..), NextElem (..), def, defaultHTTP2Settings)
import Network.GRPC.Common.NextElem (whileNext_)
import Network.GRPC.Common.Protobuf (Proto (..), Protobuf, defMessage, (.~))
import Network.GRPC.Etcd (
Compare'CompareResult (..),
Compare'CompareTarget (..),
KV,
Lease,
Watch,
)
import System.Directory (createDirectoryIfMissing, listDirectory, removeFile)
import System.Environment.Blank (getEnvironment)
import System.FilePath (takeDirectory, (</>))
import System.IO.Error (isDoesNotExistError)
import System.Posix (ownerExecuteMode, ownerReadMode, ownerWriteMode, setFileMode)
import System.Process (interruptProcessGroupOf)
import System.Process.Typed (
Process,
ProcessConfig,
createPipe,
getStderr,
proc,
setCreateGroup,
setEnv,
setStderr,
startProcess,
stopProcess,
unsafeProcessHandle,
waitExitCode,
)
import UnliftIO (readTVarIO)
withEtcdNetwork ::
(ToCBOR msg, FromCBOR msg, Eq msg) =>
Tracer IO EtcdLog ->
ProtocolVersion ->
NetworkConfiguration ->
NetworkComponent IO msg msg ()
withEtcdNetwork :: forall msg.
(ToCBOR msg, FromCBOR msg, Eq msg) =>
Tracer IO EtcdLog
-> ProtocolVersion
-> NetworkConfiguration
-> NetworkComponent IO msg msg ()
withEtcdNetwork Tracer IO EtcdLog
tracer ProtocolVersion
protocolVersion NetworkConfiguration
config NetworkCallback msg IO
callback Network IO msg -> IO ()
action = do
FilePath
etcdBinPath <- FilePath -> WhichEtcd -> IO FilePath
getEtcdBinary FilePath
persistenceDir WhichEtcd
whichEtcd
Map FilePath FilePath
envVars <- [(FilePath, FilePath)] -> Map FilePath FilePath
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([(FilePath, FilePath)] -> Map FilePath FilePath)
-> IO [(FilePath, FilePath)] -> IO (Map FilePath FilePath)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO [(FilePath, FilePath)]
getEnvironment
ProcessConfig () () Handle
-> (Process () () Handle -> IO ()) -> IO ()
forall (m :: * -> *) stdin stdout stderr a.
(MonadIO m, MonadThrow m) =>
ProcessConfig stdin stdout stderr
-> (Process stdin stdout stderr -> m a) -> m a
withProcessInterrupt (FilePath -> Map FilePath FilePath -> ProcessConfig () () Handle
etcdCmd FilePath
etcdBinPath Map FilePath FilePath
envVars) ((Process () () Handle -> IO ()) -> IO ())
-> (Process () () Handle -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Process () () Handle
p -> do
IO Any -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Process () () Handle -> IO ExitCode
forall (m :: * -> *) stdin stdout stderr.
MonadIO m =>
Process stdin stdout stderr -> m ExitCode
waitExitCode Process () () Handle
p IO ExitCode -> (ExitCode -> IO Any) -> IO Any
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ExitCode
ec -> FilePath -> IO Any
forall a. FilePath -> IO a
forall (m :: * -> *) a. MonadFail m => FilePath -> m a
fail (FilePath -> IO Any) -> FilePath -> IO Any
forall a b. (a -> b) -> a -> b
$ FilePath
"Sub-process etcd exited with: " FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> ExitCode -> FilePath
forall b a. (Show a, IsString b) => a -> b
show ExitCode
ec) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IO Any -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Process () () Handle -> NetworkCallback msg IO -> IO Any
traceStderr Process () () Handle
p NetworkCallback msg IO
callback) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TVar Bool
doneVar <- Bool -> IO (TVar IO Bool)
forall a. a -> IO (TVar IO a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO Bool
False
ConnParams -> Server -> (Connection -> IO ()) -> IO ()
forall a. ConnParams -> Server -> (Connection -> IO a) -> IO a
withConnection (TVar Bool -> ConnParams
connParams TVar Bool
doneVar) Server
grpcServer ((Connection -> IO ()) -> IO ()) -> (Connection -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> do
IO () -> (Async IO () -> IO ()) -> IO ()
forall a b. IO a -> (Async IO a -> IO b) -> IO b
forall (m :: * -> *) a b.
MonadAsync m =>
m a -> (Async m a -> m b) -> m b
withAsync (Tracer IO EtcdLog
-> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
forall msg.
Tracer IO EtcdLog
-> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
checkVersion Tracer IO EtcdLog
tracer Connection
conn ProtocolVersion
protocolVersion NetworkCallback msg IO
callback) ((Async IO () -> IO ()) -> IO ())
-> (Async IO () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async IO ()
_ -> do
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Tracer IO EtcdLog
-> Connection -> Host -> NetworkCallback msg IO -> IO ()
forall msg.
Tracer IO EtcdLog
-> Connection -> Host -> NetworkCallback msg IO -> IO ()
pollConnectivity Tracer IO EtcdLog
tracer Connection
conn Host
advertise NetworkCallback msg IO
callback) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Tracer IO EtcdLog
-> Connection -> FilePath -> NetworkCallback msg IO -> IO ()
forall msg.
FromCBOR msg =>
Tracer IO EtcdLog
-> Connection -> FilePath -> NetworkCallback msg IO -> IO ()
waitMessages Tracer IO EtcdLog
tracer Connection
conn FilePath
persistenceDir NetworkCallback msg IO
callback) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
PersistentQueue IO msg
queue <- FilePath -> Natural -> IO (PersistentQueue IO msg)
forall (m :: * -> *) a.
(MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) =>
FilePath -> Natural -> m (PersistentQueue m a)
newPersistentQueue (FilePath
persistenceDir FilePath -> FilePath -> FilePath
</> FilePath
"pending-broadcast") Natural
100
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_ (Tracer IO EtcdLog
-> Connection -> Host -> PersistentQueue IO msg -> IO ()
forall msg.
(ToCBOR msg, Eq msg) =>
Tracer IO EtcdLog
-> Connection -> Host -> PersistentQueue IO msg -> IO ()
broadcastMessages Tracer IO EtcdLog
tracer Connection
conn Host
advertise PersistentQueue IO msg
queue) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Network IO msg -> IO ()
action
Network
{ $sel:broadcast:Network :: msg -> IO ()
broadcast = PersistentQueue IO msg -> msg -> IO ()
forall a (m :: * -> *).
(ToCBOR a, MonadSTM m, MonadIO m) =>
PersistentQueue m a -> a -> m ()
writePersistentQueue PersistentQueue IO msg
queue
}
STM IO () -> IO ()
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (TVar IO Bool -> Bool -> STM IO ()
forall a. TVar IO a -> a -> STM IO ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar Bool
TVar IO Bool
doneVar Bool
True)
where
connParams :: TVar Bool -> ConnParams
connParams TVar Bool
doneVar =
ConnParams
forall a. Default a => a
def
{ connReconnectPolicy = reconnectPolicy doneVar
,
connHTTP2Settings = defaultHTTP2Settings{http2OverridePingRateLimit = Just maxBound}
}
reconnectPolicy :: TVar Bool -> ReconnectPolicy
reconnectPolicy TVar Bool
doneVar = ReconnectTo -> IO ReconnectPolicy -> ReconnectPolicy
ReconnectAfter ReconnectTo
ReconnectToOriginal (IO ReconnectPolicy -> ReconnectPolicy)
-> IO ReconnectPolicy -> ReconnectPolicy
forall a b. (a -> b) -> a -> b
$ do
Bool
done <- TVar Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TVar Bool
doneVar
if Bool
done
then ReconnectPolicy -> IO ReconnectPolicy
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ReconnectPolicy
DontReconnect
else do
DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer EtcdLog
Reconnecting
ReconnectPolicy -> IO ReconnectPolicy
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReconnectPolicy -> IO ReconnectPolicy)
-> ReconnectPolicy -> IO ReconnectPolicy
forall a b. (a -> b) -> a -> b
$ TVar Bool -> ReconnectPolicy
reconnectPolicy TVar Bool
doneVar
clientHost :: Host
clientHost = Host{$sel:hostname:Host :: Text
hostname = Text
"127.0.0.1", $sel:port:Host :: PortNumber
port = PortNumber
clientPort}
grpcServer :: Server
grpcServer =
Address -> Server
ServerInsecure (Address -> Server) -> Address -> Server
forall a b. (a -> b) -> a -> b
$
Address
{ addressHost :: FilePath
addressHost = Text -> FilePath
forall a. ToString a => a -> FilePath
toString (Text -> FilePath) -> Text -> FilePath
forall a b. (a -> b) -> a -> b
$ Host -> Text
hostname Host
clientHost
, addressPort :: PortNumber
addressPort = Host -> PortNumber
port Host
clientHost
, addressAuthority :: Maybe FilePath
addressAuthority = Maybe FilePath
forall a. Maybe a
Nothing
}
clientPort :: PortNumber
clientPort = PortNumber
2379 PortNumber -> PortNumber -> PortNumber
forall a. Num a => a -> a -> a
+ Host -> PortNumber
port Host
listen PortNumber -> PortNumber -> PortNumber
forall a. Num a => a -> a -> a
- PortNumber
5001
traceStderr :: Process () () Handle -> NetworkCallback msg IO -> IO Any
traceStderr Process () () Handle
p NetworkCallback{Connectivity -> IO ()
onConnectivity :: Connectivity -> IO ()
$sel:onConnectivity:NetworkCallback :: forall msg (m :: * -> *).
NetworkCallback msg m -> Connectivity -> m ()
onConnectivity} =
IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> IO () -> IO Any
forall a b. (a -> b) -> a -> b
$ do
ByteString
bs <- Handle -> IO ByteString
BS.hGetLine (Process () () Handle -> Handle
forall stdin stdout stderr. Process stdin stdout stderr -> stderr
getStderr Process () () Handle
p)
case ByteString -> Either FilePath Value
forall a. FromJSON a => ByteString -> Either FilePath a
Aeson.eitherDecodeStrict ByteString
bs of
Left FilePath
err -> Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer FailedToDecodeLog{$sel:log:EtcdLog :: Text
log = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 ByteString
bs, $sel:reason:EtcdLog :: Text
reason = FilePath -> Text
forall b a. (Show a, IsString b) => a -> b
show FilePath
err}
Right Value
v -> do
let expectedClusterMismatch :: Maybe (Value, Value)
expectedClusterMismatch = do
Value
level' <- ByteString
bs ByteString -> Getting (First Value) ByteString Value -> Maybe Value
forall s a. s -> Getting (First a) s a -> Maybe a
^? Key -> Traversal' ByteString Value
forall t. AsValue t => Key -> Traversal' t Value
Aeson.key Key
"level" Getting (First Value) ByteString Value
-> ((Value -> Const (First Value) Value)
-> Value -> Const (First Value) Value)
-> Getting (First Value) ByteString Value
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Value -> Const (First Value) Value)
-> Value -> Const (First Value) Value
Prism' Value Value
Aeson.nonNull
Value
msg' <- ByteString
bs ByteString -> Getting (First Value) ByteString Value -> Maybe Value
forall s a. s -> Getting (First a) s a -> Maybe a
^? Key -> Traversal' ByteString Value
forall t. AsValue t => Key -> Traversal' t Value
Aeson.key Key
"msg" Getting (First Value) ByteString Value
-> ((Value -> Const (First Value) Value)
-> Value -> Const (First Value) Value)
-> Getting (First Value) ByteString Value
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Value -> Const (First Value) Value)
-> Value -> Const (First Value) Value
Prism' Value Value
Aeson.nonNull
(Value, Value) -> Maybe (Value, Value)
forall a. a -> Maybe a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Value
level', Value
msg')
case Maybe (Value, Value)
expectedClusterMismatch of
Just (Aeson.String Text
"error", Aeson.String Text
"request sent was ignored due to cluster ID mismatch") ->
Connectivity -> IO ()
onConnectivity ClusterIDMismatch{$sel:clusterPeers:PeerConnected :: Text
clusterPeers = FilePath -> Text
T.pack FilePath
clusterPeers}
Maybe (Value, Value)
_ -> Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$ EtcdLog{etcd :: Value
etcd = Value
v}
etcdCmd :: FilePath -> Map FilePath FilePath -> ProcessConfig () () Handle
etcdCmd FilePath
etcdBinPath Map FilePath FilePath
envVars =
[(FilePath, FilePath)]
-> ProcessConfig () () Handle -> ProcessConfig () () Handle
forall stdin stdout stderr.
[(FilePath, FilePath)]
-> ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr
setEnv (Map FilePath FilePath -> [(FilePath, FilePath)]
forall k a. Map k a -> [(k, a)]
Map.toList (Map FilePath FilePath -> [(FilePath, FilePath)])
-> Map FilePath FilePath -> [(FilePath, FilePath)]
forall a b. (a -> b) -> a -> b
$ Map FilePath FilePath
envVars Map FilePath FilePath
-> Map FilePath FilePath -> Map FilePath FilePath
forall a. Semigroup a => a -> a -> a
<> Map FilePath FilePath
defaultEnv)
(ProcessConfig () () Handle -> ProcessConfig () () Handle)
-> ([FilePath] -> ProcessConfig () () Handle)
-> [FilePath]
-> ProcessConfig () () Handle
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Bool -> ProcessConfig () () Handle -> ProcessConfig () () Handle
forall stdin stdout stderr.
Bool
-> ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr
setCreateGroup Bool
True
(ProcessConfig () () Handle -> ProcessConfig () () Handle)
-> ([FilePath] -> ProcessConfig () () Handle)
-> [FilePath]
-> ProcessConfig () () Handle
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamSpec 'STOutput Handle
-> ProcessConfig () () () -> ProcessConfig () () Handle
forall stderr stdin stdout stderr0.
StreamSpec 'STOutput stderr
-> ProcessConfig stdin stdout stderr0
-> ProcessConfig stdin stdout stderr
setStderr StreamSpec 'STOutput Handle
forall (anyStreamType :: StreamType).
StreamSpec anyStreamType Handle
createPipe
(ProcessConfig () () () -> ProcessConfig () () Handle)
-> ([FilePath] -> ProcessConfig () () ())
-> [FilePath]
-> ProcessConfig () () Handle
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePath -> [FilePath] -> ProcessConfig () () ()
proc FilePath
etcdBinPath
([FilePath] -> ProcessConfig () () Handle)
-> [FilePath] -> ProcessConfig () () Handle
forall a b. (a -> b) -> a -> b
$ [[FilePath]] -> [FilePath]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
[
[FilePath
"--name", Host -> FilePath
forall b a. (Show a, IsString b) => a -> b
show Host
advertise]
, [FilePath
"--data-dir", FilePath
persistenceDir FilePath -> FilePath -> FilePath
</> FilePath
"etcd" FilePath -> FilePath -> FilePath
</> Hash SHA256 ByteString -> FilePath
forall h a. Hash h a -> FilePath
hashToStringAsHex (forall h a. HashAlgorithm h => (a -> Encoding) -> a -> Hash h a
hashWithSerialiser @SHA256 ByteString -> Encoding
forall a. ToCBOR a => a -> Encoding
toCBOR (ByteString -> Hash SHA256 ByteString)
-> ByteString -> Hash SHA256 ByteString
forall a b. (a -> b) -> a -> b
$ FilePath -> ByteString
BS8.pack FilePath
clusterPeers)]
, [FilePath
"--listen-peer-urls", Host -> FilePath
httpUrl Host
listen]
, [FilePath
"--initial-advertise-peer-urls", Host -> FilePath
httpUrl Host
advertise]
, [FilePath
"--listen-client-urls", Host -> FilePath
httpUrl Host
clientHost]
,
[FilePath
"--listen-client-http-urls", FilePath
"http://localhost:0"]
,
[FilePath
"--advertise-client-urls", Host -> FilePath
httpUrl Host
clientHost]
,
[FilePath
"--initial-cluster-token", FilePath
"hydra-network-1"]
, [FilePath
"--initial-cluster", FilePath
clusterPeers]
]
defaultEnv :: Map.Map String String
defaultEnv :: Map FilePath FilePath
defaultEnv =
[(FilePath, FilePath)] -> Map FilePath FilePath
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList
[ (FilePath
"ETCD_AUTO_COMPACTION_MODE", FilePath
"revision")
, (FilePath
"ETCD_AUTO_COMPACTION_RETENTION", FilePath
"1000")
]
clusterPeers :: FilePath
clusterPeers =
FilePath -> [FilePath] -> FilePath
forall a. [a] -> [[a]] -> [a]
intercalate FilePath
","
([FilePath] -> FilePath)
-> ([Host] -> [FilePath]) -> [Host] -> FilePath
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Host -> FilePath) -> [Host] -> [FilePath]
forall a b. (a -> b) -> [a] -> [b]
map (\Host
h -> Host -> FilePath
forall b a. (Show a, IsString b) => a -> b
show Host
h FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> FilePath
"=" FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> Host -> FilePath
httpUrl Host
h)
([Host] -> FilePath) -> [Host] -> FilePath
forall a b. (a -> b) -> a -> b
$ (Host
advertise Host -> [Host] -> [Host]
forall a. a -> [a] -> [a]
: [Host]
peers)
httpUrl :: Host -> FilePath
httpUrl (Host Text
h PortNumber
p) = FilePath
"http://" FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> Text -> FilePath
forall a. ToString a => a -> FilePath
toString Text
h FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> FilePath
":" FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> PortNumber -> FilePath
forall b a. (Show a, IsString b) => a -> b
show PortNumber
p
NetworkConfiguration{FilePath
persistenceDir :: FilePath
$sel:persistenceDir:NetworkConfiguration :: NetworkConfiguration -> FilePath
persistenceDir, Host
listen :: Host
$sel:listen:NetworkConfiguration :: NetworkConfiguration -> Host
listen, Host
advertise :: Host
$sel:advertise:NetworkConfiguration :: NetworkConfiguration -> Host
advertise, [Host]
peers :: [Host]
$sel:peers:NetworkConfiguration :: NetworkConfiguration -> [Host]
peers, WhichEtcd
whichEtcd :: WhichEtcd
$sel:whichEtcd:NetworkConfiguration :: NetworkConfiguration -> WhichEtcd
whichEtcd} = NetworkConfiguration
config
getEtcdBinary :: FilePath -> WhichEtcd -> IO FilePath
getEtcdBinary :: FilePath -> WhichEtcd -> IO FilePath
getEtcdBinary FilePath
_ WhichEtcd
SystemEtcd = FilePath -> IO FilePath
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure FilePath
"etcd"
getEtcdBinary FilePath
persistenceDir WhichEtcd
EmbeddedEtcd =
let path :: FilePath
path = FilePath
persistenceDir FilePath -> FilePath -> FilePath
</> FilePath
"bin" FilePath -> FilePath -> FilePath
</> FilePath
"etcd"
in FilePath -> IO ()
installEtcd FilePath
path IO () -> IO FilePath -> IO FilePath
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> FilePath -> IO FilePath
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure FilePath
path
installEtcd :: FilePath -> IO ()
installEtcd :: FilePath -> IO ()
installEtcd FilePath
fp = do
Bool -> FilePath -> IO ()
createDirectoryIfMissing Bool
True (FilePath -> FilePath
takeDirectory FilePath
fp)
FilePath -> ByteString -> IO ()
BS.writeFile FilePath
fp $(embedExecutable "etcd")
FilePath -> FileMode -> IO ()
setFileMode FilePath
fp (FileMode
ownerReadMode FileMode -> FileMode -> FileMode
forall a. Bits a => a -> a -> a
.|. FileMode
ownerWriteMode FileMode -> FileMode -> FileMode
forall a. Bits a => a -> a -> a
.|. FileMode
ownerExecuteMode)
checkVersion ::
Tracer IO EtcdLog ->
Connection ->
ProtocolVersion ->
NetworkCallback msg IO ->
IO ()
checkVersion :: forall msg.
Tracer IO EtcdLog
-> Connection -> ProtocolVersion -> NetworkCallback msg IO -> IO ()
checkVersion Tracer IO EtcdLog
tracer Connection
conn ProtocolVersion
ourVersion NetworkCallback{Connectivity -> IO ()
$sel:onConnectivity:NetworkCallback :: forall msg (m :: * -> *).
NetworkCallback msg m -> Connectivity -> m ()
onConnectivity :: Connectivity -> IO ()
onConnectivity} = do
Proto TxnResponse
res <-
Connection
-> ClientHandler'
'NonStreaming (ReaderT Connection IO) (Protobuf KV "txn")
-> Input (Protobuf KV "txn")
-> IO (Output (Protobuf KV "txn"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "txn")) (Input (Protobuf KV "txn") -> IO (Output (Protobuf KV "txn")))
-> Input (Protobuf KV "txn") -> IO (Output (Protobuf KV "txn"))
forall a b. (a -> b) -> a -> b
$
Proto TxnRequest
forall msg. Message msg => msg
defMessage
Proto TxnRequest
-> (Proto TxnRequest -> Proto TxnRequest) -> Proto TxnRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto TxnRequest)
(Proto TxnRequest)
[Proto Compare]
[Proto Compare]
#compare ASetter
(Proto TxnRequest)
(Proto TxnRequest)
[Proto Compare]
[Proto Compare]
-> [Proto Compare] -> Proto TxnRequest -> Proto TxnRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Proto Compare
versionExists]
Proto TxnRequest
-> (Proto TxnRequest -> Proto TxnRequest) -> Proto TxnRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto TxnRequest)
(Proto TxnRequest)
[Proto RequestOp]
[Proto RequestOp]
#success ASetter
(Proto TxnRequest)
(Proto TxnRequest)
[Proto RequestOp]
[Proto RequestOp]
-> [Proto RequestOp] -> Proto TxnRequest -> Proto TxnRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Proto RequestOp
getVersion]
Proto TxnRequest
-> (Proto TxnRequest -> Proto TxnRequest) -> Proto TxnRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto TxnRequest)
(Proto TxnRequest)
[Proto RequestOp]
[Proto RequestOp]
#failure ASetter
(Proto TxnRequest)
(Proto TxnRequest)
[Proto RequestOp]
[Proto RequestOp]
-> [Proto RequestOp] -> Proto TxnRequest -> Proto TxnRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ [Proto RequestOp
putVersion]
if Proto TxnResponse
res Proto TxnResponse -> Getting Bool (Proto TxnResponse) Bool -> Bool
forall s a. s -> Getting a s a -> a
^. Getting Bool (Proto TxnResponse) Bool
#succeeded
then [Proto KeyValue] -> (Proto KeyValue -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Proto TxnResponse
res Proto TxnResponse
-> Getting
(Endo [Proto KeyValue]) (Proto TxnResponse) (Proto KeyValue)
-> [Proto KeyValue]
forall s a. s -> Getting (Endo [a]) s a -> [a]
^.. ([Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> Proto TxnResponse
-> Const (Endo [Proto KeyValue]) (Proto TxnResponse)
#responses (([Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> Proto TxnResponse
-> Const (Endo [Proto KeyValue]) (Proto TxnResponse))
-> ((Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> Getting
(Endo [Proto KeyValue]) (Proto TxnResponse) (Proto KeyValue)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> [Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse ((Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> [Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp])
-> ((Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> (Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto ResponseOp]
-> Const (Endo [Proto KeyValue]) [Proto ResponseOp]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp)
#responseRange ((Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp))
-> ((Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> (Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto ResponseOp
-> Const (Endo [Proto KeyValue]) (Proto ResponseOp)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
#kvs (([Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> ((Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> (Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto KeyValue -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse) ((Proto KeyValue -> IO ()) -> IO ())
-> (Proto KeyValue -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Proto KeyValue
kv ->
case ByteString -> Either DecoderError ProtocolVersion
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' (ByteString -> Either DecoderError ProtocolVersion)
-> ByteString -> Either DecoderError ProtocolVersion
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#value of
Left DecoderError
err -> do
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$
FailedToDecodeValue
{ $sel:key:EtcdLog :: Text
key = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#key
, $sel:value:EtcdLog :: Text
value = ByteString -> Text
encodeBase16 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#value
, $sel:reason:EtcdLog :: Text
reason = DecoderError -> Text
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
}
Connectivity -> IO ()
onConnectivity VersionMismatch{ProtocolVersion
ourVersion :: ProtocolVersion
$sel:ourVersion:PeerConnected :: ProtocolVersion
ourVersion, $sel:theirVersion:PeerConnected :: Maybe ProtocolVersion
theirVersion = Maybe ProtocolVersion
forall a. Maybe a
Nothing}
Right ProtocolVersion
theirVersion ->
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ProtocolVersion
theirVersion ProtocolVersion -> ProtocolVersion -> Bool
forall a. Eq a => a -> a -> Bool
== ProtocolVersion
ourVersion) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
Connectivity -> IO ()
onConnectivity VersionMismatch{ProtocolVersion
ourVersion :: ProtocolVersion
$sel:ourVersion:PeerConnected :: ProtocolVersion
ourVersion, $sel:theirVersion:PeerConnected :: Maybe ProtocolVersion
theirVersion = ProtocolVersion -> Maybe ProtocolVersion
forall a. a -> Maybe a
Just ProtocolVersion
theirVersion}
else
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$ MatchingProtocolVersion{version :: ProtocolVersion
version = ProtocolVersion
ourVersion}
where
versionKey :: ByteString
versionKey = ByteString
"version"
versionExists :: Proto Compare
versionExists =
Proto Compare
forall msg. Message msg => msg
defMessage
Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter
(Proto Compare)
(Proto Compare)
(Proto Compare'CompareResult)
(Proto Compare'CompareResult)
#result ASetter
(Proto Compare)
(Proto Compare)
(Proto Compare'CompareResult)
(Proto Compare'CompareResult)
-> Proto Compare'CompareResult -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Compare'CompareResult -> Proto Compare'CompareResult
forall msg. msg -> Proto msg
Proto Compare'CompareResult
Compare'GREATER
Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter
(Proto Compare)
(Proto Compare)
(Proto Compare'CompareTarget)
(Proto Compare'CompareTarget)
#target ASetter
(Proto Compare)
(Proto Compare)
(Proto Compare'CompareTarget)
(Proto Compare'CompareTarget)
-> Proto Compare'CompareTarget -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Compare'CompareTarget -> Proto Compare'CompareTarget
forall msg. msg -> Proto msg
Proto Compare'CompareTarget
Compare'VERSION
Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter (Proto Compare) (Proto Compare) ByteString ByteString
#key ASetter (Proto Compare) (Proto Compare) ByteString ByteString
-> ByteString -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
versionKey
Proto Compare -> (Proto Compare -> Proto Compare) -> Proto Compare
forall a b. a -> (a -> b) -> b
& ASetter (Proto Compare) (Proto Compare) Int64 Int64
#version ASetter (Proto Compare) (Proto Compare) Int64 Int64
-> Int64 -> Proto Compare -> Proto Compare
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
0
getVersion :: Proto RequestOp
getVersion =
Proto RequestOp
forall msg. Message msg => msg
defMessage Proto RequestOp
-> (Proto RequestOp -> Proto RequestOp) -> Proto RequestOp
forall a b. a -> (a -> b) -> b
& ASetter
(Proto RequestOp)
(Proto RequestOp)
(Proto RangeRequest)
(Proto RangeRequest)
#requestRange ASetter
(Proto RequestOp)
(Proto RequestOp)
(Proto RangeRequest)
(Proto RangeRequest)
-> Proto RangeRequest -> Proto RequestOp -> Proto RequestOp
forall s t a b. ASetter s t a b -> b -> s -> t
.~ (Proto RangeRequest
forall msg. Message msg => msg
defMessage Proto RangeRequest
-> (Proto RangeRequest -> Proto RangeRequest) -> Proto RangeRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
#key ASetter
(Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
-> ByteString -> Proto RangeRequest -> Proto RangeRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
versionKey)
putVersion :: Proto RequestOp
putVersion =
Proto RequestOp
forall msg. Message msg => msg
defMessage
Proto RequestOp
-> (Proto RequestOp -> Proto RequestOp) -> Proto RequestOp
forall a b. a -> (a -> b) -> b
& ASetter
(Proto RequestOp)
(Proto RequestOp)
(Proto PutRequest)
(Proto PutRequest)
#requestPut
ASetter
(Proto RequestOp)
(Proto RequestOp)
(Proto PutRequest)
(Proto PutRequest)
-> Proto PutRequest -> Proto RequestOp -> Proto RequestOp
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ( Proto PutRequest
forall msg. Message msg => msg
defMessage
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#key ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
versionKey
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#value ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ProtocolVersion -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' ProtocolVersion
ourVersion
)
broadcastMessages ::
(ToCBOR msg, Eq msg) =>
Tracer IO EtcdLog ->
Connection ->
Host ->
PersistentQueue IO msg ->
IO ()
broadcastMessages :: forall msg.
(ToCBOR msg, Eq msg) =>
Tracer IO EtcdLog
-> Connection -> Host -> PersistentQueue IO msg -> IO ()
broadcastMessages Tracer IO EtcdLog
tracer Connection
conn Host
ourHost PersistentQueue IO msg
queue =
Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"broadcastMessages" (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
msg
msg <- PersistentQueue IO msg -> IO msg
forall (m :: * -> *) a. MonadSTM m => PersistentQueue m a -> m a
peekPersistentQueue PersistentQueue IO msg
queue
(Connection -> Host -> msg -> IO ()
forall msg. ToCBOR msg => Connection -> Host -> msg -> IO ()
putMessage Connection
conn Host
ourHost msg
msg IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> PersistentQueue IO msg -> msg -> IO ()
forall (m :: * -> *) a.
(MonadSTM m, MonadIO m, Eq a) =>
PersistentQueue m a -> a -> m ()
popPersistentQueue PersistentQueue IO msg
queue msg
msg)
IO () -> (GrpcException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \case
GrpcException{GrpcError
grpcError :: GrpcError
grpcError :: GrpcException -> GrpcError
grpcError, Maybe Text
grpcErrorMessage :: Maybe Text
grpcErrorMessage :: GrpcException -> Maybe Text
grpcErrorMessage}
| GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcUnavailable Bool -> Bool -> Bool
|| GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcDeadlineExceeded -> do
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer (EtcdLog -> IO ()) -> EtcdLog -> IO ()
forall a b. (a -> b) -> a -> b
$ BroadcastFailed{$sel:reason:EtcdLog :: Text
reason = Text -> Maybe Text -> Text
forall a. a -> Maybe a -> a
fromMaybe Text
"unknown" Maybe Text
grpcErrorMessage}
DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
GrpcException
e -> GrpcException -> IO ()
forall e a. Exception e => e -> IO a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO GrpcException
e
putMessage ::
ToCBOR msg =>
Connection ->
Host ->
msg ->
IO ()
putMessage :: forall msg. ToCBOR msg => Connection -> Host -> msg -> IO ()
putMessage Connection
conn Host
ourHost msg
msg =
IO (Proto PutResponse) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Proto PutResponse) -> IO ())
-> IO (Proto PutResponse) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> ClientHandler'
'NonStreaming (ReaderT Connection IO) (Protobuf KV "put")
-> Input (Protobuf KV "put")
-> IO (Output (Protobuf KV "put"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "put")) Input (Protobuf KV "put")
Proto PutRequest
req
where
req :: Proto PutRequest
req =
Proto PutRequest
forall msg. Message msg => msg
defMessage
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#key ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
key
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#value ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ msg -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' msg
msg
key :: ByteString
key = forall a b. ConvertUtf8 a b => a -> b
encodeUtf8 @Text (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ Text
"msg-" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Host -> Text
forall b a. (Show a, IsString b) => a -> b
show Host
ourHost
waitMessages ::
FromCBOR msg =>
Tracer IO EtcdLog ->
Connection ->
FilePath ->
NetworkCallback msg IO ->
IO ()
waitMessages :: forall msg.
FromCBOR msg =>
Tracer IO EtcdLog
-> Connection -> FilePath -> NetworkCallback msg IO -> IO ()
waitMessages Tracer IO EtcdLog
tracer Connection
conn FilePath
directory NetworkCallback{msg -> IO ()
deliver :: msg -> IO ()
$sel:deliver:NetworkCallback :: forall msg (m :: * -> *). NetworkCallback msg m -> msg -> m ()
deliver} = do
Natural
revision <- FilePath -> IO Natural
forall (m :: * -> *). MonadIO m => FilePath -> m Natural
getLastKnownRevision FilePath
directory
Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"waitMessages" (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Connection
-> ClientHandler'
'BiDiStreaming (ReaderT Connection IO) (Protobuf Watch "watch")
-> ((NextElem (Input (Protobuf Watch "watch")) -> IO ())
-> IO (NextElem (Output (Protobuf Watch "watch"))) -> IO ())
-> IO ()
forall {k} (rpc :: k) (m :: * -> *) r.
MonadIO m =>
Connection
-> ClientHandler' 'BiDiStreaming (ReaderT Connection m) rpc
-> ((NextElem (Input rpc) -> m ())
-> m (NextElem (Output rpc)) -> m r)
-> m r
biDiStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf Watch "watch")) (((NextElem (Input (Protobuf Watch "watch")) -> IO ())
-> IO (NextElem (Output (Protobuf Watch "watch"))) -> IO ())
-> IO ())
-> ((NextElem (Input (Protobuf Watch "watch")) -> IO ())
-> IO (NextElem (Output (Protobuf Watch "watch"))) -> IO ())
-> IO ()
forall a b. (a -> b) -> a -> b
$ \NextElem (Input (Protobuf Watch "watch")) -> IO ()
send IO (NextElem (Output (Protobuf Watch "watch")))
recv -> do
let watchRequest :: Proto WatchCreateRequest
watchRequest =
Proto WatchCreateRequest
forall msg. Message msg => msg
defMessage
Proto WatchCreateRequest
-> (Proto WatchCreateRequest -> Proto WatchCreateRequest)
-> Proto WatchCreateRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto WatchCreateRequest)
(Proto WatchCreateRequest)
ByteString
ByteString
#key ASetter
(Proto WatchCreateRequest)
(Proto WatchCreateRequest)
ByteString
ByteString
-> ByteString
-> Proto WatchCreateRequest
-> Proto WatchCreateRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"msg"
Proto WatchCreateRequest
-> (Proto WatchCreateRequest -> Proto WatchCreateRequest)
-> Proto WatchCreateRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto WatchCreateRequest)
(Proto WatchCreateRequest)
ByteString
ByteString
#rangeEnd ASetter
(Proto WatchCreateRequest)
(Proto WatchCreateRequest)
ByteString
ByteString
-> ByteString
-> Proto WatchCreateRequest
-> Proto WatchCreateRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"msh"
Proto WatchCreateRequest
-> (Proto WatchCreateRequest -> Proto WatchCreateRequest)
-> Proto WatchCreateRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto WatchCreateRequest) (Proto WatchCreateRequest) Int64 Int64
#startRevision ASetter
(Proto WatchCreateRequest) (Proto WatchCreateRequest) Int64 Int64
-> Int64 -> Proto WatchCreateRequest -> Proto WatchCreateRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Natural -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Natural
revision Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
+ Natural
1)
NextElem (Input (Protobuf Watch "watch")) -> IO ()
NextElem (Proto WatchRequest) -> IO ()
send (NextElem (Proto WatchRequest) -> IO ())
-> (Proto WatchRequest -> NextElem (Proto WatchRequest))
-> Proto WatchRequest
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Proto WatchRequest -> NextElem (Proto WatchRequest)
forall a. a -> NextElem a
NextElem (Proto WatchRequest -> IO ()) -> Proto WatchRequest -> IO ()
forall a b. (a -> b) -> a -> b
$ Proto WatchRequest
forall msg. Message msg => msg
defMessage Proto WatchRequest
-> (Proto WatchRequest -> Proto WatchRequest) -> Proto WatchRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto WatchRequest)
(Proto WatchRequest)
(Proto WatchCreateRequest)
(Proto WatchCreateRequest)
#createRequest ASetter
(Proto WatchRequest)
(Proto WatchRequest)
(Proto WatchCreateRequest)
(Proto WatchCreateRequest)
-> Proto WatchCreateRequest
-> Proto WatchRequest
-> Proto WatchRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Proto WatchCreateRequest
watchRequest
IO (NextElem (Proto WatchResponse))
-> (Proto WatchResponse -> IO ()) -> IO ()
forall (m :: * -> *) a.
Monad m =>
m (NextElem a) -> (a -> m ()) -> m ()
whileNext_ IO (NextElem (Output (Protobuf Watch "watch")))
IO (NextElem (Proto WatchResponse))
recv Proto WatchResponse -> IO ()
process
DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
where
process :: Proto WatchResponse -> IO ()
process Proto WatchResponse
res = do
let revision :: Natural
revision = Int64 -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Natural) -> Int64 -> Natural
forall a b. (a -> b) -> a -> b
$ Proto WatchResponse
res Proto WatchResponse
-> Getting Int64 (Proto WatchResponse) Int64 -> Int64
forall s a. s -> Getting a s a -> a
^. (Proto ResponseHeader -> Const Int64 (Proto ResponseHeader))
-> Proto WatchResponse -> Const Int64 (Proto WatchResponse)
#header ((Proto ResponseHeader -> Const Int64 (Proto ResponseHeader))
-> Proto WatchResponse -> Const Int64 (Proto WatchResponse))
-> ((Int64 -> Const Int64 Int64)
-> Proto ResponseHeader -> Const Int64 (Proto ResponseHeader))
-> Getting Int64 (Proto WatchResponse) Int64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int64 -> Const Int64 Int64)
-> Proto ResponseHeader -> Const Int64 (Proto ResponseHeader)
#revision
FilePath -> Natural -> IO ()
forall (m :: * -> *). MonadIO m => FilePath -> Natural -> m ()
putLastKnownRevision FilePath
directory Natural
revision
[Proto Event] -> (Proto Event -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Proto WatchResponse
res Proto WatchResponse
-> Getting [Proto Event] (Proto WatchResponse) [Proto Event]
-> [Proto Event]
forall s a. s -> Getting a s a -> a
^. Getting [Proto Event] (Proto WatchResponse) [Proto Event]
#events) ((Proto Event -> IO ()) -> IO ())
-> (Proto Event -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Proto Event
event -> do
let value :: ByteString
value = Proto Event
event Proto Event
-> Getting ByteString (Proto Event) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. (Proto KeyValue -> Const ByteString (Proto KeyValue))
-> Proto Event -> Const ByteString (Proto Event)
#kv ((Proto KeyValue -> Const ByteString (Proto KeyValue))
-> Proto Event -> Const ByteString (Proto Event))
-> Getting ByteString (Proto KeyValue) ByteString
-> Getting ByteString (Proto Event) ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting ByteString (Proto KeyValue) ByteString
#value
case ByteString -> Either DecoderError msg
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' ByteString
value of
Left DecoderError
err ->
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith
Tracer IO EtcdLog
tracer
FailedToDecodeValue
{ $sel:key:EtcdLog :: Text
key = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto Event
event Proto Event
-> Getting ByteString (Proto Event) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. (Proto KeyValue -> Const ByteString (Proto KeyValue))
-> Proto Event -> Const ByteString (Proto Event)
#kv ((Proto KeyValue -> Const ByteString (Proto KeyValue))
-> Proto Event -> Const ByteString (Proto Event))
-> Getting ByteString (Proto KeyValue) ByteString
-> Getting ByteString (Proto Event) ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Getting ByteString (Proto KeyValue) ByteString
#key
, $sel:value:EtcdLog :: Text
value = ByteString -> Text
encodeBase16 ByteString
value
, $sel:reason:EtcdLog :: Text
reason = DecoderError -> Text
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
}
Right msg
msg -> msg -> IO ()
deliver msg
msg
getLastKnownRevision :: MonadIO m => FilePath -> m Natural
getLastKnownRevision :: forall (m :: * -> *). MonadIO m => FilePath -> m Natural
getLastKnownRevision FilePath
directory = do
IO Natural -> m Natural
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Natural -> m Natural) -> IO Natural -> m Natural
forall a b. (a -> b) -> a -> b
$
IO (Maybe Natural) -> IO (Either IOException (Maybe Natural))
forall e a. Exception e => IO a -> IO (Either e a)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (FilePath -> IO (Maybe Natural)
forall a. FromJSON a => FilePath -> IO (Maybe a)
decodeFileStrict' (FilePath -> IO (Maybe Natural)) -> FilePath -> IO (Maybe Natural)
forall a b. (a -> b) -> a -> b
$ FilePath
directory FilePath -> FilePath -> FilePath
</> FilePath
"last-known-revision") IO (Either IOException (Maybe Natural))
-> (Either IOException (Maybe Natural) -> IO Natural) -> IO Natural
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right Maybe Natural
rev -> do
Natural -> IO Natural
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Natural -> IO Natural) -> Natural -> IO Natural
forall a b. (a -> b) -> a -> b
$ Natural -> Maybe Natural -> Natural
forall a. a -> Maybe a -> a
fromMaybe Natural
1 Maybe Natural
rev
Left (IOException
e :: IOException)
| IOException -> Bool
isDoesNotExistError IOException
e -> Natural -> IO Natural
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
1
| Bool
otherwise -> do
FilePath -> IO Natural
forall a. FilePath -> IO a
forall (m :: * -> *) a. MonadFail m => FilePath -> m a
fail (FilePath -> IO Natural) -> FilePath -> IO Natural
forall a b. (a -> b) -> a -> b
$ FilePath
"Failed to load last known revision: " FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> IOException -> FilePath
forall b a. (Show a, IsString b) => a -> b
show IOException
e
putLastKnownRevision :: MonadIO m => FilePath -> Natural -> m ()
putLastKnownRevision :: forall (m :: * -> *). MonadIO m => FilePath -> Natural -> m ()
putLastKnownRevision FilePath
directory Natural
rev = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ FilePath -> Natural -> IO ()
forall a. ToJSON a => FilePath -> a -> IO ()
encodeFile (FilePath
directory FilePath -> FilePath -> FilePath
</> FilePath
"last-known-revision") Natural
rev
pollConnectivity ::
Tracer IO EtcdLog ->
Connection ->
Host ->
NetworkCallback msg IO ->
IO ()
pollConnectivity :: forall msg.
Tracer IO EtcdLog
-> Connection -> Host -> NetworkCallback msg IO -> IO ()
pollConnectivity Tracer IO EtcdLog
tracer Connection
conn Host
advertise NetworkCallback{Connectivity -> IO ()
$sel:onConnectivity:NetworkCallback :: forall msg (m :: * -> *).
NetworkCallback msg m -> Connectivity -> m ()
onConnectivity :: Connectivity -> IO ()
onConnectivity} = do
TVar [Host]
seenAliveVar <- [Host] -> IO (TVar IO [Host])
forall a. a -> IO (TVar IO a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO []
Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"pollConnectivity" (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (GrpcException -> IO ()) -> IO () -> IO ()
forall e a. Exception e => (e -> IO a) -> IO a -> IO a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> m a) -> m a -> m a
handle (TVar [Host] -> GrpcException -> IO ()
onGrpcException TVar [Host]
seenAliveVar) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Int64
leaseId <- IO Int64
createLease
Connectivity -> IO ()
onConnectivity Connectivity
NetworkConnected
Int64 -> IO ()
writeAlive Int64
leaseId
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer CreatedLease{Int64
leaseId :: Int64
$sel:leaseId:EtcdLog :: Int64
leaseId}
Int64 -> (IO DiffTime -> IO Any) -> IO ()
withKeepAlive Int64
leaseId ((IO DiffTime -> IO Any) -> IO ())
-> (IO DiffTime -> IO Any) -> IO ()
forall a b. (a -> b) -> a -> b
$ \IO DiffTime
keepAlive ->
IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> IO () -> IO Any
forall a b. (a -> b) -> a -> b
$ do
DiffTime
ttlRemaining <- IO DiffTime
keepAlive
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (DiffTime
ttlRemaining DiffTime -> DiffTime -> Bool
forall a. Ord a => a -> a -> Bool
< DiffTime
1) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer LowLeaseTTL{DiffTime
ttlRemaining :: DiffTime
$sel:ttlRemaining:EtcdLog :: DiffTime
ttlRemaining}
[Host]
alive <- IO [Host]
getAlive
let othersAlive :: [Host]
othersAlive = [Host]
alive [Host] -> [Host] -> [Host]
forall a. Eq a => [a] -> [a] -> [a]
\\ [Host
advertise]
[Host]
seenAlive <- STM IO [Host] -> IO [Host]
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO [Host] -> IO [Host]) -> STM IO [Host] -> IO [Host]
forall a b. (a -> b) -> a -> b
$ TVar IO [Host] -> [Host] -> STM IO [Host]
forall a. TVar IO a -> a -> STM IO a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m a
swapTVar TVar [Host]
TVar IO [Host]
seenAliveVar [Host]
othersAlive
[Host] -> (Host -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Host]
othersAlive [Host] -> [Host] -> [Host]
forall a. Eq a => [a] -> [a] -> [a]
\\ [Host]
seenAlive) ((Host -> IO ()) -> IO ()) -> (Host -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connectivity -> IO ()
onConnectivity (Connectivity -> IO ()) -> (Host -> Connectivity) -> Host -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Host -> Connectivity
PeerConnected
[Host] -> (Host -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Host]
seenAlive [Host] -> [Host] -> [Host]
forall a. Eq a => [a] -> [a] -> [a]
\\ [Host]
othersAlive) ((Host -> IO ()) -> IO ()) -> (Host -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connectivity -> IO ()
onConnectivity (Connectivity -> IO ()) -> (Host -> Connectivity) -> Host -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Host -> Connectivity
PeerDisconnected
DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay (DiffTime
ttlRemaining DiffTime -> DiffTime -> DiffTime
forall a. Fractional a => a -> a -> a
/ DiffTime
2)
where
onGrpcException :: TVar [Host] -> GrpcException -> IO ()
onGrpcException TVar [Host]
seenAliveVar GrpcException{GrpcError
grpcError :: GrpcException -> GrpcError
grpcError :: GrpcError
grpcError}
| GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcUnavailable Bool -> Bool -> Bool
|| GrpcError
grpcError GrpcError -> GrpcError -> Bool
forall a. Eq a => a -> a -> Bool
== GrpcError
GrpcDeadlineExceeded = do
Connectivity -> IO ()
onConnectivity Connectivity
NetworkDisconnected
STM IO () -> IO ()
forall a. HasCallStack => STM IO a -> IO a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO () -> IO ()) -> STM IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar IO [Host] -> [Host] -> STM IO ()
forall a. TVar IO a -> a -> STM IO ()
forall (m :: * -> *) a. MonadSTM m => TVar m a -> a -> STM m ()
writeTVar TVar [Host]
TVar IO [Host]
seenAliveVar []
DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
onGrpcException TVar [Host]
_ GrpcException
e = GrpcException -> IO ()
forall e a. Exception e => e -> IO a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO GrpcException
e
createLease :: IO Int64
createLease = Text -> IO Int64 -> IO Int64
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"createLease" (IO Int64 -> IO Int64) -> IO Int64 -> IO Int64
forall a b. (a -> b) -> a -> b
$ do
Proto LeaseGrantResponse
leaseResponse <-
Connection
-> ClientHandler'
'NonStreaming (ReaderT Connection IO) (Protobuf Lease "leaseGrant")
-> Input (Protobuf Lease "leaseGrant")
-> IO (Output (Protobuf Lease "leaseGrant"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf Lease "leaseGrant")) (Input (Protobuf Lease "leaseGrant")
-> IO (Output (Protobuf Lease "leaseGrant")))
-> Input (Protobuf Lease "leaseGrant")
-> IO (Output (Protobuf Lease "leaseGrant"))
forall a b. (a -> b) -> a -> b
$
Proto LeaseGrantRequest
forall msg. Message msg => msg
defMessage Proto LeaseGrantRequest
-> (Proto LeaseGrantRequest -> Proto LeaseGrantRequest)
-> Proto LeaseGrantRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto LeaseGrantRequest) (Proto LeaseGrantRequest) Int64 Int64
#ttl ASetter
(Proto LeaseGrantRequest) (Proto LeaseGrantRequest) Int64 Int64
-> Int64 -> Proto LeaseGrantRequest -> Proto LeaseGrantRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
3
Int64 -> IO Int64
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int64 -> IO Int64) -> Int64 -> IO Int64
forall a b. (a -> b) -> a -> b
$ Proto LeaseGrantResponse
leaseResponse Proto LeaseGrantResponse
-> Getting Int64 (Proto LeaseGrantResponse) Int64 -> Int64
forall s a. s -> Getting a s a -> a
^. Getting Int64 (Proto LeaseGrantResponse) Int64
#id
withKeepAlive :: Int64 -> (IO DiffTime -> IO Any) -> IO ()
withKeepAlive Int64
leaseId IO DiffTime -> IO Any
action = do
Connection
-> ClientHandler'
'BiDiStreaming
(ReaderT Connection IO)
(Protobuf Lease "leaseKeepAlive")
-> ((NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
-> IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
-> IO ())
-> IO ()
forall {k} (rpc :: k) (m :: * -> *) r.
MonadIO m =>
Connection
-> ClientHandler' 'BiDiStreaming (ReaderT Connection m) rpc
-> ((NextElem (Input rpc) -> m ())
-> m (NextElem (Output rpc)) -> m r)
-> m r
biDiStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf Lease "leaseKeepAlive")) (((NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
-> IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
-> IO ())
-> IO ())
-> ((NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
-> IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
-> IO ())
-> IO ()
forall a b. (a -> b) -> a -> b
$ \NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ()
send IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
recv -> do
IO Any -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Any -> IO ())
-> (IO DiffTime -> IO Any) -> IO DiffTime -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO DiffTime -> IO Any
action (IO DiffTime -> IO ()) -> IO DiffTime -> IO ()
forall a b. (a -> b) -> a -> b
$ do
NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ()
send (NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ())
-> NextElem (Input (Protobuf Lease "leaseKeepAlive")) -> IO ()
forall a b. (a -> b) -> a -> b
$ Input (Protobuf Lease "leaseKeepAlive")
-> NextElem (Input (Protobuf Lease "leaseKeepAlive"))
forall a. a -> NextElem a
NextElem (Input (Protobuf Lease "leaseKeepAlive")
-> NextElem (Input (Protobuf Lease "leaseKeepAlive")))
-> Input (Protobuf Lease "leaseKeepAlive")
-> NextElem (Input (Protobuf Lease "leaseKeepAlive"))
forall a b. (a -> b) -> a -> b
$ Proto LeaseKeepAliveRequest
forall msg. Message msg => msg
defMessage Proto LeaseKeepAliveRequest
-> (Proto LeaseKeepAliveRequest -> Proto LeaseKeepAliveRequest)
-> Proto LeaseKeepAliveRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto LeaseKeepAliveRequest)
(Proto LeaseKeepAliveRequest)
Int64
Int64
#id ASetter
(Proto LeaseKeepAliveRequest)
(Proto LeaseKeepAliveRequest)
Int64
Int64
-> Int64
-> Proto LeaseKeepAliveRequest
-> Proto LeaseKeepAliveRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
leaseId
IO (NextElem (Output (Protobuf Lease "leaseKeepAlive")))
IO (NextElem (Proto LeaseKeepAliveResponse))
recv IO (NextElem (Proto LeaseKeepAliveResponse))
-> (NextElem (Proto LeaseKeepAliveResponse) -> IO DiffTime)
-> IO DiffTime
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
NextElem Proto LeaseKeepAliveResponse
res -> DiffTime -> IO DiffTime
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DiffTime -> IO DiffTime)
-> (Int64 -> DiffTime) -> Int64 -> IO DiffTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int64 -> DiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> IO DiffTime) -> Int64 -> IO DiffTime
forall a b. (a -> b) -> a -> b
$ Proto LeaseKeepAliveResponse
res Proto LeaseKeepAliveResponse
-> Getting Int64 (Proto LeaseKeepAliveResponse) Int64 -> Int64
forall s a. s -> Getting a s a -> a
^. Getting Int64 (Proto LeaseKeepAliveResponse) Int64
#ttl
NextElem (Proto LeaseKeepAliveResponse)
NoNextElem -> do
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO EtcdLog
tracer EtcdLog
NoKeepAliveResponse
DiffTime -> IO DiffTime
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DiffTime
0
writeAlive :: Int64 -> IO ()
writeAlive Int64
leaseId = Text -> IO () -> IO ()
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"writeAlive" (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IO (Proto PutResponse) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Proto PutResponse) -> IO ())
-> (Proto PutRequest -> IO (Proto PutResponse))
-> Proto PutRequest
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection
-> ClientHandler'
'NonStreaming (ReaderT Connection IO) (Protobuf KV "put")
-> Input (Protobuf KV "put")
-> IO (Output (Protobuf KV "put"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "put")) (Proto PutRequest -> IO ()) -> Proto PutRequest -> IO ()
forall a b. (a -> b) -> a -> b
$
Proto PutRequest
forall msg. Message msg => msg
defMessage
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#key ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"alive-" ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Host -> ByteString
forall b a. (Show a, IsString b) => a -> b
show Host
advertise
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
#value ASetter (Proto PutRequest) (Proto PutRequest) ByteString ByteString
-> ByteString -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Host -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' Host
advertise
Proto PutRequest
-> (Proto PutRequest -> Proto PutRequest) -> Proto PutRequest
forall a b. a -> (a -> b) -> b
& ASetter (Proto PutRequest) (Proto PutRequest) Int64 Int64
#lease ASetter (Proto PutRequest) (Proto PutRequest) Int64 Int64
-> Int64 -> Proto PutRequest -> Proto PutRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Int64
leaseId
getAlive :: IO [Host]
getAlive = Text -> IO [Host] -> IO [Host]
forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
"getAlive" (IO [Host] -> IO [Host]) -> IO [Host] -> IO [Host]
forall a b. (a -> b) -> a -> b
$ do
Proto RangeResponse
res <-
Connection
-> ClientHandler'
'NonStreaming (ReaderT Connection IO) (Protobuf KV "range")
-> Input (Protobuf KV "range")
-> IO (Output (Protobuf KV "range"))
forall {k} (rpc :: k) (m :: * -> *).
Connection
-> ClientHandler' 'NonStreaming (ReaderT Connection m) rpc
-> Input rpc
-> m (Output rpc)
nonStreaming Connection
conn (forall {k} (rpc :: k) (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
forall rpc (styp :: StreamingType) (m :: * -> *).
(CanCallRPC m, SupportsClientRpc rpc,
SupportsStreamingType rpc styp, Default (RequestMetadata rpc)) =>
ClientHandler' styp m rpc
rpc @(Protobuf KV "range")) (Input (Protobuf KV "range") -> IO (Output (Protobuf KV "range")))
-> Input (Protobuf KV "range") -> IO (Output (Protobuf KV "range"))
forall a b. (a -> b) -> a -> b
$
Proto RangeRequest
forall msg. Message msg => msg
defMessage
Proto RangeRequest
-> (Proto RangeRequest -> Proto RangeRequest) -> Proto RangeRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
#key ASetter
(Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
-> ByteString -> Proto RangeRequest -> Proto RangeRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"alive"
Proto RangeRequest
-> (Proto RangeRequest -> Proto RangeRequest) -> Proto RangeRequest
forall a b. a -> (a -> b) -> b
& ASetter
(Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
#rangeEnd ASetter
(Proto RangeRequest) (Proto RangeRequest) ByteString ByteString
-> ByteString -> Proto RangeRequest -> Proto RangeRequest
forall s t a b. ASetter s t a b -> b -> s -> t
.~ ByteString
"alivf"
((Proto KeyValue -> IO (Maybe Host))
-> [Proto KeyValue] -> IO [Host])
-> [Proto KeyValue]
-> (Proto KeyValue -> IO (Maybe Host))
-> IO [Host]
forall a b c. (a -> b -> c) -> b -> a -> c
flip (Proto KeyValue -> IO (Maybe Host))
-> [Proto KeyValue] -> IO [Host]
forall (m :: * -> *) a b.
Monad m =>
(a -> m (Maybe b)) -> [a] -> m [b]
mapMaybeM (Proto RangeResponse
res Proto RangeResponse
-> ((Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> [Proto KeyValue]
forall s a. s -> Getting (Endo [a]) s a -> [a]
^.. ([Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
#kvs (([Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse))
-> ((Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue])
-> (Proto KeyValue
-> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> Proto RangeResponse
-> Const (Endo [Proto KeyValue]) (Proto RangeResponse)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Proto KeyValue -> Const (Endo [Proto KeyValue]) (Proto KeyValue))
-> [Proto KeyValue]
-> Const (Endo [Proto KeyValue]) [Proto KeyValue]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse) ((Proto KeyValue -> IO (Maybe Host)) -> IO [Host])
-> (Proto KeyValue -> IO (Maybe Host)) -> IO [Host]
forall a b. (a -> b) -> a -> b
$ \Proto KeyValue
kv -> do
let value :: ByteString
value = Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#value
case ByteString -> Either DecoderError Host
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' ByteString
value of
Left DecoderError
err -> do
Tracer IO EtcdLog -> EtcdLog -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith
Tracer IO EtcdLog
tracer
FailedToDecodeValue
{ $sel:key:EtcdLog :: Text
key = ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Proto KeyValue
kv Proto KeyValue
-> Getting ByteString (Proto KeyValue) ByteString -> ByteString
forall s a. s -> Getting a s a -> a
^. Getting ByteString (Proto KeyValue) ByteString
#key
, $sel:value:EtcdLog :: Text
value = ByteString -> Text
encodeBase16 ByteString
value
, $sel:reason:EtcdLog :: Text
reason = DecoderError -> Text
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
}
Maybe Host -> IO (Maybe Host)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Host
forall a. Maybe a
Nothing
Right Host
x -> Maybe Host -> IO (Maybe Host)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe Host -> IO (Maybe Host)) -> Maybe Host -> IO (Maybe Host)
forall a b. (a -> b) -> a -> b
$ Host -> Maybe Host
forall a. a -> Maybe a
Just Host
x
withGrpcContext :: MonadCatch m => Text -> m a -> m a
withGrpcContext :: forall (m :: * -> *) a. MonadCatch m => Text -> m a -> m a
withGrpcContext Text
context m a
action =
m a
action m a -> (GrpcException -> m a) -> m a
forall e a. Exception e => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \e :: GrpcException
e@GrpcException{Maybe Text
grpcErrorMessage :: GrpcException -> Maybe Text
grpcErrorMessage :: Maybe Text
grpcErrorMessage} ->
GrpcException -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO
GrpcException
e
{ grpcErrorMessage =
case grpcErrorMessage of
Maybe Text
Nothing -> Text -> Maybe Text
forall a. a -> Maybe a
Just Text
context
Just Text
msg -> Text -> Maybe Text
forall a. a -> Maybe a
Just (Text -> Maybe Text) -> Text -> Maybe Text
forall a b. (a -> b) -> a -> b
$ Text
context Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
": " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
msg
}
withProcessInterrupt ::
(MonadIO m, MonadThrow m) =>
ProcessConfig stdin stdout stderr ->
(Process stdin stdout stderr -> m a) ->
m a
withProcessInterrupt :: forall (m :: * -> *) stdin stdout stderr a.
(MonadIO m, MonadThrow m) =>
ProcessConfig stdin stdout stderr
-> (Process stdin stdout stderr -> m a) -> m a
withProcessInterrupt ProcessConfig stdin stdout stderr
config =
m (Process stdin stdout stderr)
-> (Process stdin stdout stderr -> m ())
-> (Process stdin stdout stderr -> m a)
-> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (ProcessConfig stdin stdout stderr
-> m (Process stdin stdout stderr)
forall (m :: * -> *) stdin stdout stderr.
MonadIO m =>
ProcessConfig stdin stdout stderr
-> m (Process stdin stdout stderr)
startProcess (ProcessConfig stdin stdout stderr
-> m (Process stdin stdout stderr))
-> ProcessConfig stdin stdout stderr
-> m (Process stdin stdout stderr)
forall a b. (a -> b) -> a -> b
$ ProcessConfig stdin stdout stderr
config ProcessConfig stdin stdout stderr
-> (ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr)
-> ProcessConfig stdin stdout stderr
forall a b. a -> (a -> b) -> b
& Bool
-> ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr
forall stdin stdout stderr.
Bool
-> ProcessConfig stdin stdout stderr
-> ProcessConfig stdin stdout stderr
setCreateGroup Bool
True) Process stdin stdout stderr -> m ()
forall {m :: * -> *} {stdin} {stdout} {stderr}.
MonadIO m =>
Process stdin stdout stderr -> m ()
signalAndStopProcess
where
signalAndStopProcess :: Process stdin stdout stderr -> m ()
signalAndStopProcess Process stdin stdout stderr
p = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
ProcessHandle -> IO ()
interruptProcessGroupOf (Process stdin stdout stderr -> ProcessHandle
forall stdin stdout stderr.
Process stdin stdout stderr -> ProcessHandle
unsafeProcessHandle Process stdin stdout stderr
p)
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m ()
race_
(IO ExitCode -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ExitCode -> IO ()) -> IO ExitCode -> IO ()
forall a b. (a -> b) -> a -> b
$ Process stdin stdout stderr -> IO ExitCode
forall (m :: * -> *) stdin stdout stderr.
MonadIO m =>
Process stdin stdout stderr -> m ExitCode
waitExitCode Process stdin stdout stderr
p)
(DiffTime -> IO ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
5 IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Process stdin stdout stderr -> IO ()
forall {m :: * -> *} {stdin} {stdout} {stderr}.
MonadIO m =>
Process stdin stdout stderr -> m ()
stopProcess Process stdin stdout stderr
p)
data PersistentQueue m a = PersistentQueue
{ forall (m :: * -> *) a.
PersistentQueue m a -> TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
, forall (m :: * -> *) a. PersistentQueue m a -> TVar m Natural
nextIx :: TVar m Natural
, forall (m :: * -> *) a. PersistentQueue m a -> FilePath
directory :: FilePath
}
newPersistentQueue ::
(MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) =>
FilePath ->
Natural ->
m (PersistentQueue m a)
newPersistentQueue :: forall (m :: * -> *) a.
(MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) =>
FilePath -> Natural -> m (PersistentQueue m a)
newPersistentQueue FilePath
path Natural
capacity = do
TBQueue m (Natural, a)
queue <- Natural -> m (TBQueue m (Natural, a))
forall a. Natural -> m (TBQueue m a)
forall (m :: * -> *) a. MonadSTM m => Natural -> m (TBQueue m a)
newTBQueueIO Natural
capacity
Natural
highestId <-
m Natural -> m (Either IOException Natural)
forall e a. Exception e => m a -> m (Either e a)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (TBQueue m (Natural, a) -> m Natural
loadExisting TBQueue m (Natural, a)
queue) m (Either IOException Natural)
-> (Either IOException Natural -> m Natural) -> m Natural
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left (IOException
_ :: IOException) -> do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Bool -> FilePath -> IO ()
createDirectoryIfMissing Bool
True FilePath
path
Natural -> m Natural
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
0
Right Natural
highest -> Natural -> m Natural
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
highest
TVar m Natural
nextIx <- Natural -> m (TVar m Natural)
forall a. a -> m (TVar m a)
forall (m :: * -> *) a. MonadSTM m => a -> m (TVar m a)
newTVarIO (Natural -> m (TVar m Natural)) -> Natural -> m (TVar m Natural)
forall a b. (a -> b) -> a -> b
$ Natural
highestId Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
+ Natural
1
PersistentQueue m a -> m (PersistentQueue m a)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure PersistentQueue{TBQueue m (Natural, a)
$sel:queue:PersistentQueue :: TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
queue, TVar m Natural
$sel:nextIx:PersistentQueue :: TVar m Natural
nextIx :: TVar m Natural
nextIx, $sel:directory:PersistentQueue :: FilePath
directory = FilePath
path}
where
loadExisting :: TBQueue m (Natural, a) -> m Natural
loadExisting TBQueue m (Natural, a)
queue = do
[FilePath]
paths <- IO [FilePath] -> m [FilePath]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [FilePath] -> m [FilePath]) -> IO [FilePath] -> m [FilePath]
forall a b. (a -> b) -> a -> b
$ FilePath -> IO [FilePath]
listDirectory FilePath
path
case [Natural] -> [Natural]
forall a. Ord a => [a] -> [a]
sort ([Natural] -> [Natural]) -> [Natural] -> [Natural]
forall a b. (a -> b) -> a -> b
$ (FilePath -> Maybe Natural) -> [FilePath] -> [Natural]
forall a b. (a -> Maybe b) -> [a] -> [b]
mapMaybe FilePath -> Maybe Natural
forall a. Read a => FilePath -> Maybe a
readMaybe [FilePath]
paths of
[] -> Natural -> m Natural
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
0
[Natural]
idxs -> do
[Natural] -> (Natural -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Natural]
idxs ((Natural -> m ()) -> m ()) -> (Natural -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \(Natural
idx :: Natural) -> do
ByteString
bs <- FilePath -> m ByteString
forall (m :: * -> *). MonadIO m => FilePath -> m ByteString
readFileBS (FilePath
path FilePath -> FilePath -> FilePath
</> Natural -> FilePath
forall b a. (Show a, IsString b) => a -> b
show Natural
idx)
case ByteString -> Either DecoderError a
forall a. FromCBOR a => ByteString -> Either DecoderError a
decodeFull' ByteString
bs of
Left DecoderError
err ->
FilePath -> m ()
forall a. FilePath -> m a
forall (m :: * -> *) a. MonadFail m => FilePath -> m a
fail (FilePath -> m ()) -> FilePath -> m ()
forall a b. (a -> b) -> a -> b
$ FilePath
"Failed to decode item: " FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> DecoderError -> FilePath
forall b a. (Show a, IsString b) => a -> b
show DecoderError
err
Right a
item ->
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TBQueue m (Natural, a) -> (Natural, a) -> STM m ()
forall a. TBQueue m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> a -> STM m ()
writeTBQueue TBQueue m (Natural, a)
queue (Natural
idx, a
item)
Natural -> m Natural
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Natural -> m Natural) -> Natural -> m Natural
forall a b. (a -> b) -> a -> b
$ [Natural] -> Natural
forall a. HasCallStack => [a] -> a
List.last [Natural]
idxs
writePersistentQueue :: (ToCBOR a, MonadSTM m, MonadIO m) => PersistentQueue m a -> a -> m ()
writePersistentQueue :: forall a (m :: * -> *).
(ToCBOR a, MonadSTM m, MonadIO m) =>
PersistentQueue m a -> a -> m ()
writePersistentQueue PersistentQueue{TBQueue m (Natural, a)
$sel:queue:PersistentQueue :: forall (m :: * -> *) a.
PersistentQueue m a -> TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
queue, TVar m Natural
$sel:nextIx:PersistentQueue :: forall (m :: * -> *) a. PersistentQueue m a -> TVar m Natural
nextIx :: TVar m Natural
nextIx, FilePath
$sel:directory:PersistentQueue :: forall (m :: * -> *) a. PersistentQueue m a -> FilePath
directory :: FilePath
directory} a
item = do
Natural
next <- STM m Natural -> m Natural
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Natural -> m Natural) -> STM m Natural -> m Natural
forall a b. (a -> b) -> a -> b
$ do
Natural
next <- TVar m Natural -> STM m Natural
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
readTVar TVar m Natural
nextIx
TVar m Natural -> (Natural -> Natural) -> STM m ()
forall a. TVar m a -> (a -> a) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
TVar m a -> (a -> a) -> STM m ()
modifyTVar' TVar m Natural
nextIx (Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
+ Natural
1)
Natural -> STM m Natural
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Natural
next
FilePath -> ByteString -> m ()
forall (m :: * -> *). MonadIO m => FilePath -> ByteString -> m ()
writeFileBS (FilePath
directory FilePath -> FilePath -> FilePath
</> Natural -> FilePath
forall b a. (Show a, IsString b) => a -> b
show Natural
next) (ByteString -> m ()) -> ByteString -> m ()
forall a b. (a -> b) -> a -> b
$ a -> ByteString
forall a. ToCBOR a => a -> ByteString
serialize' a
item
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TBQueue m (Natural, a) -> (Natural, a) -> STM m ()
forall a. TBQueue m a -> a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> a -> STM m ()
writeTBQueue TBQueue m (Natural, a)
queue (Natural
next, a
item)
peekPersistentQueue :: MonadSTM m => PersistentQueue m a -> m a
peekPersistentQueue :: forall (m :: * -> *) a. MonadSTM m => PersistentQueue m a -> m a
peekPersistentQueue PersistentQueue{TBQueue m (Natural, a)
$sel:queue:PersistentQueue :: forall (m :: * -> *) a.
PersistentQueue m a -> TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
queue} = do
(Natural, a) -> a
forall a b. (a, b) -> b
snd ((Natural, a) -> a) -> m (Natural, a) -> m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM m (Natural, a) -> m (Natural, a)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (TBQueue m (Natural, a) -> STM m (Natural, a)
forall a. TBQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> STM m a
peekTBQueue TBQueue m (Natural, a)
queue)
popPersistentQueue :: (MonadSTM m, MonadIO m, Eq a) => PersistentQueue m a -> a -> m ()
popPersistentQueue :: forall (m :: * -> *) a.
(MonadSTM m, MonadIO m, Eq a) =>
PersistentQueue m a -> a -> m ()
popPersistentQueue PersistentQueue{TBQueue m (Natural, a)
$sel:queue:PersistentQueue :: forall (m :: * -> *) a.
PersistentQueue m a -> TBQueue m (Natural, a)
queue :: TBQueue m (Natural, a)
queue, FilePath
$sel:directory:PersistentQueue :: forall (m :: * -> *) a. PersistentQueue m a -> FilePath
directory :: FilePath
directory} a
item = do
Maybe Natural
popped <- STM m (Maybe Natural) -> m (Maybe Natural)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Maybe Natural) -> m (Maybe Natural))
-> STM m (Maybe Natural) -> m (Maybe Natural)
forall a b. (a -> b) -> a -> b
$ do
(Natural
ix, a
next) <- TBQueue m (Natural, a) -> STM m (Natural, a)
forall a. TBQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> STM m a
peekTBQueue TBQueue m (Natural, a)
queue
if a
next a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
item
then TBQueue m (Natural, a) -> STM m (Natural, a)
forall a. TBQueue m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> STM m a
readTBQueue TBQueue m (Natural, a)
queue STM m (Natural, a) -> Maybe Natural -> STM m (Maybe Natural)
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Natural -> Maybe Natural
forall a. a -> Maybe a
Just Natural
ix
else Maybe Natural -> STM m (Maybe Natural)
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Natural
forall a. Maybe a
Nothing
case Maybe Natural
popped of
Maybe Natural
Nothing -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just Natural
index -> do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (FilePath -> IO ()) -> FilePath -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePath -> IO ()
removeFile (FilePath -> m ()) -> FilePath -> m ()
forall a b. (a -> b) -> a -> b
$ FilePath
directory FilePath -> FilePath -> FilePath
</> Natural -> FilePath
forall b a. (Show a, IsString b) => a -> b
show Natural
index
data EtcdLog
= EtcdLog {EtcdLog -> Value
etcd :: Value}
| Reconnecting
| BroadcastFailed {EtcdLog -> Text
reason :: Text}
| FailedToDecodeLog {EtcdLog -> Text
log :: Text, reason :: Text}
| FailedToDecodeValue {EtcdLog -> Text
key :: Text, EtcdLog -> Text
value :: Text, reason :: Text}
| CreatedLease {EtcdLog -> Int64
leaseId :: Int64}
| LowLeaseTTL {EtcdLog -> DiffTime
ttlRemaining :: DiffTime}
| NoKeepAliveResponse
| MatchingProtocolVersion {EtcdLog -> ProtocolVersion
version :: ProtocolVersion}
deriving stock (EtcdLog -> EtcdLog -> Bool
(EtcdLog -> EtcdLog -> Bool)
-> (EtcdLog -> EtcdLog -> Bool) -> Eq EtcdLog
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: EtcdLog -> EtcdLog -> Bool
== :: EtcdLog -> EtcdLog -> Bool
$c/= :: EtcdLog -> EtcdLog -> Bool
/= :: EtcdLog -> EtcdLog -> Bool
Eq, Int -> EtcdLog -> FilePath -> FilePath
[EtcdLog] -> FilePath -> FilePath
EtcdLog -> FilePath
(Int -> EtcdLog -> FilePath -> FilePath)
-> (EtcdLog -> FilePath)
-> ([EtcdLog] -> FilePath -> FilePath)
-> Show EtcdLog
forall a.
(Int -> a -> FilePath -> FilePath)
-> (a -> FilePath) -> ([a] -> FilePath -> FilePath) -> Show a
$cshowsPrec :: Int -> EtcdLog -> FilePath -> FilePath
showsPrec :: Int -> EtcdLog -> FilePath -> FilePath
$cshow :: EtcdLog -> FilePath
show :: EtcdLog -> FilePath
$cshowList :: [EtcdLog] -> FilePath -> FilePath
showList :: [EtcdLog] -> FilePath -> FilePath
Show, (forall x. EtcdLog -> Rep EtcdLog x)
-> (forall x. Rep EtcdLog x -> EtcdLog) -> Generic EtcdLog
forall x. Rep EtcdLog x -> EtcdLog
forall x. EtcdLog -> Rep EtcdLog x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. EtcdLog -> Rep EtcdLog x
from :: forall x. EtcdLog -> Rep EtcdLog x
$cto :: forall x. Rep EtcdLog x -> EtcdLog
to :: forall x. Rep EtcdLog x -> EtcdLog
Generic)
deriving anyclass ([EtcdLog] -> Value
[EtcdLog] -> Encoding
EtcdLog -> Bool
EtcdLog -> Value
EtcdLog -> Encoding
(EtcdLog -> Value)
-> (EtcdLog -> Encoding)
-> ([EtcdLog] -> Value)
-> ([EtcdLog] -> Encoding)
-> (EtcdLog -> Bool)
-> ToJSON EtcdLog
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: EtcdLog -> Value
toJSON :: EtcdLog -> Value
$ctoEncoding :: EtcdLog -> Encoding
toEncoding :: EtcdLog -> Encoding
$ctoJSONList :: [EtcdLog] -> Value
toJSONList :: [EtcdLog] -> Value
$ctoEncodingList :: [EtcdLog] -> Encoding
toEncodingList :: [EtcdLog] -> Encoding
$comitField :: EtcdLog -> Bool
omitField :: EtcdLog -> Bool
ToJSON)