{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ViewPatterns #-}
module Database.EventStore.Internal.ConnectionManager
( connectionManager ) where
import Data.Typeable
import Control.Monad.Reader
import Data.Time
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Connection
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Discovery
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Operation.Authenticate (newAuthenticatePkg)
import Database.EventStore.Internal.Operation.Identify (newIdentifyPkg)
import qualified Database.EventStore.Internal.Manager.Operation.Registry as Operation
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stopwatch
import Database.EventStore.Internal.Types
-- | Stage the connection manager is currently in. The value is kept in the
-- '_stage' reference of 'Internal' and inspected by the handlers of this
-- module before they act.
data Stage
  = Init
    -- ^ Starting stage; 'startConnect' only does something from here.
  | Connecting Attempts ConnectingState
    -- ^ A connection attempt is underway, with its bookkeeping and the
    -- sub-step it has reached.
  | Connected Connection
    -- ^ Carries the 'Connection' in use.
  | Closed
-- | Textual rendering of a 'Stage', for diagnostics.
instance Show Stage where
  show Init             = "Init"
  show (Connecting a s) = "Connecting: " <> show (a, s)
  -- A separating space is emitted so the connection is not glued to "on".
  show (Connected c)    = "Connected on " <> show c
  show Closed           = "Closed"
-- | Sub-step reached while the manager is in the 'Connecting' stage.
data ConnectingState
  = Reconnecting
    -- ^ Set by 'startConnect'; 'discover' only proceeds from this state.
  | EndpointDiscovery
    -- ^ Set by 'discover' before it forks the endpoint resolution.
  | ConnectionEstablishing Connection
    -- ^ Set by 'establish' once 'connect' returned a 'Connection'.
  | Authentication UUID NominalDiffTime Connection
    -- ^ Correlation id of the authentication package, the stopwatch reading
    -- taken when it was enqueued, and the connection it was sent on.
  | Identification UUID NominalDiffTime Connection
    -- ^ Same layout as 'Authentication', for the identification package.
  deriving Show
-- | 'True' when the stage is 'Connected', 'Closed', or a 'Connecting' stage
-- whose sub-step is anything other than 'Reconnecting' or
-- 'EndpointDiscovery'. 'False' for 'Init' and for those two early sub-steps.
atLeastEstablishingState :: Stage -> Bool
atLeastEstablishingState = \case
  Init        -> False
  Connected{} -> True
  Closed      -> True
  Connecting _ s ->
    case s of
      Reconnecting      -> False
      EndpointDiscovery -> False
      _                 -> True
-- | Bookkeeping attached to a 'Connecting' stage.
data Attempts =
  Attempts
  { attemptCount     :: !Int
    -- ^ Attempt counter; 'freshAttempt' initialises it to 1.
  , attemptLastStart :: !NominalDiffTime
    -- ^ Stopwatch reading captured when the value was created by
    -- 'freshAttempt'.
  } deriving Show
-- | Builds an 'Attempts' value with a count of 1 and the current elapsed
-- time of the given stopwatch as its start time.
freshAttempt :: Stopwatch -> EventStore Attempts
freshAttempt = fmap (Attempts 1) . stopwatchElapsed
-- | Exception whose message states that the reconnection limit was reached.
data ConnectionMaxAttemptReached = ConnectionMaxAttemptReached
  deriving Typeable

instance Show ConnectionMaxAttemptReached where
  show _ = "Reconnection limit reached."

instance Exception ConnectionMaxAttemptReached
-- | Exception whose message states that the client was not identified in
-- time.
data IdentificationTimeout = IdentificationTimeout
  deriving Typeable

instance Show IdentificationTimeout where
  show _ = "Timed out waiting for client to be identified."

instance Exception IdentificationTimeout
-- | Bus message carrying the 'EndPoint' to connect to; 'discover' publishes
-- it once an endpoint has been resolved.
data EstablishConnection = EstablishConnection EndPoint
  deriving Typeable
-- | Bus message (also usable as an exception) wrapping the error that is the
-- reason for closing the connection; 'discover' publishes it when endpoint
-- resolution throws.
newtype CloseConnection = CloseConnection SomeException
  deriving (Show, Typeable)

instance Exception CloseConnection
-- | Timer message: 'onInit' schedules it through 'NewTimer' with
-- 'timerPeriod', and 'connectionManager' subscribes 'onTick' to it.
data Tick = Tick
  deriving Typeable
-- | Period used for the 'Tick' timer: the 'Duration' built by
-- @msDuration 200@ (presumably 200 milliseconds -- confirm against
-- 'msDuration').
timerPeriod :: Duration
timerPeriod = msDuration 200
-- | Phase recorded in a 'HeartbeatTracker'; trackers are created in the
-- 'Interval' phase.
data HeartbeatStage
  = Interval
  | Timeout
-- | Snapshot used for heartbeat tracking.
data HeartbeatTracker =
  HeartbeatTracker
  { _pkgNum         :: !Integer
    -- ^ Package counter value at the time the snapshot was taken.
  , _heartbeatStage :: !HeartbeatStage
    -- ^ Current heartbeat phase.
  , _startedSince   :: !NominalDiffTime
    -- ^ Stopwatch reading at the time the snapshot was taken.
  }
-- | Allocates a reference holding a fresh 'HeartbeatTracker': package number
-- 0, 'Interval' phase, started at the stopwatch's current elapsed time.
newHeartbeatTracker :: MonadBaseControl IO m
                    => Stopwatch
                    -> m (IORef HeartbeatTracker)
newHeartbeatTracker =
  newIORef . HeartbeatTracker 0 Interval <=< stopwatchElapsed
-- | Resets the heartbeat tracker stored in '_tracker': it is overwritten with
-- a snapshot made of the current '_lastPkgNum' value, the 'Interval' phase
-- and the stopwatch's current elapsed time.
initHeartbeatTracker :: Internal -> EventStore ()
initHeartbeatTracker Internal{..} = do
  elapsed <- stopwatchElapsed _stopwatch
  pkgNum  <- readIORef _lastPkgNum
  let tracker = HeartbeatTracker pkgNum Interval elapsed
  atomicWriteIORef _tracker tracker
-- | State shared by every handler of the connection manager. It is built once
-- in 'connectionManager'.
data Internal =
  Internal
  { _disc          :: Discovery
    -- ^ Endpoint discovery, run by 'discover'.
  , _builder       :: ConnectionBuilder
    -- ^ Used by 'establish' to open a connection.
  , _stage         :: IORef Stage
    -- ^ Current stage; starts at 'Init'.
  , _last          :: IORef (Maybe EndPoint)
    -- ^ Passed to 'runDiscovery' as the previous endpoint; starts at
    -- 'Nothing'.
  , _sending       :: TVar Bool
    -- ^ Starts at 'False'. NOTE(review): its use is outside this excerpt.
  , _opMgr         :: Operation.Registry
    -- ^ Operation registry created from the operation timeout and retry
    -- settings.
  , _stopwatch     :: Stopwatch
    -- ^ Source of every elapsed-time reading in this module.
  , _lastCheck     :: IORef NominalDiffTime
    -- ^ Initialised with the stopwatch reading taken at start-up.
  , _lastConnected :: IORef Bool
    -- ^ Set to 'True' by 'establish'; its previous value decides whether
    -- 'Initialized' is published.
  , _tracker       :: IORef HeartbeatTracker
    -- ^ Heartbeat snapshot, reset by 'initHeartbeatTracker'.
  , _lastPkgNum    :: IORef Integer
    -- ^ Package counter, bumped by 'incrPackageNumber'; starts at 0.
  }
-- | Atomically adds one to the '_lastPkgNum' counter, then runs
-- 'monitorIncrPkgCount'.
incrPackageNumber :: Internal -> EventStore ()
incrPackageNumber Internal{..} = do
  atomicModifyIORef' _lastPkgNum $ \n -> (n + 1, ())
  monitorIncrPkgCount
-- | Sets up the connection manager: creates its 'Internal' state (stage
-- 'Init', fresh stopwatch, operation registry built from the settings,
-- heartbeat tracker, package counter at 0) and subscribes every handler of
-- this module to the given bus.
connectionManager :: Settings
                  -> ConnectionBuilder
                  -> Discovery
                  -> Hub
                  -> IO ()
connectionManager setts builder disc mainBus = do
  stageRef <- newIORef Init
  let mkInternal = Internal disc builder stageRef

  stopwatch    <- newStopwatch
  timeoutCheck <- stopwatchElapsed stopwatch
  -- The applicative chain supplies the remaining 'Internal' fields in
  -- declaration order.
  internal <- mkInternal <$> newIORef Nothing
                         <*> newTVarIO False
                         <*> Operation.registryNew
                               (s_operationTimeout setts)
                               (s_operationRetry setts)
                         <*> return stopwatch
                         <*> newIORef timeoutCheck
                         <*> newIORef False
                         <*> newHeartbeatTracker stopwatch
                         <*> newIORef 0

  subscribe mainBus (onInit internal)
  subscribe mainBus (onEstablish internal)
  subscribe mainBus (onEstablished internal)
  subscribe mainBus (onArrived internal)
  subscribe mainBus (onTransmit internal)
  subscribe mainBus (onConnectionError internal)
  subscribe mainBus (onConnectionClosed internal)
  subscribe mainBus (onCloseConnection internal)
  subscribe mainBus (onShutdown internal)
  subscribe mainBus (onTick internal)
  subscribe mainBus (onSendPackage internal)
-- | Handler for 'SystemInit': publishes a 'NewTimer' request for 'Tick' with
-- 'timerPeriod', then calls 'startConnect'.
onInit :: Internal -> SystemInit -> EventStore ()
onInit self _ = do
  publish (NewTimer Tick timerPeriod False)
  startConnect self
-- | Starts the connection process, but only from the 'Init' stage: the stage
-- becomes @Connecting atts Reconnecting@ with a fresh 'Attempts' value and
-- 'discover' is called. Any other stage is left untouched.
startConnect :: Internal -> EventStore ()
startConnect self@Internal{..} =
  readIORef _stage >>= \case
    Init -> do
      atts <- freshAttempt _stopwatch
      atomicWriteIORef _stage (Connecting atts Reconnecting)
      discover self
    _ -> return ()
-- | Resolves the endpoint to connect to. Only acts when the stage is
-- @Connecting _ Reconnecting@: the sub-step becomes 'EndpointDiscovery' and
-- 'runDiscovery' is run in a forked thread with the endpoint stored in
-- '_last'.
--
-- * If discovery throws, the error is logged and 'CloseConnection' is
--   published with it.
-- * If discovery yields 'Nothing', a warning is logged and nothing is
--   published.
-- * If discovery yields an endpoint, 'EstablishConnection' is published.
discover :: Internal -> EventStore ()
discover Internal{..} =
  readIORef _stage >>= \case
    Connecting att p ->
      case p of
        Reconnecting{} -> do
          atomicWriteIORef _stage (Connecting att EndpointDiscovery)
          old <- readIORef _last
          _   <- fork $
              tryAny (runDiscovery _disc old) >>= \case
                Left e -> do
                  $(logError)
                    [i| Failed to resolve TCP endpoint to which to connect #{e}.|]
                  publish (CloseConnection e)
                Right opt ->
                  case opt of
                    Nothing -> do
                      $(logWarn)
                        "Failed to resolve TCP endpoint to which to connect."
                    Just ept -> publish (EstablishConnection ept)
          return ()
        _ -> return ()
    _ -> return ()
-- | Opens a connection to the given endpoint. Only acts when the stage is
-- @Connecting _ EndpointDiscovery@: a connection is created with 'connect',
-- '_lastConnected' is set to 'True', and the sub-step becomes
-- 'ConnectionEstablishing'. @Initialized ConnectionManager@ is published
-- only when '_lastConnected' was previously 'False'.
establish :: Internal -> EndPoint -> EventStore ()
establish Internal{..} ept = do
  $(logDebug) [i|Establish tcp connection on [#{ept}]|]
  readIORef _stage >>= \case
    Connecting att s ->
      case s of
        EndpointDiscovery -> do
          conn <- connect _builder ept
          -- The swap returns the value the flag held before this call.
          connected <- atomicModifyIORef' _lastConnected $ \c -> (True, c)
          unless connected $
            publish (Initialized ConnectionManager)
          atomicWriteIORef _stage (Connecting att (ConnectionEstablishing conn))
        _ -> return ()
    _ -> return ()
-- | Continues the handshake once a connection is reported as established.
-- Only acts when the stage is @Connecting _ (ConnectionEstablishing known)@
-- and the reported connection equals @known@: with default user credentials
-- in the settings it calls 'authenticate', otherwise 'identifyClient'.
established :: Internal -> Connection -> EventStore ()
established self conn =
  readIORef (_stage self) >>= \case
    Connecting att (ConnectionEstablishing known) ->
      when (conn == known) $ do
        $(logDebug) [i|TCP connection established: #{conn}.|]
        setts <- getSettings
        case s_defaultUserCredentials setts of
          Just cred -> authenticate self att conn cred
          Nothing   -> identifyClient self att known
    _ -> return ()
-- | Sends an authentication package on the connection. The stage is set to
-- @Connecting att (Authentication corr elapsed conn)@, where @corr@ is the
-- package's correlation id and @elapsed@ the current stopwatch reading,
-- before the package is enqueued.
authenticate :: Internal
             -> Attempts
             -> Connection
             -> Credentials
             -> EventStore ()
authenticate Internal{..} att conn cred = do
  pkg     <- newAuthenticatePkg cred
  elapsed <- stopwatchElapsed _stopwatch
  let authCorr = packageCorrelation pkg
  atomicWriteIORef _stage (Connecting att (Authentication authCorr elapsed conn))
  enqueuePackage conn pkg
-- | Sends a client identification package on the connection. The connection
-- name is 's_defaultConnectionName' from the settings when present,
-- otherwise @ES-@ followed by a freshly generated UUID. The stage is set to
-- @Connecting att (Identification corr elapsed conn)@ before the package is
-- enqueued.
identifyClient :: Internal -> Attempts -> Connection -> EventStore ()
identifyClient Internal{..} att conn = do
  setts <- getSettings
  uuid  <- newUUID
  let defName  = [i|ES-#{uuid}|]
      connName = fromMaybe defName (s_defaultConnectionName setts)
  pkg     <- newIdentifyPkg clientVersion connName
  elapsed <- stopwatchElapsed _stopwatch
  let idCorr = packageCorrelation pkg
  atomicWriteIORef _stage (Connecting att (Identification idCorr elapsed conn))
  enqueuePackage conn pkg
  where
    -- Version number passed to 'newIdentifyPkg'.
    clientVersion = 1
-- | Completes the identification step.
--
-- Only acts when the current stage is @Connecting _ (Identification ..)@:
-- the stage becomes 'Connected', the heartbeat tracker is initialised and
-- every package returned by 'Operation.registryCheckAndRetry' is enqueued on
-- the connection. Any other stage is left untouched.
clientIdentified :: Internal -> EventStore ()
clientIdentified self@Internal{..} = do
  stage <- readIORef _stage
  case stage of
    Connecting _ (Identification _ _ conn) -> do
      $logDebug [i|TCP connection identified: #{conn}.|]
      atomicWriteIORef _stage (Connected conn)
      initHeartbeatTracker self
      Operation.registryCheckAndRetry _opMgr (connectionId conn)
        >>= traverse_ (enqueuePackage conn)
    _ -> pure ()
-- | Handler for 'ConnectionEstablished': unwraps the connection and hands it
-- to 'established'.
onEstablished :: Internal -> ConnectionEstablished -> EventStore ()
onEstablished self msg =
  case msg of
    ConnectionEstablished conn -> established self conn
-- | Shuts the manager down for good because of @cause@.
--
-- The stage is switched to 'Closed', the operation registry is aborted, the
-- connection that was active (if any) is closed with the same cause, and a
-- 'FatalException' wrapping @cause@ is published.
closeConnection :: Exception e => Internal -> e -> EventStore ()
closeConnection self@Internal{..} cause = do
  $logDebug [i|CloseConnection: #{cause}.|]
  previous <- lookupConnectionAndSwitchToClosed self
  Operation.registryAbort _opMgr
  case previous of
    Just conn -> closeTcpConnection self cause conn
    Nothing   -> pure ()
  $logInfo [i|CloseConnection: connection cleanup done for [#{cause}].|]
  publish (FatalException cause)
-- | Looks up the current connection, then marks the stage as 'Closed'.
-- The lookup happens first, so the result reflects the stage as it was
-- before the switch.
lookupConnectionAndSwitchToClosed :: Internal -> EventStore (Maybe Connection)
lookupConnectionAndSwitchToClosed self@Internal{..} =
  lookupConnection self <* atomicWriteIORef _stage Closed
-- | Disposes @conn@ and, unless the manager is already 'Closed', puts the
-- stage back to @Connecting _ Reconnecting@.
--
-- When the stage was already a 'Connecting' one its attempt record is kept;
-- otherwise a fresh one is created from the stopwatch. @cause@ is only used
-- for logging here.
closeTcpConnection :: Exception e => Internal -> e -> Connection -> EventStore ()
closeTcpConnection Internal{..} cause conn = do
  $logDebug [i|CloseTcpConnection: connection [#{cid}]. Cause: #{cause}.|]
  dispose conn
  $logDebug [i|CloseTcpConnection: connection [#{cid}] disposed.|]

  stage <- readIORef _stage
  case stage of
    Closed           -> return ()
    Connecting old _ -> backToReconnecting old
    _                -> freshAttempt _stopwatch >>= backToReconnecting
  where
    cid = connectionId conn

    backToReconnecting att =
      atomicWriteIORef _stage (Connecting att Reconnecting)
-- | Close cause carrying the endpoint a forced reconnection is heading to.
data ForceReconnect
  = ForceReconnect EndPoint
  deriving (Typeable, Show)

instance Exception ForceReconnect
-- | Moves the live connection over to the endpoint advertised by @node@.
--
-- The target is the node's secure endpoint when TLS settings are present
-- (@s_ssl@) and its plain TCP endpoint otherwise. If the manager is
-- 'Connected' to a different endpoint, the current TCP connection is closed
-- with a 'ForceReconnect' cause, a fresh attempt record is created, the
-- package counter is bumped, the stage becomes
-- @Connecting _ EndpointDiscovery@ and a connection to the target is
-- established.
--
-- This function is total: it does nothing (besides logging) when
--
--   * the manager is not in the 'Connected' stage, or
--   * TLS is enabled but the node exposes no secure endpoint.
--
-- Previously both situations were handled by partial patterns
-- (@Connected conn <- ..@ and @let Just pt = ..@) and ended in a
-- pattern-match failure.
forceReconnect :: Internal -> NodeEndPoints -> EventStore ()
forceReconnect self@Internal{..} node = do
  setts <- getSettings
  stage <- readIORef _stage
  let target
        | isJust (s_ssl setts) = secureEndPoint node
        | otherwise            = Just (tcpEndPoint node)
  case (stage, target) of
    (Connected conn, Just ept)
      | connectionEndPoint conn /= ept -> do
          monitorIncrForceReconnect
          closeTcpConnection self (ForceReconnect ept) conn
          att <- freshAttempt _stopwatch
          atomicModifyIORef' _lastPkgNum $ \cur -> (cur + 1, ())
          -- closeTcpConnection left the stage at Reconnecting; the target is
          -- already known, so go straight to EndpointDiscovery and connect.
          atomicWriteIORef _stage (Connecting att EndpointDiscovery)
          $logInfo [i|#{conn}: going to reconnect to #{ept}.|]
          establish self ept
      | otherwise -> pure ()
    (Connected _, Nothing) ->
      $logWarn "ForceReconnect: TLS is enabled but the node has no secure endpoint, ignoring."
    _ ->
      $logDebug "ForceReconnect: not connected, ignoring."
-- | Handler for 'EstablishConnection': unwraps the endpoint and hands it to
-- 'establish'.
onEstablish :: Internal -> EstablishConnection -> EventStore ()
onEstablish self msg =
  case msg of
    EstablishConnection ept -> establish self ept
-- | Periodic housekeeping, driven by 'Tick' messages.
--
-- What happens depends on the current stage:
--
--   * @Reconnecting@ / @ConnectionEstablishing@: once @s_reconnect_delay@
--     has elapsed since the last attempt started, record a new attempt and
--     either retry (via discovery) or give up, according to @s_retry@.
--   * @Authentication@: after @s_operationTimeout@, log a warning and move
--     on to client identification.
--   * @Identification@: after @s_operationTimeout@, close the current TCP
--     connection with 'IdentificationTimeout'.
--   * any other @Connecting@ stage: run the heartbeat management.
--   * 'Connected': every @s_operationTimeout@, ask the operation registry
--     to check and retry, enqueue the resulting packages and remember when
--     the check was made.
--
-- Afterwards the stage is read again and heartbeats are managed whenever
-- 'atLeastEstablishingState' holds for it.
onTick :: Internal -> Tick -> EventStore ()
onTick self@Internal{..} _ = do
  setts   <- getSettings
  current <- readIORef _stage
  case current of
    Connecting att Reconnecting ->
      reconnectWhenDue setts att

    Connecting att ConnectionEstablishing{} ->
      reconnectWhenDue setts att

    Connecting att (Authentication _ started conn) -> do
      now <- stopwatchElapsed _stopwatch
      when (now - started >= s_operationTimeout setts) $ do
        $logWarn "Authentication timed out."
        identifyClient self att conn

    Connecting _ (Identification _ started _) -> do
      now <- stopwatchElapsed _stopwatch
      when (now - started >= s_operationTimeout setts) $ do
        mConn <- lookupConnection self
        traverse_ (closeTcpConnection self IdentificationTimeout) mConn

    Connecting{} ->
      manageHeartbeats self

    Connected conn -> do
      now         <- stopwatchElapsed _stopwatch
      lastChecked <- readIORef _lastCheck
      when (now - lastChecked >= s_operationTimeout setts) $ do
        $logDebug "Start check and retry..."
        pkgs <- Operation.registryCheckAndRetry _opMgr (connectionId conn)
        traverse_ (enqueuePackage conn) pkgs
        $logDebug "Completed check and retry"
        atomicWriteIORef _lastCheck now

    _ -> return ()

  -- The handlers above may have changed the stage, so look at it afresh.
  latest <- readIORef _stage
  when (atLeastEstablishingState latest) $
    manageHeartbeats self
  where
    -- Retry (or give up) once the reconnection delay has elapsed. The retry
    -- decision is based on the attempt count as it was before this tick.
    reconnectWhenDue setts Attempts{ attemptCount = tries
                                   , attemptLastStart = lastStart } = do
      now <- stopwatchElapsed _stopwatch
      when (now - lastStart >= s_reconnect_delay setts) $ do
        atomicWriteIORef _stage
          (Connecting (Attempts (tries + 1) now) Reconnecting)
        case s_retry setts of
          KeepRetrying -> retryConnection tries
          AtMost n
            | tries > n -> maxAttemptReached
            | otherwise -> retryConnection tries

    maxAttemptReached = do
      closeConnection self ConnectionMaxAttemptReached
      -- NOTE(review): closeConnection already publishes a FatalException for
      -- its cause, so this second publish looks redundant; confirm before
      -- removing.
      publish (FatalException ConnectionMaxAttemptReached)

    retryConnection cnt = do
      $logDebug [i|Checking reconnection... (attempt #{cnt}).|]
      discover self
-- | Close cause used when a heartbeat request sent to the server was not
-- followed by any package within the heartbeat timeout.
data ServerHeartbeatTimeout
  = ServerHeartbeatTimeout
  deriving Typeable

instance Show ServerHeartbeatTimeout where
  show = const "Server connection has heartbeat timeout"

instance Exception ServerHeartbeatTimeout
-- | Drives the heartbeat state machine for the current connection, if any.
--
-- The tracker is compared with the latest package counter:
--
--   * counter changed since the tracker was stamped: restart the tracker
--     in the 'Interval' stage;
--   * 'Interval' stage and @s_heartbeatInterval@ elapsed: enqueue a
--     heartbeat request and switch the tracker to 'Timeout';
--   * 'Timeout' stage and @s_heartbeatTimeout@ elapsed: count the timeout
--     and close the TCP connection with 'ServerHeartbeatTimeout'.
--
-- Nothing happens in the remaining cases.
manageHeartbeats :: Internal -> EventStore ()
manageHeartbeats self@Internal{..} =
  lookupConnection self >>= traverse_ beat
  where
    beat conn = do
      now     <- stopwatchElapsed _stopwatch
      pkgNum  <- readIORef _lastPkgNum
      tracker <- readIORef _tracker
      setts   <- getSettings

      let waited = now - _startedSince tracker
          -- The tracker re-stamped with the current time and counter.
          stamped phase =
            tracker
              { _heartbeatStage = phase
              , _startedSince   = now
              , _pkgNum         = pkgNum
              }

      if _pkgNum tracker /= pkgNum
        then atomicWriteIORef _tracker (stamped Interval)
        else
          case _heartbeatStage tracker of
            Interval ->
              when (waited >= s_heartbeatInterval setts) $ do
                uuid <- freshUUID
                enqueuePackage conn (heartbeatRequestPackage uuid)
                atomicWriteIORef _tracker (stamped Timeout)

            Timeout ->
              when (waited >= s_heartbeatTimeout setts) $ do
                monitorIncrHeartbeatTimeouts
                $logInfo [i|Closing #{conn} due to HEARTBEAT TIMEOUT at pkgNum #{pkgNum}|]
                closeTcpConnection self ServerHeartbeatTimeout conn
-- | Handles a package that arrived on a connection.
--
-- Unless the current stage is 'Init' or 'Closed', the package counter is
-- bumped through 'incrPackageNumber'. The stage is then read again and the
-- package is dispatched on it:
--
-- * an authentication reply (authenticated or not) matching the pending
--   authentication correlation id moves on to 'identifyClient'; a
--   not-authenticated reply is only logged as a warning beforehand;
-- * a client-identified reply matching the pending identification
--   correlation id triggers 'clientIdentified';
-- * any other package coming from the connection held by the current stage
--   goes through 'handlePackage';
-- * everything else is logged and ignored.
onArrived :: Internal -> PackageArrived -> EventStore ()
onArrived self@Internal{..} (PackageArrived conn pkg@Package{..}) = do
  cur <- readIORef _stage
  unless (closedOrInit cur) $
    incrPackageNumber self

  readIORef _stage >>= \case
    (onAuthentication -> Just att) -> do
      when (packageCmd == notAuthenticatedCmd) $
        $logWarn "Not authenticated."
      identifyClient self att conn
    (onIdentification -> True) ->
      clientIdentified self
    (runningConnection -> True) -> do
      $logDebug [i|Package received: #{pkg}.|]
      handlePackage
    _ -> $logDebug [i|Package IGNORED: #{pkg}.|]
  where
    -- True when the package is the client-identified reply carrying the
    -- correlation id of the pending identification request.
    onIdentification :: Stage -> Bool
    onIdentification (Connecting _ (Identification u _ _)) =
      packageCorrelation == u && packageCmd == clientIdentifiedCmd
    onIdentification _ = False

    -- Yields the current attempts when the package answers the pending
    -- authentication request, whether the server accepted it or not.
    onAuthentication :: Stage -> Maybe Attempts
    onAuthentication (Connecting a (Authentication u _ _)) =
      if packageCorrelation == u
           && (packageCmd == authenticatedCmd
                 || packageCmd == notAuthenticatedCmd)
        then Just a
        else Nothing
    onAuthentication _ = Nothing

    -- True when the package comes from the connection the stage holds.
    runningConnection :: Stage -> Bool
    runningConnection (Connecting _ (ConnectionEstablishing c)) = conn == c
    runningConnection (Connecting _ (Authentication _ _ c))     = conn == c
    runningConnection (Connecting _ (Identification _ _ c))     = conn == c
    runningConnection (Connected c)                             = conn == c
    runningConnection _                                         = False

    -- Heartbeat response reusing the correlation id of the incoming package.
    heartbeatResponse :: Package
    heartbeatResponse = heartbeatResponsePackage packageCorrelation

    -- Heartbeat responses are dropped, heartbeat requests are answered on the
    -- same connection, and any other package is handed to the operation
    -- registry, which may ask for a reconnection to another node.
    handlePackage :: EventStore ()
    handlePackage
      | packageCmd == heartbeatResponseCmd = return ()
      | packageCmd == heartbeatRequestCmd =
          enqueuePackage conn heartbeatResponse
      | otherwise =
          Operation.registryHandle _opMgr pkg >>= \case
            Nothing   -> pure ()
            Just node -> forceReconnect self node

    closedOrInit :: Stage -> Bool
    closedOrInit = \case
      Init   -> True
      Closed -> True
      _      -> False
-- | Tells whether the given connection is the one held by the current stage.
--
-- The connection carried by the stage, if any, is obtained through
-- 'lookupConnection'; the result is 'False' when the stage holds no
-- connection at all.
isSameConnection :: Internal -> Connection -> EventStore Bool
isSameConnection self conn = (== Just conn) <$> lookupConnection self
-- | Reacts to an error reported by a connection.
--
-- Errors from a connection that is not the one held by the current stage are
-- ignored. Otherwise the error is logged and the connection is closed with
-- the reported exception as the cause.
onConnectionError :: Internal -> ConnectionError -> EventStore ()
onConnectionError self (ConnectionError conn e) =
  whenM (isSameConnection self conn) $ do
    $logError [i|TCP #{conn} error. Cause: #{e}.|]
    closeConnection self e
-- | Reacts to a connection being reported as closed.
--
-- Only acts when the closed connection is the one held by the current stage:
-- the TCP connection is closed with the reported cause, then the
-- connection-drop counter is incremented.
onConnectionClosed :: Internal -> ConnectionClosed -> EventStore ()
onConnectionClosed self (ConnectionClosed conn cause) =
  whenM (isSameConnection self conn) $ do
    closeTcpConnection self cause conn
    monitorIncrConnectionDrop
-- | Shuts the connection manager down.
--
-- Switches the stage to closed while retrieving the connection it held (if
-- any), aborts the operation registry, disposes of that connection and
-- finally publishes 'ServiceTerminated' for 'ConnectionManager'.
onShutdown :: Internal -> SystemShutdown -> EventStore ()
onShutdown self@Internal{..} _ = do
  $logDebug "Shutting down..."
  mConn <- lookupConnectionAndSwitchToClosed self
  Operation.registryAbort _opMgr
  traverse_ dispose mConn
  $logDebug "Shutdown properly."
  publish (ServiceTerminated ConnectionManager)
-- | Handles a request to transmit a package on behalf of a mailbox.
--
-- * 'Closed': the mailbox is failed with 'Aborted'.
-- * 'Connected': the package is enqueued on the connection and registered in
--   the operation registry under that connection's id.
-- * any other stage: the package is postponed in the operation registry.
onTransmit :: Internal -> Transmit -> EventStore ()
onTransmit Internal{..} (Transmit m lifetime pkg) =
  readIORef _stage >>= \case
    Closed ->
      mailboxFail m Aborted
    Connected conn -> do
      enqueuePackage conn pkg
      Operation.registryRegister _opMgr (connectionId conn) lifetime pkg m
    _ ->
      Operation.registryPostpone _opMgr m lifetime pkg
-- | Closes the current connection, using the received 'CloseConnection'
-- message itself as the cause.
onCloseConnection :: Internal -> CloseConnection -> EventStore ()
onCloseConnection self e = closeConnection self e
-- | Returns the connection held by the current stage, if any.
--
-- Thin wrapper around 'lookingUpConnection' applied to the manager's stage
-- reference.
lookupConnection :: Internal -> EventStore (Maybe Connection)
lookupConnection Internal{..} = lookingUpConnection _stage
-- | Reads the stage reference and extracts the connection it carries.
--
-- A connection is available when the stage is 'Connected', or 'Connecting'
-- in one of the 'ConnectionEstablishing', 'Authentication' or
-- 'Identification' states. Every other stage yields 'Nothing'.
lookingUpConnection :: IORef Stage -> EventStore (Maybe Connection)
lookingUpConnection ref = go <$> readIORef ref
  where
    -- Constructors are listed explicitly so that adding a new stage or
    -- connecting state is reported by the exhaustiveness checker.
    go :: Stage -> Maybe Connection
    go = \case
      Connected conn                             -> Just conn
      Connecting _ (ConnectionEstablishing conn) -> Just conn
      Connecting _ (Authentication _ _ conn)     -> Just conn
      Connecting _ (Identification _ _ conn)     -> Just conn
      Connecting _ Reconnecting                  -> Nothing
      Connecting _ EndpointDiscovery             -> Nothing
      Init                                       -> Nothing
      Closed                                     -> Nothing
-- | Enqueues the package on the connection held by the current stage.
--
-- Nothing happens when the stage holds no connection.
onSendPackage :: Internal -> SendPackage -> EventStore ()
onSendPackage self (SendPackage pkg) =
  traverse_ sending =<< lookupConnection self
  where
    sending :: Connection -> EventStore ()
    sending conn = enqueuePackage conn pkg