module Database.EventStore.Internal.Manager.Operation.Registry where
import qualified Data.HashMap.Strict as HashMap
import Data.ProtocolBuffers
import Data.Serialize
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Stopwatch
import Database.EventStore.Internal.Types
data Request =
Request
{ requestOriginal :: !Package
, requestConnId :: !UUID
, requestRetries :: !Int
, requestStarted :: !NominalDiffTime
, requestMailbox :: !Mailbox
, requestLifetime :: !Lifetime
}
requestIsKeepAlive :: Request -> Bool
requestIsKeepAlive req
= case requestLifetime req of
OneTime -> False
KeepAlive _ -> True
requestToWaiting :: Request -> Waiting
requestToWaiting req =
Waiting
{ waitingLifetime = requestLifetime req
, waitingPkg = requestOriginal req
, waitingMailbox = requestMailbox req
}
waitingToRequest
:: UUID
-> NominalDiffTime
-> Waiting
-> Request
waitingToRequest connId started w
= Request
{ requestOriginal = waitingPkg w
, requestConnId = connId
, requestRetries = 1
, requestStarted = started
, requestMailbox = waitingMailbox w
, requestLifetime = waitingLifetime w
}
data Waiting =
Waiting
{ waitingLifetime :: !Lifetime
, waitingPkg :: !Package
, waitingMailbox :: !Mailbox
}
type Requests = HashMap UUID Request
data Registry' =
Registry'
{ registryRequests :: !Requests
, registryWaitings :: ![Waiting]
, registryTimeout :: !NominalDiffTime
, registryMaxRetry :: !Retry
}
registryClear :: Registry' -> Registry'
registryClear reg =
reg
{ registryRequests = mempty
, registryWaitings = []
}
data Registry =
Registry
{ registryState :: IORef Registry'
, registryStopwatch :: Stopwatch
}
data Blob a b = Blob a b
instance Functor (Blob a) where
fmap f (Blob a b) = Blob a (f b)
registryRemoveRequest
:: UUID
-> Registry'
-> (Maybe Request, Registry')
registryRemoveRequest key reg
= let Blob result newMap
= HashMap.alterF go key (registryRequests reg)
in (result, reg { registryRequests = newMap })
where
go Nothing = Blob Nothing Nothing
go (Just e) = Blob (Just e) Nothing
registryNew :: NominalDiffTime -> Retry -> IO Registry
registryNew timeout maxRetry
= Registry
<$> newIORef state
<*> newStopwatch
where
state =
Registry'
{ registryRequests = mempty
, registryWaitings = []
, registryTimeout = timeout
, registryMaxRetry = maxRetry
}
registryRegister
:: Registry
-> UUID
-> Lifetime
-> Package
-> Mailbox
-> EventStore ()
registryRegister reg connId lifetime pkg mailbox
= do started <- stopwatchElapsed (registryStopwatch reg)
modifyIORef' (registryState reg) (go started)
where
go started state =
let req = Request
{ requestOriginal = pkg
, requestConnId = connId
, requestRetries = 1
, requestStarted = started
, requestMailbox = mailbox
, requestLifetime = lifetime
}
correlation = packageCorrelation pkg
nextReqs = insertMap correlation req (registryRequests state)
in state { registryRequests = nextReqs }
registryPostpone
:: Registry
-> Mailbox
-> Lifetime
-> Package
-> EventStore ()
registryPostpone reg mailbox lifetime pkg
= modifyIORef' (registryState reg) go
where
go state
= let waiting
= Waiting
{ waitingLifetime = lifetime
, waitingPkg = pkg
, waitingMailbox = mailbox
}
nextWs = waiting : registryWaitings state
in state { registryWaitings = nextWs }
registryHandle
:: Registry
-> Package
-> EventStore (Maybe NodeEndPoints)
registryHandle reg pkg
= do state <- readIORef (registryState reg)
case registryRemoveRequest (packageCorrelation pkg) state of
(Nothing, _)
-> do $logWarn [i|No operation associated to package: #{pkg}|]
pure Nothing
(Just req, stateWithoutReq)
-> case packageCmd pkg of
cmd | cmd == badRequestCmd
-> do let reason = packageDataAsText pkg
mailboxFail (requestMailbox req) (ServerError reason)
pure Nothing
| cmd == notAuthenticatedCmd
-> do mailboxFail (requestMailbox req) NotAuthenticatedOp
pure Nothing
| cmd == notHandledCmd
-> do $(logWarn) [i|Not handled response received: #{pkg}.|]
let Just msg = maybeDecodeMessage $ packageData pkg
reason = getField $ notHandledReason msg
waiting = requestToWaiting req
nextWs = waiting : registryWaitings stateWithoutReq
finalState = stateWithoutReq { registryWaitings = nextWs }
origCmd = packageCmd (requestOriginal req)
pkgId = packageCorrelation pkg
writeIORef (registryState reg) finalState
case reason of
N_NotMaster
-> do let Just details = getField $ notHandledAdditionalInfo msg
info = masterInfo details
node = masterInfoNodeEndPoints info
$(logWarn) [i|Received a non master error on command #{origCmd} [#{pkgId}] on #{node}|]
pure (Just node)
_ -> do $(logWarn) [i|The server has either not started or is too busy. Retrying command #{origCmd} #{pkgId}|]
pure Nothing
| otherwise
-> do let respCmd = packageCmd pkg
mailboxSendPkg (requestMailbox req) pkg
case requestLifetime req of
OneTime
-> do writeIORef (registryState reg) stateWithoutReq
pure Nothing
KeepAlive endCmd
| endCmd == respCmd
-> do writeIORef (registryState reg) stateWithoutReq
pure Nothing
| otherwise
-> pure Nothing
data CRState =
CRState
{ crsState :: !Registry'
, crsPkgs :: ![Package]
}
crsStateNew :: Registry' -> CRState
crsStateNew reg =
CRState
{ crsState = reg
, crsPkgs = []
}
crsStateDeleteReq :: Request -> CRState -> CRState
crsStateDeleteReq req reg
= let state
= crsState reg
nextReqs
= deleteMap
(packageCorrelation . requestOriginal $ req)
(registryRequests state)
nextState
= state { registryRequests = nextReqs } in
reg { crsState = nextState }
crsStateRegisterReq :: Request -> CRState -> CRState
crsStateRegisterReq req reg
= let state
= crsState reg
nextReqs
= insertMap
(packageCorrelation . requestOriginal $ req)
req
(registryRequests state)
nextState
= state { registryRequests = nextReqs } in
reg { crsState = nextState }
crsStateAddPkg :: Package -> CRState -> CRState
crsStateAddPkg pkg reg
= let nextPkgs = pkg : crsPkgs reg in
reg { crsPkgs = nextPkgs }
registryCheckAndRetry
:: Registry
-> UUID
-> EventStore [Package]
registryCheckAndRetry reg connId
= do state <- readIORef (registryState reg)
elapsed <- stopwatchElapsed (registryStopwatch reg)
let reqs = mapToList $ registryRequests state
newState <- foldM (checking elapsed) (crsStateNew state) reqs
let newStateTemp = crsState newState
awaitings = registryWaitings newStateTemp
tempState = newStateTemp { registryWaitings = [] }
newCRState = newState { crsState = tempState }
finalState = foldl' (sending elapsed) newCRState awaitings
writeIORef (registryState reg) (crsState finalState)
pure (crsPkgs finalState)
where
checking elapsed state (_, req)
= do let maxTimeout = registryTimeout . crsState $ state
hasTimeout = elapsed - (requestStarted req) >= maxTimeout
maxRetry = registryMaxRetry . crsState $ state
if requestConnId req /= connId
then
do mailboxFail (requestMailbox req) ConnectionHasDropped
pure (crsStateDeleteReq req state)
else if not (requestIsKeepAlive req) && hasTimeout
then case maxRetry of
AtMost maxAtt
| requestRetries req + 1 > maxAtt
-> do let pkg = requestOriginal req
pkgId = packageCorrelation pkg
cmd = packageCmd pkg
$(logError) [i|Command #{cmd} [#{pkgId}] maximum retries threshold reached (#{maxAtt}), aborted!|]
mailboxFail (requestMailbox req) Aborted
pure (crsStateDeleteReq req state)
| otherwise
-> retryReq
KeepRetrying
-> retryReq
else
pure state
where
retryReq
= do let nextRetries
= requestRetries req + 1
nextReq
= req
{ requestRetries = nextRetries
, requestStarted = elapsed
}
maxAtt
= case registryMaxRetry . crsState $ state of
AtMost n -> n
KeepRetrying -> maxBound
pkg = requestOriginal req
cmd = packageCmd pkg
pkgId = packageCorrelation pkg
$(logWarn) [i|Command #{cmd} [#{pkgId} has timeout. Retrying (attempt #{nextRetries}/#{maxAtt})|]
pure . crsStateRegisterReq nextReq . crsStateAddPkg pkg $ state
sending elapsed state w
= let req = waitingToRequest connId elapsed w
pkg = requestOriginal req in
crsStateRegisterReq req . crsStateAddPkg pkg $ state
registryAbort :: Registry -> EventStore ()
registryAbort reg
= do state <- readIORef (registryState reg)
writeIORef (registryState reg) (registryClear state)
for_ (registryRequests state) $ \req
-> mailboxFail (requestMailbox req) Aborted
for_ (registryWaitings state) $ \w
-> mailboxFail (waitingMailbox w) Aborted
maybeDecodeMessage :: Decode a => ByteString -> Maybe a
maybeDecodeMessage bytes =
case runGet decodeMessage bytes of
Right a -> Just a
_ -> Nothing