{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Database.EventStore.Internal.Discovery
( Discovery(..)
, GossipSeed
, DnsDiscoveryException(..)
, ClusterSettings(..)
, DnsServer(..)
, EndPoint(..)
, staticEndPointDiscovery
, clusterDnsEndPointDiscovery
, gossipSeedClusterSettings
, simpleDnsEndPointDiscovery
, dnsClusterSettings
, gossipSeed
, gossipSeedWithHeader
, gossipSeedHeader
, gossipSeedHost
, gossipSeedPort
) where
import Prelude (String, fail)
import Data.Maybe
import Control.Exception.Safe (tryAny)
import Data.Aeson
import Data.Aeson.Types
import Data.Array.IO
import Data.DotNet.TimeSpan
import Data.List.NonEmpty (NonEmpty)
import Data.UUID
import Network.HTTP.Client
import Network.DNS hiding (decode)
import System.Random
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
-- | Failures reported by the DNS-based discovery strategies of this module.
data DnsDiscoveryException
    = MaxDiscoveryAttemptReached ByteString
      -- ^ Carries a 'ByteString' payload. NOTE(review): no code visible in
      -- this module constructs this value; presumably the payload is the
      -- cluster DNS name -- confirm against callers.
    | DNSDiscoveryError DNSError
      -- ^ Wraps the 'DNSError' handed back by a failed @lookupA@ call.
    deriving (Show, Typeable)

-- | Allows a 'DnsDiscoveryException' to be raised with @throwIO@.
instance Exception DnsDiscoveryException
-- | Build a plain-HTTP 'Request' for @path@ on the given endpoint.
--
-- The URL has the shape @http:\/\/\<ip\>:\<port\>\<path\>@; @path@ is appended
-- verbatim, so it is expected to start with a slash. The URL is parsed with
-- 'parseUrlThrow', so a URL that cannot be parsed raises in 'IO'.
httpRequest :: EndPoint -> String -> IO Request
httpRequest endpoint path =
    parseUrlThrow ("http://" <> host <> ":" <> show port <> path)
  where
    host = endPointIp endpoint
    port = endPointPort endpoint
-- | A node that can be asked for cluster gossip.
data GossipSeed =
    GossipSeed
    { gossipEndpoint :: !EndPoint
      -- ^ Host and port the gossip HTTP request is sent to (see
      -- 'tryGetGossipFrom').
    , gossipSeedHeader :: !String
      -- ^ Header text attached to the seed; empty when none was supplied.
      -- NOTE(review): presumably intended as an HTTP @Host@ header, but no
      -- code in this module reads it when building the gossip request --
      -- confirm.
    } deriving Show
-- | Create a 'GossipSeed' from a host and a port, with an empty header.
gossipSeed :: String -> Int -> GossipSeed
gossipSeed host port = GossipSeed endpoint noHeader
  where
    endpoint = EndPoint host port
    noHeader = ""
-- | Create a 'GossipSeed' from a host, a port and the header text to store
-- alongside the endpoint.
gossipSeedWithHeader :: String -> Int -> String -> GossipSeed
gossipSeedWithHeader h p hd = GossipSeed (EndPoint h p) hd
-- | Host part of the seed's endpoint.
gossipSeedHost :: GossipSeed -> String
gossipSeedHost seed = endPointIp (gossipEndpoint seed)
-- | Port part of the seed's endpoint.
gossipSeedPort :: GossipSeed -> Int
gossipSeedPort seed = endPointPort (gossipEndpoint seed)
-- | Placeholder seed: 'emptyEndPoint' with an empty header. It is used as
-- the initial filler value when a candidate array is allocated.
emptyGossipSeed :: GossipSeed
emptyGossipSeed = GossipSeed emptyEndPoint noHeader
  where
    noHeader = ""
-- | A strategy for finding the node to connect to.
--
-- The argument is an optional 'EndPoint'; the cluster strategy of this module
-- uses it to exclude that endpoint from the next round of candidates (see
-- 'gossipCandidatesFromOldGossip'). The result is the endpoint to use, or
-- 'Nothing' when none could be found.
newtype Discovery =
    Discovery
    { runDiscovery :: Maybe EndPoint -> EventStore (Maybe EndPoint)
    }
-- | A 'Discovery' that always answers with the same host and port,
-- whatever endpoint it is given.
staticEndPointDiscovery :: String -> Int -> Discovery
staticEndPointDiscovery host port = Discovery always
  where
    fixed    = EndPoint host port
    always _ = return (Just fixed)
-- | A 'Discovery' that resolves @domain@ with an A-record lookup and answers
-- with the first returned address, paired with @port@.
--
-- * @srv@ selects the DNS server configuration; 'Nothing' keeps
--   'defaultResolvConf' untouched.
-- * The endpoint passed to the discovery function is ignored.
-- * Returns 'Nothing' when the lookup succeeds with no address.
-- * Throws 'DNSDiscoveryError' when the lookup fails.
simpleDnsEndPointDiscovery :: ByteString -> Maybe DnsServer -> Int -> Discovery
simpleDnsEndPointDiscovery domain srv port = Discovery $ \_ -> do
    seed   <- liftIO (makeResolvSeed resolvConf)
    answer <- liftIO (withResolver seed (\resolver -> lookupA resolver domain))
    case answer of
        Left dnsErr -> throwIO (DNSDiscoveryError dnsErr)
        Right ips   ->
            -- Only the first resolved address is ever used.
            return (listToMaybe [ EndPoint (show ip) port | ip <- ips ])
  where
    resolvConf =
        case srv of
            Nothing     -> defaultResolvConf
            Just server -> defaultResolvConf { resolvInfo = serverInfo server }

    serverInfo (DnsFilePath path)     = RCFilePath path
    serverInfo (DnsHostName host)     = RCHostName host
    serverInfo (DnsHostPort host prt) = RCHostPort host (fromIntegral prt)
-- | Which DNS server configuration a lookup should use. Each constructor is
-- translated to the matching @resolvInfo@ value of the resolver
-- configuration.
data DnsServer
    = DnsFilePath String
      -- ^ Becomes @RCFilePath@.
    | DnsHostName String
      -- ^ Becomes @RCHostName@.
    | DnsHostPort String Int
      -- ^ Becomes @RCHostPort@; the 'Int' is converted with 'fromIntegral'.
-- | Settings driving cluster discovery.
data ClusterSettings =
    ClusterSettings
    { clusterDns :: !ByteString
      -- ^ Name resolved by 'resolveDns' when no gossip seeds are given.
    , clusterMaxDiscoverAttempts :: !Int
      -- ^ Used as the resolver's @resolvRetry@ value in 'resolveDns'.
    , clusterExternalGossipPort :: !Int
      -- ^ Port paired with every address returned by the DNS lookup.
    , clusterGossipSeeds :: (Maybe (NonEmpty GossipSeed))
      -- ^ When present, these seeds are used instead of a DNS lookup.
      -- This is the only field without a strictness annotation.
    , clusterGossipTimeout :: !TimeSpan
      -- ^ Timeout applied to the gossip HTTP request and to the DNS
      -- resolver.
    , clusterDnsServer :: !(Maybe DnsServer)
      -- ^ DNS server configuration; 'Nothing' keeps the resolver default.
    }
-- | 'ClusterSettings' for discovery through a fixed list of gossip seeds.
--
-- Defaults: empty DNS name, 10 discover attempts, external gossip port 0,
-- a gossip timeout of 1 second and no explicit DNS server.
gossipSeedClusterSettings :: NonEmpty GossipSeed -> ClusterSettings
gossipSeedClusterSettings seeds =
    ClusterSettings
    { clusterGossipSeeds         = Just seeds
    , clusterDns                 = ""
    , clusterDnsServer           = Nothing
    , clusterExternalGossipPort  = 0
    , clusterMaxDiscoverAttempts = 10
    , clusterGossipTimeout       = fromSeconds 1
    }
-- | 'ClusterSettings' for discovery through a DNS name.
--
-- Defaults: 10 discover attempts, external gossip port 0, no gossip seeds,
-- a gossip timeout of 1 second and no explicit DNS server.
dnsClusterSettings :: ByteString -> ClusterSettings
dnsClusterSettings dns =
    ClusterSettings
    { clusterDns                 = dns
    , clusterMaxDiscoverAttempts = 10
    , clusterExternalGossipPort  = 0
    , clusterGossipSeeds         = Nothing
    , clusterGossipTimeout       = fromSeconds 1
    , clusterDnsServer           = Nothing
    }
-- | Build the cluster 'Discovery' strategy.
--
-- Allocates the state shared by every run of the returned strategy: an
-- 'IORef' holding the member list of the last successful gossip (initially
-- 'Nothing') and an HTTP 'Manager' created from 'defaultManagerSettings'.
-- Each run delegates to 'discoverEndPoint'.
clusterDnsEndPointDiscovery :: ClusterSettings -> IO Discovery
clusterDnsEndPointDiscovery settings = do
    lastGossip <- newIORef Nothing
    mgr        <- newManager defaultManagerSettings
    let discover given = discoverEndPoint mgr lastGossip given settings
    return (Discovery discover)
-- | State a cluster member reports in its gossip.
--
-- The constructor order is significant: the derived 'Ord' instance follows
-- it, and 'tryDetermineBestNode' picks the member with the greatest state.
data VNodeState
    = Initializing
    | Unknown
    | PreReplica
    | CatchingUp
    | Clone
    | Slave
    | PreMaster
    | Master
    | Manager
    | ShuttingDown
    | Shutdown
    deriving (Eq, Ord, Generic, Show)

-- | No explicit methods: the class defaults, backed by 'Generic', are used.
instance FromJSON VNodeState
-- | Wrapper around 'UUID' so this module can give it its own JSON parser.
newtype GUUID = GUUID UUID deriving Show

-- | Accepts a JSON string that 'fromText' recognises as a UUID. A string in
-- any other format fails with a @Wrong UUID format@ message; any non-string
-- value is reported through 'typeMismatch'.
instance FromJSON GUUID where
    parseJSON value =
        case value of
            String txt ->
                maybe (fail ("Wrong UUID format " <> show txt))
                      (return . GUUID)
                      (fromText txt)
            _ -> typeMismatch "UUID" value
-- | One cluster member as described by a gossip response. Every field is
-- read from the JSON key of the same name without the leading underscore.
--
-- Within this module only '_state', '_isAlive', the external TCP address
-- (best-node selection) and the external HTTP address (gossip candidates)
-- are consulted.
data MemberInfo =
    MemberInfo
    { _instanceId         :: !GUUID
    , _state              :: !VNodeState
    , _isAlive            :: !Bool
    , _internalTcpIp      :: !String
    , _internalTcpPort    :: !Int
    , _externalTcpIp      :: !String
    , _externalTcpPort    :: !Int
    , _internalHttpIp     :: !String
    , _internalHttpPort   :: !Int
    , _externalHttpIp     :: !String
    , _externalHttpPort   :: !Int
    , _lastCommitPosition :: !Int64
    , _writerCheckpoint   :: !Int64
    , _chaserCheckpoint   :: !Int64
    , _epochPosition      :: !Int64
    , _epochNumber        :: !Int
    , _epochId            :: !GUUID
    , _nodePriority       :: !Int
    } deriving Show

-- | Parses a JSON object; every key is mandatory. Keys are read in
-- declaration order, so the first missing or ill-typed key is the one
-- reported. Any non-object value is reported through 'typeMismatch'.
instance FromJSON MemberInfo where
    parseJSON (Object m) = do
        _instanceId         <- m .: "instanceId"
        _state              <- m .: "state"
        _isAlive            <- m .: "isAlive"
        _internalTcpIp      <- m .: "internalTcpIp"
        _internalTcpPort    <- m .: "internalTcpPort"
        _externalTcpIp      <- m .: "externalTcpIp"
        _externalTcpPort    <- m .: "externalTcpPort"
        _internalHttpIp     <- m .: "internalHttpIp"
        _internalHttpPort   <- m .: "internalHttpPort"
        _externalHttpIp     <- m .: "externalHttpIp"
        _externalHttpPort   <- m .: "externalHttpPort"
        _lastCommitPosition <- m .: "lastCommitPosition"
        _writerCheckpoint   <- m .: "writerCheckpoint"
        _chaserCheckpoint   <- m .: "chaserCheckpoint"
        _epochPosition      <- m .: "epochPosition"
        _epochNumber        <- m .: "epochNumber"
        _epochId            <- m .: "epochId"
        _nodePriority       <- m .: "nodePriority"
        -- The wildcard picks up the bindings above, one per record field.
        return MemberInfo{..}
    parseJSON invalid = typeMismatch "MemberInfo" invalid
-- | Body of a gossip response: the list of known cluster members.
data ClusterInfo =
    ClusterInfo
    { members :: [MemberInfo]
    } deriving (Show, Generic)

-- | No explicit methods: the class defaults, backed by 'Generic', are used.
instance FromJSON ClusterInfo
-- | One round of cluster discovery.
--
-- 1. Take the member list stored by the previous successful round and reset
--    the reference to 'Nothing'.
-- 2. Build the candidate seeds: from DNS or configured seeds when there is
--    no previous member list, otherwise from that list (leaving out @fend@
--    when it is given).
-- 3. Ask each candidate in turn for gossip and stop at the first one whose
--    answer yields a best node. That answer's member list is stored for the
--    next round and the best node is returned.
--
-- Returns 'Nothing' when no candidate produced a usable answer.
discoverEndPoint :: Manager
                 -> IORef (Maybe [MemberInfo])
                 -> Maybe EndPoint
                 -> ClusterSettings
                 -> EventStore (Maybe EndPoint)
discoverEndPoint mgr ref fend settings = do
    previous <- readIORef ref
    writeIORef ref Nothing
    candidates <-
        maybe (gossipCandidatesFromDns settings)
              (liftIO . gossipCandidatesFromOldGossip fend)
              previous
    forArrayFirst candidates (probe candidates)
  where
    probe :: IOArray Int GossipSeed -> Int -> EventStore (Maybe EndPoint)
    probe candidates idx = do
        seed   <- liftIO (readArray candidates idx)
        gossip <- tryGetGossipFrom settings mgr seed
        case gossip of
            Nothing   -> pure Nothing
            Just info ->
                case tryDetermineBestNode (members info) of
                    Nothing   -> pure Nothing
                    Just best -> do
                        -- Remember who answered so the next round can start
                        -- from the gossip instead of DNS.
                        writeIORef ref (Just (members info))
                        pure (Just best)
-- | Ask one gossip seed for the cluster state.
--
-- Sends @GET \/gossip?format=json@ to the seed's endpoint with the response
-- timeout taken from 'clusterGossipTimeout', then decodes the body as
-- 'ClusterInfo'.
--
-- Returns 'Nothing' when the body cannot be decoded, or when building or
-- performing the request throws a synchronous exception; in the latter case
-- the failure is logged at info level. Request construction happens inside
-- the guarded action on purpose: a seed whose host does not form a valid URL
-- must be skipped like any other unreachable seed rather than abort the
-- whole discovery round.
--
-- NOTE(review): the seed's header field is not applied to the request here.
tryGetGossipFrom :: ClusterSettings
                 -> Manager
                 -> GossipSeed
                 -> EventStore (Maybe ClusterInfo)
tryGetGossipFrom ClusterSettings{..} mgr seed = do
    eithResp <- tryAny $ liftIO $ do
        init_req <- httpRequest (gossipEndpoint seed) "/gossip?format=json"
        let req = init_req { responseTimeout = responseTimeoutMicro timeoutMicros }
        httpLbs req mgr
    case eithResp of
        Right resp -> return $ decode $ responseBody resp
        Left err   -> do
            $logInfo [i|Failed to get cluster info from [#{seed}], error: #{err}.|]
            pure Nothing
  where
    -- 'totalMillis' yields milliseconds; 'responseTimeoutMicro' expects
    -- microseconds.
    timeoutMicros :: Int
    timeoutMicros = truncate (totalMillis clusterGossipTimeout * 1000)
-- | Pick the member to connect to.
--
-- Only members that are alive and whose state is none of 'Manager',
-- 'ShuttingDown' or 'Shutdown' are considered. Among those, the one with
-- the greatest 'VNodeState' (per its derived 'Ord') wins, and its external
-- TCP address is returned. 'Nothing' when no member qualifies.
tryDetermineBestNode :: [MemberInfo] -> Maybe EndPoint
tryDetermineBestNode members =
    case sortOn (Down . _state) eligible of
        best : _ -> Just (EndPoint (_externalTcpIp best) (_externalTcpPort best))
        []       -> Nothing
  where
    eligible = [ m | m <- members, _isAlive m, usable (_state m) ]

    usable st =
        case st of
            Manager      -> False
            ShuttingDown -> False
            Shutdown     -> False
            _            -> True
-- | Turn the member list of a previous gossip into candidate seeds.
--
-- When an endpoint is given, members whose external TCP address equals it
-- are left out. The remaining members are ordered by
-- 'arrangeGossipCandidates'.
gossipCandidatesFromOldGossip :: Maybe EndPoint
                              -> [MemberInfo]
                              -> IO (IOArray Int GossipSeed)
gossipCandidatesFromOldGossip fend_m oldGossip =
    arrangeGossipCandidates (maybe oldGossip withoutEndPoint fend_m)
  where
    withoutEndPoint excluded =
        [ member | member <- oldGossip, externalTcp member /= excluded ]

    externalTcp member =
        EndPoint (_externalTcpIp member) (_externalTcpPort member)
-- | Fold state of 'arrangeGossipCandidates': the index of the last slot
-- filled from the front, and the index of the last slot filled from the back.
data AState = AState !Int !Int

-- | Lay the members out as candidate seeds, built from their external HTTP
-- address with an empty header.
--
-- Members in any state other than 'Manager' fill the array from the front,
-- 'Manager' members fill it from the back; each of the two groups is then
-- shuffled independently, so non-manager members are always tried first.
--
-- The array has exactly one slot per member (bounds @(0, len - 1)@, empty
-- for an empty list). Array bounds are inclusive, so an upper bound of @len@
-- would leave a trailing slot still holding 'emptyGossipSeed', which callers
-- iterating over the bounds would then probe as if it were a real candidate.
arrangeGossipCandidates :: [MemberInfo] -> IO (IOArray Int GossipSeed)
arrangeGossipCandidates members = do
    arr <- newArray (0, len - 1) emptyGossipSeed
    AState idx j <- foldM (go arr) (AState (-1) len) members

    shuffle arr 0 idx       -- non-manager members: slots 0 .. idx
    shuffle arr j (len - 1) -- manager members: slots j .. len - 1

    return arr
  where
    len = length members

    go :: IOArray Int GossipSeed -> AState -> MemberInfo -> IO AState
    go arr (AState idx j) m =
        case _state m of
            Manager -> do
                let new_j = j - 1
                writeArray arr new_j seed
                return (AState idx new_j)
            _ -> do
                let new_i = idx + 1
                writeArray arr new_i seed
                return (AState new_i j)
      where
        end  = EndPoint (_externalHttpIp m) (_externalHttpPort m)
        seed = GossipSeed end ""
-- | Build the initial candidate seeds, in random order.
--
-- Uses the configured gossip seeds when there are any; otherwise resolves
-- the cluster DNS name through 'resolveDns'.
gossipCandidatesFromDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
gossipCandidatesFromDns settings = do
    candidates <-
        case clusterGossipSeeds settings of
            Nothing    -> resolveDns settings
            Just seeds -> liftIO (fromSeeds (toList seeds))
    liftIO (shuffleAll candidates)
    pure candidates
  where
    fromSeeds :: [GossipSeed] -> IO (IOArray Int GossipSeed)
    fromSeeds ls = newListArray (0, length ls - 1) ls
-- | Resolve the cluster DNS name into candidate seeds.
--
-- The resolver is configured from 'clusterDnsServer' (or left at its
-- default), with its timeout derived from 'clusterGossipTimeout' and its
-- retry count set to 'clusterMaxDiscoverAttempts'. Every address returned by
-- the A-record lookup becomes a seed on 'clusterExternalGossipPort' with an
-- empty header, in lookup order.
--
-- Throws 'DNSDiscoveryError' when the lookup fails.
resolveDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
resolveDns ClusterSettings{..} = do
    dnsSeed <- liftIO (makeResolvSeed tunedConf)
    liftIO $ withResolver dnsSeed $ \resolver -> do
        answer <- lookupA resolver clusterDns
        case answer of
            Left dnsErr -> throwIO (DNSDiscoveryError dnsErr)
            Right ips   -> do
                -- Every slot is written below, so no initial value is needed.
                arr <- newArray_ (0, length ips - 1)
                forM_ (zip [0..] ips) $ \(slot, ip) ->
                    writeArray arr slot (seedFor ip)
                return arr
  where
    seedFor ip = GossipSeed (EndPoint (show ip) clusterExternalGossipPort) ""

    -- Milliseconds scaled by 1000 before being handed to 'resolvTimeout'.
    timeoutMicros = totalMillis clusterGossipTimeout * 1000

    baseConf =
        case clusterDnsServer of
            Nothing     -> defaultResolvConf
            Just server -> defaultResolvConf { resolvInfo = serverInfo server }

    serverInfo (DnsFilePath path)     = RCFilePath path
    serverInfo (DnsHostName host)     = RCHostName host
    serverInfo (DnsHostPort host prt) = RCHostPort host (fromIntegral prt)

    tunedConf =
        baseConf
        { resolvTimeout = truncate timeoutMicros
        , resolvRetry   = clusterMaxDiscoverAttempts
        }
-- | Shuffle every element of the array in place.
shuffleAll :: IOArray Int a -> IO ()
shuffleAll arr = do
    bounds <- getBounds arr
    case bounds of
        (first, final) -> shuffle arr first final
-- | Shuffle the slots @from .. to@ (both inclusive) of the array in place.
--
-- Walks the range upwards and swaps each slot with a slot picked uniformly
-- at random between itself and @to@. Does nothing when @from > to@.
shuffle :: IOArray Int a -> Int -> Int -> IO ()
shuffle arr from to =
    forRange_ from to (\cur -> do
        pick    <- randomRIO (cur, to)
        picked  <- readArray arr pick
        current <- readArray arr cur
        writeArray arr pick current
        writeArray arr cur picked)
-- | Run the action for every index from @from@ up to @to@ (both inclusive),
-- in ascending order. Does nothing when @from > to@.
forRange_ :: Int -> Int -> (Int -> IO ()) -> IO ()
forRange_ from to k = forM_ [from .. to] k
-- | Run the action on each index of the array, lowest first, and stop at the
-- first 'Just'. Returns 'Nothing' when the array is empty or no index
-- produced a result.
forArrayFirst :: IOArray Int a
              -> (Int -> EventStore (Maybe b))
              -> EventStore (Maybe b)
forArrayFirst arr k = do
    bounds <- liftIO (getBounds arr)
    case bounds of
        (low, hig) -> forRangeFirst low hig k
-- | Run the action for the indexes @from .. to@ (both inclusive), in
-- ascending order, and stop at the first 'Just', which is returned.
-- Returns 'Nothing' when the range is empty or every index gave 'Nothing'.
forRangeFirst :: Int
              -> Int
              -> (Int -> EventStore (Maybe b))
              -> EventStore (Maybe b)
forRangeFirst from to k
    | from > to = return Nothing
    | otherwise = go from
  where
    go cur = do
        res <- k cur
        case res of
            Just _ -> return res
            Nothing
                -- Compare before incrementing so that the last index ends
                -- the loop even when it is the largest 'Int'.
                | cur == to -> return Nothing
                | otherwise -> go (cur + 1)