{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ViewPatterns #-}
module Database.EventStore.Internal.ConnectionManager
( connectionManager ) where
import Data.Typeable
import Control.Monad.Reader
import Data.Time
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Connection
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Discovery
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Operation.Authenticate (newAuthenticatePkg)
import Database.EventStore.Internal.Operation.Identify (newIdentifyPkg)
import qualified Database.EventStore.Internal.Manager.Operation.Registry as Operation
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stopwatch
import Database.EventStore.Internal.Types
data Stage
= Init
| Connecting Attempts ConnectingState
| Connected Connection
| Closed
instance Show Stage where
show Init = "Init"
show (Connecting a s) = "Connecting: " <> show (a, s)
show (Connected c) = "Connected on" <> show c
show Closed = "Closed"
data ConnectingState
= Reconnecting
| EndpointDiscovery
| ConnectionEstablishing Connection
| Authentication UUID NominalDiffTime Connection
| Identification UUID NominalDiffTime Connection
deriving Show
atLeastEstablishingState :: Stage -> Bool
atLeastEstablishingState = \case
Init -> False
Connected{} -> True
Closed -> True
Connecting _ s ->
case s of
Reconnecting -> False
EndpointDiscovery -> False
_ -> True
data Attempts =
Attempts { attemptCount :: !Int
, attemptLastStart :: !NominalDiffTime
} deriving Show
freshAttempt :: Stopwatch -> EventStore Attempts
freshAttempt = fmap (Attempts 1) . stopwatchElapsed
data ConnectionMaxAttemptReached = ConnectionMaxAttemptReached
deriving Typeable
instance Show ConnectionMaxAttemptReached where
show _ = "Reconnection limit reached."
instance Exception ConnectionMaxAttemptReached
data IdentificationTimeout = IdentificationTimeout deriving Typeable
instance Show IdentificationTimeout where
show _ = "Timed out waiting for client to be identified."
instance Exception IdentificationTimeout
data EstablishConnection = EstablishConnection EndPoint deriving Typeable
newtype CloseConnection = CloseConnection SomeException
deriving (Show, Typeable)
instance Exception CloseConnection
data Tick = Tick deriving Typeable
timerPeriod :: Duration
timerPeriod = msDuration 200
data HeartbeatStage = Interval | Timeout
data HeartbeatTracker =
HeartbeatTracker { _pkgNum :: !Integer
, _heartbeatStage :: !HeartbeatStage
, _startedSince :: !NominalDiffTime
}
newHeartbeatTracker :: MonadBaseControl IO m
=> Stopwatch
-> m (IORef HeartbeatTracker)
newHeartbeatTracker =
newIORef . HeartbeatTracker 0 Interval <=< stopwatchElapsed
initHeartbeatTracker :: Internal -> EventStore ()
initHeartbeatTracker Internal{..} = do
elapsed <- stopwatchElapsed _stopwatch
pkgNum <- readIORef _lastPkgNum
let tracker = HeartbeatTracker pkgNum Interval elapsed
atomicWriteIORef _tracker tracker
data Internal =
Internal { _disc :: Discovery
, _builder :: ConnectionBuilder
, _stage :: IORef Stage
, _last :: IORef (Maybe EndPoint)
, _sending :: TVar Bool
, _opMgr :: Operation.Registry
, _stopwatch :: Stopwatch
, _lastCheck :: IORef NominalDiffTime
, _lastConnected :: IORef Bool
, _tracker :: IORef HeartbeatTracker
, _lastPkgNum :: IORef Integer
}
incrPackageNumber :: Internal -> EventStore ()
incrPackageNumber Internal{..} = do
atomicModifyIORef' _lastPkgNum $ \n -> (n + 1, ())
monitorIncrPkgCount
connectionManager :: Settings
-> ConnectionBuilder
-> Discovery
-> Hub
-> IO ()
connectionManager setts builder disc mainBus = do
stageRef <- newIORef Init
let mkInternal = Internal disc builder stageRef
stopwatch <- newStopwatch
timeoutCheck <- stopwatchElapsed stopwatch
internal <- mkInternal <$> newIORef Nothing
<*> newTVarIO False
<*> Operation.registryNew (s_operationTimeout setts) (s_operationRetry setts)
<*> return stopwatch
<*> newIORef timeoutCheck
<*> newIORef False
<*> newHeartbeatTracker stopwatch
<*> newIORef 0
subscribe mainBus (onInit internal)
subscribe mainBus (onEstablish internal)
subscribe mainBus (onEstablished internal)
subscribe mainBus (onArrived internal)
subscribe mainBus (onTransmit internal)
subscribe mainBus (onConnectionError internal)
subscribe mainBus (onConnectionClosed internal)
subscribe mainBus (onCloseConnection internal)
subscribe mainBus (onShutdown internal)
subscribe mainBus (onTick internal)
subscribe mainBus (onSendPackage internal)
onInit :: Internal -> SystemInit -> EventStore ()
onInit self@Internal{..} _ = do
publish (NewTimer Tick timerPeriod False)
startConnect self
startConnect :: Internal -> EventStore ()
startConnect self@Internal{..} =
readIORef _stage >>= \case
Init -> do
atts <- freshAttempt _stopwatch
atomicWriteIORef _stage (Connecting atts Reconnecting)
discover self
_ -> return ()
discover :: Internal -> EventStore ()
discover Internal{..} =
readIORef _stage >>= \case
Connecting att p ->
case p of
Reconnecting{} -> do
atomicWriteIORef _stage (Connecting att EndpointDiscovery)
old <- readIORef _last
_ <- fork $
tryAny (runDiscovery _disc old) >>= \case
Left e -> do
$logError
[i| Failed to resolve TCP endpoint to which to connect #{e}.|]
publish (CloseConnection e)
Right opt ->
case opt of
Nothing -> do
$logWarn
"Failed to resolve TCP endpoint to which to connect."
Just ept -> publish (EstablishConnection ept)
return ()
_ -> return ()
_ -> return ()
establish :: Internal -> EndPoint -> EventStore ()
establish Internal{..} ept = do
$(logDebug) [i|Establish tcp connection on [#{ept}]|]
readIORef _stage >>= \case
Connecting att s ->
case s of
EndpointDiscovery -> do
conn <- connect _builder ept
connected <- atomicModifyIORef' _lastConnected $ \c -> (True, c)
unless connected $
publish (Initialized ConnectionManager)
atomicWriteIORef _stage (Connecting att (ConnectionEstablishing conn))
_ -> return ()
_ -> return ()
established :: Internal -> Connection -> EventStore ()
established self@Internal{..} conn =
readIORef _stage >>= \case
Connecting att (ConnectionEstablishing known) -> do
when (conn == known) $ do
$logDebug [i|TCP connection established: #{conn}.|]
setts <- getSettings
case s_defaultUserCredentials setts of
Just cred -> authenticate self att conn cred
Nothing -> identifyClient self att known
_ -> return ()
authenticate :: Internal
-> Attempts
-> Connection
-> Credentials
-> EventStore ()
authenticate Internal{..} att conn cred = do
pkg <- newAuthenticatePkg cred
elapsed <- stopwatchElapsed _stopwatch
let authCorr = packageCorrelation pkg
atomicWriteIORef _stage (Connecting att (Authentication authCorr elapsed conn))
enqueuePackage conn pkg
identifyClient :: Internal -> Attempts -> Connection -> EventStore ()
identifyClient Internal{..} att conn = do
setts <- getSettings
uuid <- newUUID
let defName = [i|ES-#{uuid}|]
connName = fromMaybe defName (s_defaultConnectionName setts)
pkg <- newIdentifyPkg clientVersion connName
elapsed <- stopwatchElapsed _stopwatch
let idCorr = packageCorrelation pkg
atomicWriteIORef _stage (Connecting att (Identification idCorr elapsed conn))
enqueuePackage conn pkg
where
clientVersion = 1
clientIdentified :: Internal -> EventStore ()
clientIdentified self@Internal{..} =
readIORef _stage >>= \case
Connecting _ (Identification _ _ conn) -> do
$logDebug [i|TCP connection identified: #{conn}.|]
atomicWriteIORef _stage (Connected conn)
initHeartbeatTracker self
pkgs <- Operation.registryCheckAndRetry _opMgr (connectionId conn)
traverse_ (enqueuePackage conn) pkgs
_ -> pure ()
onEstablished :: Internal -> ConnectionEstablished -> EventStore ()
onEstablished self (ConnectionEstablished conn) = established self conn
closeConnection :: Exception e => Internal -> e -> EventStore ()
closeConnection self@Internal{..} cause = do
$logDebug [i|CloseConnection: #{cause}.|]
mConn <- lookupConnectionAndSwitchToClosed self
Operation.registryAbort _opMgr
traverse_ (closeTcpConnection self cause) mConn
$logInfo [i|CloseConnection: connection cleanup done for [#{cause}].|]
publish (FatalException cause)
lookupConnectionAndSwitchToClosed :: Internal -> EventStore (Maybe Connection)
lookupConnectionAndSwitchToClosed self@Internal{..} = do
outcome <- lookupConnection self
atomicWriteIORef _stage Closed
return outcome
closeTcpConnection :: Exception e => Internal -> e -> Connection -> EventStore ()
closeTcpConnection Internal{..} cause conn = do
let cid = connectionId conn
$logDebug [i|CloseTcpConnection: connection [#{cid}]. Cause: #{cause}.|]
dispose conn
$logDebug [i|CloseTcpConnection: connection [#{cid}] disposed.|]
readIORef _stage >>= \case
Closed -> return ()
stage -> do
att <-
case stage of
Connecting old _ -> return old
_ -> freshAttempt _stopwatch
atomicWriteIORef _stage (Connecting att Reconnecting)
data ForceReconnect = ForceReconnect EndPoint deriving (Typeable, Show)
instance Exception ForceReconnect
forceReconnect :: Internal -> NodeEndPoints -> EventStore ()
forceReconnect self@Internal{..} node = do
setts <- getSettings
let ept = if isJust $ s_ssl setts
then let Just pt = secureEndPoint node in pt
else tcpEndPoint node
Connected conn <- readIORef _stage
when (connectionEndPoint conn /= ept) $ do
monitorIncrForceReconnect
closeTcpConnection self (ForceReconnect ept) conn
att <- freshAttempt _stopwatch
atomicModifyIORef' _lastPkgNum $ \cur -> (cur + 1, ())
atomicWriteIORef _stage (Connecting att EndpointDiscovery)
$logInfo [i|#{conn}: going to reconnect to #{ept}.|]
establish self ept
onEstablish :: Internal -> EstablishConnection -> EventStore ()
onEstablish self (EstablishConnection ept) = establish self ept
onTick :: Internal -> Tick -> EventStore ()
onTick self@Internal{..} _ = do
setts <- getSettings
readIORef _stage >>= \case
(onGoingConnection -> Just Attempts{..}) -> do
elapsed <- stopwatchElapsed _stopwatch
when (elapsed - attemptLastStart >= s_reconnect_delay setts) $ do
let retries = attemptCount + 1
att = Attempts retries elapsed
atomicWriteIORef _stage (Connecting att Reconnecting)
case s_retry setts of
AtMost n
| attemptCount <= n -> retryConnection attemptCount
| otherwise -> maxAttemptReached
KeepRetrying -> retryConnection attemptCount
(pendingAuthenticate -> Just (started, att, conn)) -> do
elapsed <- stopwatchElapsed _stopwatch
when (elapsed - started >= s_operationTimeout setts) $ do
$logWarn "Authentication timed out."
identifyClient self att conn
(pendingIdentification -> Just started) -> do
elapsed <- stopwatchElapsed _stopwatch
when (elapsed - started >= s_operationTimeout setts) $
traverse_ (closeTcpConnection self IdentificationTimeout)
=<< lookupConnection self
(defaultConnecting -> True) -> manageHeartbeats self
Connected conn -> do
elapsed <- stopwatchElapsed _stopwatch
timeoutCheckStart <- readIORef _lastCheck
when (elapsed - timeoutCheckStart >= s_operationTimeout setts) $ do
$logDebug "Start check and retry..."
pkgs <- Operation.registryCheckAndRetry _opMgr (connectionId conn)
traverse_ (enqueuePackage conn) pkgs
$logDebug "Completed check and retry"
atomicWriteIORef _lastCheck elapsed
_ -> return ()
stage <- readIORef _stage
when (atLeastEstablishingState stage) $
manageHeartbeats self
where
onGoingConnection (Connecting att Reconnecting) = Just att
onGoingConnection (Connecting att ConnectionEstablishing{}) = Just att
onGoingConnection _ = Nothing
pendingIdentification (Connecting _ (Identification _ started _)) = Just started
pendingIdentification _ = Nothing
pendingAuthenticate (Connecting a (Authentication _ started c)) = Just (started, a, c)
pendingAuthenticate _ = Nothing
defaultConnecting Connecting{} = True
defaultConnecting _ = False
maxAttemptReached = do
closeConnection self ConnectionMaxAttemptReached
publish (FatalException ConnectionMaxAttemptReached)
retryConnection cnt = do
$logDebug [i|Checking reconnection... (attempt #{cnt}).|]
discover self
data ServerHeartbeatTimeout = ServerHeartbeatTimeout deriving Typeable
instance Show ServerHeartbeatTimeout where
show _ = "Server connection has heartbeat timeout"
instance Exception ServerHeartbeatTimeout
manageHeartbeats :: Internal -> EventStore ()
manageHeartbeats self@Internal{..} = traverse_ go =<< lookupConnection self
where
go conn = do
elapsed <- stopwatchElapsed _stopwatch
pkgNum <- readIORef _lastPkgNum
tracker <- readIORef _tracker
setts <- getSettings
let interval = s_heartbeatInterval setts
timeout = s_heartbeatTimeout setts
initTracker = tracker
{ _heartbeatStage = Interval
, _startedSince = elapsed
, _pkgNum = pkgNum
}
if _pkgNum tracker /= pkgNum
then atomicWriteIORef _tracker initTracker
else
case _heartbeatStage tracker of
Interval
| elapsed - _startedSince tracker >= interval -> do
uuid <- freshUUID
let pkg = heartbeatRequestPackage uuid
newTracker = tracker
{ _heartbeatStage = Timeout
, _startedSince = elapsed
, _pkgNum = pkgNum
}
enqueuePackage conn pkg
atomicWriteIORef _tracker newTracker
| otherwise -> return ()
Timeout
| elapsed - _startedSince tracker >= timeout -> do
monitorIncrHeartbeatTimeouts
$logInfo [i|Closing #{conn} due to HEARTBEAT TIMEOUT at pkgNum #{pkgNum}|]
closeTcpConnection self ServerHeartbeatTimeout conn
| otherwise -> return ()
onArrived :: Internal -> PackageArrived -> EventStore ()
onArrived self@Internal{..} (PackageArrived conn pkg@Package{..}) = do
cur <- readIORef _stage
unless (closedOrInit cur) $
incrPackageNumber self
readIORef _stage >>= \case
(onAuthentication -> Just att) -> do
when (packageCmd == notAuthenticatedCmd) $
$logWarn "Not authenticated."
identifyClient self att conn
(onIdentification -> True) ->
clientIdentified self
(runningConnection -> True) -> do
$logDebug [i|Package received: #{pkg}.|]
handlePackage
_ -> $logDebug [i|Package IGNORED: #{pkg}.|]
where
onIdentification (Connecting _ (Identification u _ _)) =
packageCorrelation == u && packageCmd == clientIdentifiedCmd
onIdentification _ = False
onAuthentication (Connecting a (Authentication u _ _)) =
if packageCorrelation == u && (packageCmd == authenticatedCmd || packageCmd == notAuthenticatedCmd)
then Just a
else Nothing
onAuthentication _ = Nothing
runningConnection (Connecting _ (ConnectionEstablishing c)) = conn == c
runningConnection (Connecting _ (Authentication _ _ c)) = conn == c
runningConnection (Connecting _ (Identification _ _ c)) = conn == c
runningConnection (Connected c) = conn == c
runningConnection _ = False
heartbeatResponse = heartbeatResponsePackage packageCorrelation
handlePackage
| packageCmd == heartbeatResponseCmd = return ()
| packageCmd == heartbeatRequestCmd =
enqueuePackage conn heartbeatResponse
| otherwise =
Operation.registryHandle _opMgr pkg >>= \case
Nothing -> pure ()
Just node -> forceReconnect self node
closedOrInit = \case
Init -> True
Closed -> True
_ -> False
isSameConnection :: Internal -> Connection -> EventStore Bool
isSameConnection Internal{..} conn = go <$> readIORef _stage
where
go (Connected known) = known == conn
go (Connecting _ state) =
case state of
ConnectionEstablishing known -> known == conn
Authentication _ _ known -> known == conn
Identification _ _ known -> known == conn
_ -> False
go _ = False
onConnectionError :: Internal -> ConnectionError -> EventStore ()
onConnectionError self@Internal{..} (ConnectionError conn e) =
whenM (isSameConnection self conn) $ do
$logError [i|TCP #{conn} error. Cause: #{e}.|]
closeConnection self e
onConnectionClosed :: Internal -> ConnectionClosed -> EventStore ()
onConnectionClosed self@Internal{..} (ConnectionClosed conn cause) =
whenM (isSameConnection self conn) $ do
closeTcpConnection self cause conn
monitorIncrConnectionDrop
onShutdown :: Internal -> SystemShutdown -> EventStore ()
onShutdown self@Internal{..} _ = do
$logDebug "Shutting down..."
mConn <- lookupConnectionAndSwitchToClosed self
Operation.registryAbort _opMgr
traverse_ dispose mConn
$logDebug "Shutdown properly."
publish (ServiceTerminated ConnectionManager)
onTransmit :: Internal -> Transmit -> EventStore ()
onTransmit Internal{..} (Transmit m lifetime pkg) =
readIORef _stage >>= \case
Closed
-> mailboxFail m Aborted
Connected conn
-> do enqueuePackage conn pkg
Operation.registryRegister _opMgr (connectionId conn) lifetime pkg m
_ -> Operation.registryPostpone _opMgr m lifetime pkg
onCloseConnection :: Internal -> CloseConnection -> EventStore ()
onCloseConnection self e = closeConnection self e
lookupConnection :: Internal -> EventStore (Maybe Connection)
lookupConnection Internal{..} = lookingUpConnection _stage
lookingUpConnection :: IORef Stage -> EventStore (Maybe Connection)
lookingUpConnection ref = go <$> readIORef ref
where
go (Connected conn) = Just conn
go (Connecting _ state) =
case state of
ConnectionEstablishing conn -> Just conn
Authentication _ _ conn -> Just conn
Identification _ _ conn -> Just conn
_ -> Nothing
go _ = Nothing
onSendPackage :: Internal -> SendPackage -> EventStore ()
onSendPackage self (SendPackage pkg) =
traverse_ sending =<< lookupConnection self
where
sending conn = enqueuePackage conn pkg