--------------------------------------------------------------------------------
-- |
-- Module    :  Database.EventStore.Internal.Manager.Operation.Registry
-- Copyright :  (C) 2020 Yorick Laupa
-- License   :  (see the file LICENSE)
-- Maintainer:  Yorick Laupa <yo.eight@gmail.com>
-- Stability :  experimental
-- Portability: non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Manager.Operation.Registry where

--------------------------------------------------------------------------------
import qualified Data.HashMap.Strict as HashMap

--------------------------------------------------------------------------------
import Data.ProtocolBuffers
import Data.Serialize

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Stopwatch
import Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
-- | An operation package registered in the registry, for which a response
--   from the server is still expected.
data Request = Request
  { requestOriginal :: !Package
    -- ^ Package the request was created from.
  , requestConnId   :: !UUID
    -- ^ Id of the connection the request is bound to.
  , requestRetries  :: !Int
    -- ^ Attempt counter; a new request starts at 1.
  , requestStarted  :: !NominalDiffTime
    -- ^ Stopwatch reading taken when the current attempt was registered.
  , requestMailbox  :: !Mailbox
    -- ^ Mailbox receiving the response or the failure.
  , requestLifetime :: !Lifetime
    -- ^ 'OneTime' or 'KeepAlive'.
  }

--------------------------------------------------------------------------------
-- | 'True' when the lifetime of the request is 'KeepAlive', 'False' when it
--   is 'OneTime'.
requestIsKeepAlive :: Request -> Bool
requestIsKeepAlive = isKeepAlive . requestLifetime
  where
    isKeepAlive (KeepAlive _) = True
    isKeepAlive OneTime       = False

--------------------------------------------------------------------------------
-- | Turns a registered request back into a waiting operation. Only the
--   lifetime, the original package and the mailbox are kept; the connection
--   id, retry counter and start time are discarded.
requestToWaiting :: Request -> Waiting
requestToWaiting Request
  { requestLifetime = lifetime
  , requestOriginal = original
  , requestMailbox  = mailbox
  } =
  Waiting
  { waitingLifetime = lifetime
  , waitingPkg      = original
  , waitingMailbox  = mailbox
  }

--------------------------------------------------------------------------------
-- | Builds a fresh request (attempt number 1) out of a waiting operation.
waitingToRequest
  :: UUID -- Connection id.
  -> NominalDiffTime -- Stopwatch reading used as the request start time.
  -> Waiting
  -> Request
waitingToRequest connId started Waiting
  { waitingPkg      = original
  , waitingMailbox  = mailbox
  , waitingLifetime = lifetime
  } =
  Request
  { requestOriginal = original
  , requestConnId   = connId
  , requestRetries  = 1
  , requestStarted  = started
  , requestMailbox  = mailbox
  , requestLifetime = lifetime
  }

--------------------------------------------------------------------------------
-- | An operation put on hold: it is not bound to a connection and carries no
--   retry counter or start time.
data Waiting = Waiting
  { waitingLifetime :: !Lifetime
    -- ^ 'OneTime' or 'KeepAlive'.
  , waitingPkg      :: !Package
    -- ^ Package of the operation.
  , waitingMailbox  :: !Mailbox
    -- ^ Mailbox receiving the response or the failure.
  }

--------------------------------------------------------------------------------
-- | Registered requests, keyed by the correlation id of their original
--   package.
type Requests = HashMap UUID Request

--------------------------------------------------------------------------------
-- | Pure state of the registry.
data Registry' = Registry'
  { registryRequests :: !Requests
    -- ^ Requests currently registered.
  , registryWaitings :: ![Waiting]
    -- ^ Operations on hold, most recently postponed first.
  , registryTimeout  :: !NominalDiffTime
    -- ^ Age from which a request that is not keep-alive is considered
    --   timed out.
  , registryMaxRetry :: !Retry
    -- ^ Retry policy applied to timed out requests.
  }

--------------------------------------------------------------------------------
-- | Forgets every registered request and every waiting operation. The
--   timeout and the retry policy are left untouched.
registryClear :: Registry' -> Registry'
registryClear reg = reg { registryWaitings = [], registryRequests = HashMap.empty }

--------------------------------------------------------------------------------
-- | Mutable handle on the registry.
data Registry = Registry
  { registryState     :: IORef Registry'
    -- ^ Current state of the registry.
  , registryStopwatch :: Stopwatch
    -- ^ Stopwatch whose readings timestamp the requests.
  }

--------------------------------------------------------------------------------
-- | Pair whose 'Functor' instance maps over the second component only.
--   It is the functor handed to @HashMap.alterF@ by 'registryRemoveRequest',
--   so that an entry can be looked up and deleted in one single pass.
data Blob a b = Blob a b

--------------------------------------------------------------------------------
-- | Maps over the second component and leaves the first one untouched.
instance Functor (Blob a) where
  fmap g (Blob kept x) = Blob kept (g x)

--------------------------------------------------------------------------------
-- | Looks a request up by correlation id and removes it from the registry in
--   a single traversal of the map.
--
--   Returns the request, if any, along with the registry without that
--   request.
registryRemoveRequest
  :: UUID
  -> Registry'
  -> (Maybe Request, Registry')
registryRemoveRequest key reg = (found, reg { registryRequests = remaining })
  where
    Blob found remaining = HashMap.alterF extract key (registryRequests reg)

    -- Reports whatever was stored under the key and always deletes it.
    extract entry = Blob entry Nothing

--------------------------------------------------------------------------------
-- | Creates an empty registry with the given request timeout and retry
--   policy, along with a new stopwatch.
registryNew :: NominalDiffTime -> Retry -> IO Registry
registryNew timeout maxRetry = do
  ref   <- newIORef initial
  watch <- newStopwatch
  pure Registry { registryState = ref, registryStopwatch = watch }
  where
    initial =
      Registry'
      { registryRequests = mempty
      , registryWaitings = []
      , registryTimeout  = timeout
      , registryMaxRetry = maxRetry
      }

--------------------------------------------------------------------------------
-- | Records a package as a registered request, keyed by the correlation id
--   of the package. The request starts at attempt 1 and is timestamped with
--   the current stopwatch reading.
registryRegister
  :: Registry
  -> UUID -- Connection Id.
  -> Lifetime
  -> Package
  -> Mailbox
  -> EventStore ()
registryRegister reg connId lifetime pkg mailbox = do
  now <- stopwatchElapsed (registryStopwatch reg)
  let pending =
        Request
        { requestOriginal = pkg
        , requestConnId   = connId
        , requestRetries  = 1
        , requestStarted  = now
        , requestMailbox  = mailbox
        , requestLifetime = lifetime
        }
  modifyIORef' (registryState reg) $ \state ->
    state
    { registryRequests =
        insertMap (packageCorrelation pkg) pending (registryRequests state)
    }

--------------------------------------------------------------------------------
-- | Puts an operation on hold by pushing it on top of the waiting list.
registryPostpone
  :: Registry
  -> Mailbox
  -> Lifetime
  -> Package
  -> EventStore ()
registryPostpone reg mailbox lifetime pkg =
  modifyIORef' (registryState reg) $ \state ->
    state { registryWaitings = postponed : registryWaitings state }
  where
    postponed =
      Waiting
      { waitingLifetime = lifetime
      , waitingPkg      = pkg
      , waitingMailbox  = mailbox
      }

--------------------------------------------------------------------------------
-- | Dispatches a package received from the server to the request it answers.
--
--   The request is looked up by the correlation id of the package:
--
--   * no matching request: a warning is logged;
--
--   * bad request / not authenticated: the mailbox of the request is failed
--     and the request is dropped from the registry, so it is neither retried
--     nor aborted a second time later on;
--
--   * not handled: the request is moved to the waiting list. When the server
--     reports it is not the master and provides master information, the
--     endpoints of the master node are returned. A payload that cannot be
--     decoded, or a not-master response without master information, is
--     logged and handled like any other not-handled reason;
--
--   * anything else: the package is forwarded to the mailbox of the request.
--     A 'OneTime' request, or a 'KeepAlive' request receiving its end
--     command, is dropped from the registry.
--
--   Returns @Just node@ only for a not-master response carrying master
--   information, 'Nothing' in every other case.
registryHandle
  :: Registry
  -> Package
  -> EventStore (Maybe NodeEndPoints)
registryHandle reg pkg = do
  state <- readIORef stateRef
  case registryRemoveRequest pkgId state of
    (Nothing, _) -> do
      $(logWarn) [i|No operation associated to package: #{pkg}|]
      pure Nothing

    (Just req, stateWithoutReq)
      | respCmd == badRequestCmd -> do
          -- The operation is over: forget the request before failing it.
          writeIORef stateRef stateWithoutReq
          mailboxFail (requestMailbox req) (ServerError (packageDataAsText pkg))
          pure Nothing

      | respCmd == notAuthenticatedCmd -> do
          -- The operation is over: forget the request before failing it.
          writeIORef stateRef stateWithoutReq
          mailboxFail (requestMailbox req) NotAuthenticatedOp
          pure Nothing

      | respCmd == notHandledCmd -> do
          -- In all cases, we decide to postpone that command.
          $(logWarn) [i|Not handled response received: #{pkg}.|]
          let origCmd   = packageCmd (requestOriginal req)
              postponed = requestToWaiting req : registryWaitings stateWithoutReq

          writeIORef stateRef (stateWithoutReq { registryWaitings = postponed })

          case maybeDecodeMessage (packageData pkg) of
            Nothing -> do
              $(logWarn) [i|Unable to decode the not handled response of command #{origCmd} [#{pkgId}]. Retrying.|]
              pure Nothing

            Just msg ->
              case getField (notHandledReason msg) of
                N_NotMaster ->
                  case getField (notHandledAdditionalInfo msg) of
                    Just details -> do
                      let node = masterInfoNodeEndPoints (masterInfo details)

                      $(logWarn) [i|Received a non master error on command #{origCmd} [#{pkgId}] on #{node}|]
                      pure (Just node)

                    Nothing -> do
                      $(logWarn) [i|Received a non master error on command #{origCmd} [#{pkgId}] without master information. Retrying.|]
                      pure Nothing

                _ -> do
                  $(logWarn) [i|The server has either not started or is too busy. Retrying command #{origCmd} #{pkgId}|]
                  pure Nothing

      | otherwise -> do
          mailboxSendPkg (requestMailbox req) pkg

          case requestLifetime req of
            OneTime -> do
              writeIORef stateRef stateWithoutReq
              pure Nothing

            KeepAlive endCmd
              | endCmd == respCmd -> do
                  writeIORef stateRef stateWithoutReq
                  pure Nothing
              -- Means we keep the previous state (Subscription).
              | otherwise -> pure Nothing
  where
    stateRef = registryState reg
    respCmd  = packageCmd pkg
    pkgId    = packageCorrelation pkg

--------------------------------------------------------------------------------
-- | Accumulator threaded through 'registryCheckAndRetry'.
data CRState = CRState
  { crsState :: !Registry'
    -- ^ Registry state being rebuilt.
  , crsPkgs  :: ![Package]
    -- ^ Packages collected so far, most recently added first.
  }

--------------------------------------------------------------------------------
-- | Starts an accumulator from a registry state, with no collected package.
crsStateNew :: Registry' -> CRState
crsStateNew reg = CRState reg []

--------------------------------------------------------------------------------
-- | Removes a request from the accumulated registry, using the correlation
--   id of its original package as key.
crsStateDeleteReq :: Request -> CRState -> CRState
crsStateDeleteReq req crs = crs { crsState = withoutReq }
  where
    registry   = crsState crs
    corrId     = packageCorrelation (requestOriginal req)
    withoutReq =
      registry { registryRequests = deleteMap corrId (registryRequests registry) }

--------------------------------------------------------------------------------
-- | Inserts a request in the accumulated registry, using the correlation id
--   of its original package as key.
crsStateRegisterReq :: Request -> CRState -> CRState
crsStateRegisterReq req crs = crs { crsState = withReq }
  where
    registry = crsState crs
    corrId   = packageCorrelation (requestOriginal req)
    withReq  =
      registry { registryRequests = insertMap corrId req (registryRequests registry) }

--------------------------------------------------------------------------------
-- | Pushes a package on top of the collected packages.
crsStateAddPkg :: Package -> CRState -> CRState
crsStateAddPkg pkg crs = crs { crsPkgs = pkg : crsPkgs crs }

--------------------------------------------------------------------------------
-- | Goes through every registered request, then re-registers every waiting
--   operation.
--
--   For each registered request:
--
--   * bound to a connection other than @connId@: its mailbox is failed with
--     'ConnectionHasDropped' and the request is dropped;
--
--   * not keep-alive and timed out: the request is retried (attempt counter
--     incremented, start time reset), unless that would exceed an 'AtMost'
--     retry limit, in which case its mailbox is failed with 'Aborted' and
--     the request is dropped;
--
--   * otherwise: left untouched.
--
--   Every waiting operation is then registered as a fresh request bound to
--   @connId@ and the waiting list is emptied.
--
--   Returns the packages of the retried requests and of the formerly waiting
--   operations (presumably for the caller to send them).
registryCheckAndRetry
  :: Registry
  -> UUID -- Connection id.
  -> EventStore [Package]
registryCheckAndRetry reg connId = do
  state   <- readIORef (registryState reg)
  elapsed <- stopwatchElapsed (registryStopwatch reg)

  checked <- foldM (checking elapsed) (crsStateNew state) (mapToList (registryRequests state))

  let checkedReg = crsState checked
      awaitings  = registryWaitings checkedReg
      drained    = checked { crsState = checkedReg { registryWaitings = [] } }
      finalState = foldl' (sending elapsed) drained awaitings

  writeIORef (registryState reg) (crsState finalState)
  pure (crsPkgs finalState)
  where
    -- Decides the fate of a single registered request.
    checking :: NominalDiffTime -> CRState -> (UUID, Request) -> EventStore CRState
    checking elapsed state (_, req)
      | requestConnId req /= connId = do
          mailboxFail (requestMailbox req) ConnectionHasDropped
          pure (crsStateDeleteReq req state)

      | not (requestIsKeepAlive req) && hasTimeout =
          case maxRetry of
            AtMost maxAtt
              | requestRetries req + 1 > maxAtt -> do
                  $(logError) [i|Command #{cmd} [#{pkgId}] maximum retries threshold reached (#{maxAtt}), aborted!|]
                  mailboxFail (requestMailbox req) Aborted
                  pure (crsStateDeleteReq req state)
              | otherwise -> retryReq
            KeepRetrying -> retryReq

      | otherwise = pure state
      where
        maxTimeout = registryTimeout (crsState state)
        hasTimeout = elapsed - requestStarted req >= maxTimeout
        maxRetry   = registryMaxRetry (crsState state)
        pkg        = requestOriginal req
        pkgId      = packageCorrelation pkg
        cmd        = packageCmd pkg

        -- Schedules the original package again and restarts the timeout.
        retryReq :: EventStore CRState
        retryReq = do
          let nextRetries = requestRetries req + 1
              nextReq =
                req
                { requestRetries = nextRetries
                , requestStarted = elapsed
                }
              maxAtt =
                case maxRetry of
                  AtMost n     -> n
                  KeepRetrying -> maxBound

          $(logWarn) [i|Command #{cmd} [#{pkgId}] has timeout. Retrying (attempt #{nextRetries}/#{maxAtt})|]

          pure . crsStateRegisterReq nextReq . crsStateAddPkg pkg $ state

    -- Registers a waiting operation as a fresh request on the connection.
    sending :: NominalDiffTime -> CRState -> Waiting -> CRState
    sending elapsed state w =
      let req = waitingToRequest connId elapsed w
      in crsStateRegisterReq req (crsStateAddPkg (requestOriginal req) state)

--------------------------------------------------------------------------------
-- | Empties the registry, then fails the mailbox of every request and of
--   every waiting operation it contained with 'Aborted'.
registryAbort :: Registry -> EventStore ()
registryAbort reg = do
  state <- readIORef (registryState reg)
  writeIORef (registryState reg) (registryClear state)

  for_ (registryRequests state) (abort . requestMailbox)
  for_ (registryWaitings state) (abort . waitingMailbox)
  where
    abort :: Mailbox -> EventStore ()
    abort mailbox = mailboxFail mailbox Aborted

--------------------------------------------------------------------------------
-- | Decodes a protocol buffers message, giving 'Nothing' when the bytes
--   cannot be decoded.
maybeDecodeMessage :: Decode a => ByteString -> Maybe a
maybeDecodeMessage = toMaybe . runGet decodeMessage
  where
    toMaybe (Left _)      = Nothing
    toMaybe (Right value) = Just value