module Database.EventStore.Internal.Manager.Operation.Registry where
import qualified Data.HashMap.Strict as HashMap
import Data.ProtocolBuffers
import Data.Serialize
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Stopwatch
import Database.EventStore.Internal.Types
-- | An operation whose package has been handed out for sending and whose
-- response is still expected.
data Request = Request
  { requestOriginal :: !Package
    -- ^ Package emitted for this operation; its correlation id is the key
    -- under which the request is stored.
  , requestConnId :: !UUID
    -- ^ Id of the connection the request was registered with.
  , requestRetries :: !Int
    -- ^ Attempt counter. Starts at 1 and is incremented on every retry.
  , requestStarted :: !NominalDiffTime
    -- ^ Registry stopwatch reading taken when the current attempt started.
  , requestMailbox :: !Mailbox
    -- ^ Mailbox that receives the response packages or the failure.
  , requestLifetime :: !Lifetime
    -- ^ 'OneTime' or 'KeepAlive', as supplied at registration.
  }
-- | 'True' when the request carries a 'KeepAlive' lifetime, 'False' when it
-- is 'OneTime'.
requestIsKeepAlive :: Request -> Bool
requestIsKeepAlive Request{ requestLifetime = lifetime } =
  case lifetime of
    KeepAlive _ -> True
    OneTime     -> False
-- | Downgrade a registered request to a pending operation, keeping only its
-- package, mailbox and lifetime. Connection id, retry counter and start time
-- are dropped.
requestToWaiting :: Request -> Waiting
requestToWaiting Request{ requestOriginal = pkg
                        , requestMailbox  = mailbox
                        , requestLifetime = lifetime
                        } =
  Waiting
    { waitingPkg      = pkg
    , waitingMailbox  = mailbox
    , waitingLifetime = lifetime
    }
-- | Promote a pending operation to a registered request.
--
-- The request is bound to the given connection id, stamped with the given
-- start time, and its retry counter is initialised to 1.
waitingToRequest :: UUID -> NominalDiffTime -> Waiting -> Request
waitingToRequest connId started Waiting{ waitingPkg      = pkg
                                       , waitingMailbox  = mailbox
                                       , waitingLifetime = lifetime
                                       } =
  Request
    { requestConnId   = connId
    , requestStarted  = started
    , requestRetries  = 1
    , requestOriginal = pkg
    , requestMailbox  = mailbox
    , requestLifetime = lifetime
    }
-- | An operation that has been postponed: it is known to the registry but is
-- not currently tracked as an in-flight 'Request'.
data Waiting = Waiting
  { waitingLifetime :: !Lifetime
    -- ^ Lifetime the operation will have once it becomes a request.
  , waitingPkg :: !Package
    -- ^ Package to emit for the operation.
  , waitingMailbox :: !Mailbox
    -- ^ Mailbox that receives the response packages or the failure.
  }
-- | Registered requests, keyed by the correlation id of their original
-- package.
type Requests = HashMap UUID Request
-- | Pure registry state, stored behind the 'IORef' of a 'Registry'.
data Registry' = Registry'
  { registryRequests :: !Requests
    -- ^ Requests currently registered.
  , registryWaitings :: ![Waiting]
    -- ^ Postponed operations; new entries are consed onto the front.
  , registryTimeout :: !NominalDiffTime
    -- ^ Time after which a request that is not keep-alive is considered to
    -- have timed out.
  , registryMaxRetry :: !Retry
    -- ^ Retry policy applied to timed-out requests.
  }
-- | Forget every registered request and every postponed operation. Timeout
-- and retry settings are preserved.
registryClear :: Registry' -> Registry'
registryClear current =
  current { registryWaitings = [], registryRequests = mempty }
-- | Handle on the operation registry.
data Registry = Registry
  { registryState :: IORef Registry'
    -- ^ Mutable cell holding the current registry state.
  , registryStopwatch :: Stopwatch
    -- ^ Stopwatch whose elapsed time is used to timestamp requests and to
    -- detect timeouts.
  }
-- | A two-component product. Its 'Functor' instance maps over the second
-- component and carries the first one through unchanged.
data Blob a b = Blob a b

instance Functor (Blob a) where
  fmap transform (Blob carried payload) = Blob carried (transform payload)
-- | Look up and delete, in a single traversal, the request stored under the
-- given correlation id.
--
-- Returns the request that was stored (if any) together with the registry
-- state in which that key is no longer present.
registryRemoveRequest :: UUID -> Registry' -> (Maybe Request, Registry')
registryRemoveRequest key reg = (found, reg { registryRequests = remaining })
  where
    -- The pair functor carries the previous value out of 'HashMap.alterF'
    -- while the 'Nothing' in the second slot asks for the key to be removed.
    (found, remaining) =
      HashMap.alterF (\previous -> (previous, Nothing)) key (registryRequests reg)
-- | Create an empty registry with the given request timeout and retry
-- policy. A fresh state cell and a fresh stopwatch are allocated.
registryNew :: NominalDiffTime -> Retry -> IO Registry
registryNew timeout maxRetry = do
  ref   <- newIORef emptyRegistry
  watch <- newStopwatch
  pure Registry { registryState = ref, registryStopwatch = watch }
  where
    emptyRegistry :: Registry'
    emptyRegistry =
      Registry'
        { registryTimeout  = timeout
        , registryMaxRetry = maxRetry
        , registryRequests = mempty
        , registryWaitings = []
        }
-- | Register a package as an in-flight request.
--
-- The request is stored under the package's correlation id, stamped with the
-- current stopwatch reading, and starts with a retry counter of 1. An entry
-- already stored under the same correlation id is replaced.
registryRegister
  :: Registry
  -> UUID
  -> Lifetime
  -> Package
  -> Mailbox
  -> EventStore ()
registryRegister reg connId lifetime pkg mailbox = do
  started <- stopwatchElapsed (registryStopwatch reg)
  modifyIORef' (registryState reg) $ \state ->
    state
      { registryRequests =
          insertMap (packageCorrelation pkg) (freshRequest started) (registryRequests state)
      }
  where
    freshRequest :: NominalDiffTime -> Request
    freshRequest started =
      Request
        { requestConnId   = connId
        , requestLifetime = lifetime
        , requestMailbox  = mailbox
        , requestOriginal = pkg
        , requestRetries  = 1
        , requestStarted  = started
        }
-- | Record an operation as postponed by pushing it onto the front of the
-- registry's waiting list.
registryPostpone
  :: Registry
  -> Mailbox
  -> Lifetime
  -> Package
  -> EventStore ()
registryPostpone reg mailbox lifetime pkg =
  modifyIORef' (registryState reg) $ \state ->
    state { registryWaitings = postponed : registryWaitings state }
  where
    postponed :: Waiting
    postponed =
      Waiting
        { waitingPkg      = pkg
        , waitingMailbox  = mailbox
        , waitingLifetime = lifetime
        }
-- | Dispatch a package received from the server to the request registered
-- under the package's correlation id.
--
-- * No matching request: a warning is logged and nothing else happens.
--
-- * Bad-request / not-authenticated response: the request is removed from
--   the registry and its mailbox is failed.
--
-- * Not-handled response: the request is removed and re-queued as a waiting
--   operation so that it is sent again later.
--
-- * Any other response: the package is delivered to the mailbox. The request
--   is removed if it is 'OneTime', or if it is 'KeepAlive' and the response
--   command equals its end command; otherwise it stays registered.
--
-- Returns @Just node@ only for a not-handled response whose reason is
-- 'N_NotMaster' and which carries master information; 'Nothing' in every
-- other case.
registryHandle :: Registry -> Package -> EventStore (Maybe NodeEndPoints)
registryHandle reg pkg = do
  state <- readIORef stateRef
  case registryRemoveRequest pkgId state of
    (Nothing, _) -> do
      $(logWarn) [i|No operation associated to package: #{pkg}|]
      pure Nothing

    (Just req, stateWithoutReq)
      | respCmd == badRequestCmd -> do
          -- Once the mailbox has been failed the operation is over, so the
          -- removal has to be persisted; otherwise the dead request would
          -- stay registered and be retried on timeout.
          writeIORef stateRef stateWithoutReq
          mailboxFail (requestMailbox req) (ServerError (packageDataAsText pkg))
          pure Nothing

      | respCmd == notAuthenticatedCmd -> do
          -- Same reasoning as for a bad request: persist the removal.
          writeIORef stateRef stateWithoutReq
          mailboxFail (requestMailbox req) NotAuthenticatedOp
          pure Nothing

      | respCmd == notHandledCmd -> do
          $(logWarn) [i|Not handled response received: #{pkg}.|]
          let origCmd  = packageCmd (requestOriginal req)
              requeued =
                stateWithoutReq
                  { registryWaitings =
                      requestToWaiting req : registryWaitings stateWithoutReq
                  }
          -- Whatever the reason turns out to be, the operation is put back
          -- in the waiting list so it gets sent again.
          writeIORef stateRef requeued
          -- Decode totally: a malformed payload or missing master
          -- information must not crash the caller.
          case (maybeDecodeMessage (packageData pkg) :: Maybe NotHandledBuf) of
            Nothing -> do
              $(logWarn) [i|Unable to decode not handled response. Retrying command #{origCmd} #{pkgId}|]
              pure Nothing
            Just msg ->
              case getField (notHandledReason msg) of
                N_NotMaster ->
                  case getField (notHandledAdditionalInfo msg) of
                    Just details -> do
                      let node = masterInfoNodeEndPoints (masterInfo details)
                      $(logWarn) [i|Received a non master error on command #{origCmd} [#{pkgId}] on #{node}|]
                      pure (Just node)
                    Nothing -> do
                      $(logWarn) [i|Received a non master error without master information. Retrying command #{origCmd} #{pkgId}|]
                      pure Nothing
                _ -> do
                  $(logWarn) [i|The server has either not started or is too busy. Retrying command #{origCmd} #{pkgId}|]
                  pure Nothing

      | otherwise -> do
          mailboxSendPkg (requestMailbox req) pkg
          case requestLifetime req of
            OneTime -> do
              writeIORef stateRef stateWithoutReq
              pure Nothing
            KeepAlive endCmd
              | endCmd == respCmd -> do
                  writeIORef stateRef stateWithoutReq
                  pure Nothing
              -- A keep-alive request that has not seen its end command yet
              -- stays registered: the removal is not persisted.
              | otherwise -> pure Nothing
  where
    stateRef :: IORef Registry'
    stateRef = registryState reg

    pkgId :: UUID
    pkgId = packageCorrelation pkg

    respCmd :: Command
    respCmd = packageCmd pkg
-- | Accumulator threaded through 'registryCheckAndRetry'.
data CRState = CRState
  { crsState :: !Registry'
    -- ^ Registry state as updated so far.
  , crsPkgs :: ![Package]
    -- ^ Packages collected so far; new packages are consed onto the front.
  }
-- | Start an accumulator from a registry state, with no package collected.
crsStateNew :: Registry' -> CRState
crsStateNew reg =
  CRState
    { crsState = reg
    , crsPkgs  = []
    }
-- | Remove from the accumulated registry state the entry stored under the
-- correlation id of the request's original package.
crsStateDeleteReq :: Request -> CRState -> CRState
crsStateDeleteReq req crs =
  crs { crsState = inner { registryRequests = remaining } }
  where
    inner :: Registry'
    inner = crsState crs

    remaining :: Requests
    remaining =
      deleteMap (packageCorrelation (requestOriginal req)) (registryRequests inner)
-- | Store the request in the accumulated registry state under the
-- correlation id of its original package, replacing any entry already there.
crsStateRegisterReq :: Request -> CRState -> CRState
crsStateRegisterReq req crs =
  crs { crsState = inner { registryRequests = extended } }
  where
    inner :: Registry'
    inner = crsState crs

    extended :: Requests
    extended =
      insertMap (packageCorrelation (requestOriginal req)) req (registryRequests inner)
-- | Push a package onto the front of the accumulator's package list.
crsStateAddPkg :: Package -> CRState -> CRState
crsStateAddPkg pkg crs = crs { crsPkgs = pkg : crsPkgs crs }
-- | Review every registered request and every postponed operation, and
-- return the packages collected along the way.
--
-- For each registered request:
--
-- * if it was registered with a connection id different from @connId@, its
--   mailbox is failed with 'ConnectionHasDropped' and it is removed;
--
-- * otherwise, if it is not keep-alive and has timed out, it is either
--   retried (its package is collected, its retry counter incremented and its
--   start time reset) or, when the 'AtMost' limit would be exceeded, failed
--   with 'Aborted' and removed;
--
-- * otherwise it is left untouched.
--
-- Afterwards every postponed operation is registered as a request bound to
-- @connId@ and its package is collected; the waiting list is left empty.
registryCheckAndRetry :: Registry -> UUID -> EventStore [Package]
registryCheckAndRetry reg connId = do
  state   <- readIORef (registryState reg)
  elapsed <- stopwatchElapsed (registryStopwatch reg)
  checked <-
    foldM (checking elapsed) (crsStateNew state) (mapToList (registryRequests state))
  let checkedReg = crsState checked
      awaitings  = registryWaitings checkedReg
      -- The waiting list is emptied first: each waiting operation becomes a
      -- registered request in the fold below.
      drained    = checked { crsState = checkedReg { registryWaitings = [] } }
      finalState = foldl' (sending elapsed) drained awaitings
  writeIORef (registryState reg) (crsState finalState)
  pure (crsPkgs finalState)
  where
    -- Decide the fate of one registered request.
    checking :: NominalDiffTime -> CRState -> (UUID, Request) -> EventStore CRState
    checking elapsed state (_, req)
      | requestConnId req /= connId = do
          mailboxFail (requestMailbox req) ConnectionHasDropped
          pure (crsStateDeleteReq req state)
      | not (requestIsKeepAlive req) && hasTimeout =
          case registryMaxRetry settings of
            AtMost maxAtt
              | nextRetries > maxAtt -> do
                  $(logError) [i|Command #{cmd} [#{pkgId}] maximum retries threshold reached (#{maxAtt}), aborted!|]
                  mailboxFail (requestMailbox req) Aborted
                  pure (crsStateDeleteReq req state)
              | otherwise -> retryReq maxAtt
            -- No limit: 'maxBound' is only used as the denominator shown in
            -- the retry log message.
            KeepRetrying -> retryReq maxBound
      | otherwise = pure state
      where
        settings :: Registry'
        settings = crsState state

        hasTimeout :: Bool
        hasTimeout = elapsed - requestStarted req >= registryTimeout settings

        nextRetries :: Int
        nextRetries = requestRetries req + 1

        pkg :: Package
        pkg = requestOriginal req

        cmd :: Command
        cmd = packageCmd pkg

        pkgId :: UUID
        pkgId = packageCorrelation pkg

        -- Collect the package again and re-register the request with an
        -- incremented retry counter and a fresh start time.
        retryReq :: Int -> EventStore CRState
        retryReq maxAtt = do
          let nextReq =
                req
                  { requestRetries = nextRetries
                  , requestStarted = elapsed
                  }
          $(logWarn) [i|Command #{cmd} [#{pkgId}] has timeout. Retrying (attempt #{nextRetries}/#{maxAtt})|]
          pure (crsStateRegisterReq nextReq (crsStateAddPkg pkg state))

    -- Turn one postponed operation into a registered request and collect its
    -- package.
    sending :: NominalDiffTime -> CRState -> Waiting -> CRState
    sending elapsed state w =
      crsStateRegisterReq req (crsStateAddPkg (requestOriginal req) state)
      where
        req :: Request
        req = waitingToRequest connId elapsed w
-- | Empty the registry, then fail with 'Aborted' the mailbox of every
-- request and every postponed operation it contained.
registryAbort :: Registry -> EventStore ()
registryAbort reg = do
  snapshot <- readIORef (registryState reg)
  -- The cell is cleared first; the failures below are issued from the
  -- snapshot taken before clearing.
  writeIORef (registryState reg) (registryClear snapshot)
  for_ (registryRequests snapshot) (abort . requestMailbox)
  for_ (registryWaitings snapshot) (abort . waitingMailbox)
  where
    abort :: Mailbox -> EventStore ()
    abort mailbox = mailboxFail mailbox Aborted
-- | Run 'decodeMessage' over the given bytes, returning 'Nothing' instead of
-- the parser's error message when decoding fails.
maybeDecodeMessage :: Decode a => ByteString -> Maybe a
maybeDecodeMessage bytes
  | Right decoded <- runGet decodeMessage bytes = Just decoded
  | otherwise                                   = Nothing