{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE LambdaCase #-}
module Database.EventStore.Internal.Connection
( ConnectionBuilder(..)
, Connection(..)
, RecvOutcome(..)
, PackageArrived(..)
, ConnectionError(..)
, ConnectionEstablished(..)
, ConnectionClosed(..)
, ConnectionRef(..)
, getConnection
, connectionBuilder
, connectionError
) where
import Prelude (String)
import Text.Printf
import Control.Monad.Reader
import Data.Serialize
import Data.UUID
import qualified Network.Connection as Network
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Types
newtype ConnectionBuilder =
ConnectionBuilder { ConnectionBuilder -> EndPoint -> EventStore Connection
connect :: EndPoint -> EventStore Connection }
data RecvOutcome
= ResetByPeer
| Recv Package
| WrongFraming
| ParsingError
type ConnectionId = UUID
newtype ConnectionRef =
ConnectionRef { ConnectionRef -> EventStore (Maybe Connection)
maybeConnection :: EventStore (Maybe Connection) }
getConnection :: ConnectionRef -> EventStore Connection
getConnection :: ConnectionRef -> EventStore Connection
getConnection ConnectionRef
ref =
ConnectionRef -> EventStore (Maybe Connection)
maybeConnection ConnectionRef
ref forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just Connection
conn -> forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn
Maybe Connection
Nothing -> do
$(logError) Text
"Expected a connection but got none."
forall (m :: * -> *) a.
(MonadThrow m, HasCallStack) =>
String -> m a
throwString String
"No current connection (impossible situation)"
data Connection =
Connection { Connection -> ConnectionId
connectionId :: ConnectionId
, Connection -> EndPoint
connectionEndPoint :: EndPoint
, Connection -> Package -> EventStore ()
enqueuePackage :: Package -> EventStore ()
, Connection -> EventStore ()
dispose :: EventStore ()
}
instance Show Connection where
show :: Connection -> String
show Connection{ConnectionId
EndPoint
EventStore ()
Package -> EventStore ()
dispose :: EventStore ()
enqueuePackage :: Package -> EventStore ()
connectionEndPoint :: EndPoint
connectionId :: ConnectionId
dispose :: Connection -> EventStore ()
enqueuePackage :: Connection -> Package -> EventStore ()
connectionEndPoint :: Connection -> EndPoint
connectionId :: Connection -> ConnectionId
..} = String
"Connection [" forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show ConnectionId
connectionId forall a. Semigroup a => a -> a -> a
<> String
"] on "
forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show EndPoint
connectionEndPoint
instance Eq Connection where
Connection
a == :: Connection -> Connection -> Bool
== Connection
b = Connection -> ConnectionId
connectionId Connection
a forall a. Eq a => a -> a -> Bool
== Connection -> ConnectionId
connectionId Connection
b
newtype ConnectionState =
ConnectionState { ConnectionState -> TBMQueue Package
_sendQueue :: TBMQueue Package }
data PackageArrived = PackageArrived Connection Package deriving Typeable
data ConnectionError =
ConnectionError Connection SomeException deriving Typeable
connectionError :: Exception e => Connection -> e -> ConnectionError
connectionError :: forall e. Exception e => Connection -> e -> ConnectionError
connectionError Connection
c = Connection -> SomeException -> ConnectionError
ConnectionError Connection
c forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e. Exception e => e -> SomeException
toException
data ConnectionClosed = ConnectionClosed Connection SomeException
deriving Typeable
data ConnectionEstablished =
ConnectionEstablished Connection deriving Typeable
newtype ConnectionResetByPeer = ConnectionResetByPeer SomeException
deriving Typeable
instance Show ConnectionResetByPeer where
show :: ConnectionResetByPeer -> String
show (ConnectionResetByPeer SomeException
reason) =
String
"Connection reset by peer: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show SomeException
reason
instance Exception ConnectionResetByPeer
data ProtocolError
= WrongFramingError !String
| PackageParsingError !String
deriving Typeable
instance Show ProtocolError where
show :: ProtocolError -> String
show (WrongFramingError String
reason) = String
"Package framing error: " forall a. Semigroup a => a -> a -> a
<> String
reason
show (PackageParsingError String
reason) = String
"Package parsing error: " forall a. Semigroup a => a -> a -> a
<> String
reason
instance Exception ProtocolError
connectionBuilder :: Settings -> IO ConnectionBuilder
connectionBuilder :: Settings -> IO ConnectionBuilder
connectionBuilder Settings
setts = do
ConnectionContext
ctx <- IO ConnectionContext
Network.initConnectionContext
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ (EndPoint -> EventStore Connection) -> ConnectionBuilder
ConnectionBuilder forall a b. (a -> b) -> a -> b
$ \EndPoint
ept -> do
ConnectionId
cid <- forall (m :: * -> *). MonadIO m => m ConnectionId
freshUUID
ConnectionState
state <- EventStore ConnectionState
createState
forall (m :: * -> *) a. MonadFix m => (a -> m a) -> m a
mfix forall a b. (a -> b) -> a -> b
$ \Connection
self -> do
Async Connection
tcpConnAsync <- forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny (Settings -> ConnectionContext -> EndPoint -> EventStore Connection
createConnection Settings
setts ConnectionContext
ctx EndPoint
ept) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SomeException
e -> do
forall a. Typeable a => a -> EventStore ()
publish (Connection -> SomeException -> ConnectionClosed
ConnectionClosed Connection
self SomeException
e)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw SomeException
e
Right Connection
conn -> do
forall a. Typeable a => a -> EventStore ()
publish (Connection -> ConnectionEstablished
ConnectionEstablished Connection
self)
forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn
Async ()
sendAsync <- forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async (ConnectionState -> Connection -> Async Connection -> EventStore ()
sending ConnectionState
state Connection
self Async Connection
tcpConnAsync)
Async ()
recvAsync <- forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async (ConnectionState -> Connection -> Async Connection -> EventStore ()
receiving ConnectionState
state Connection
self Async Connection
tcpConnAsync)
forall (m :: * -> *) a. Monad m => a -> m a
return Connection { connectionId :: ConnectionId
connectionId = ConnectionId
cid
, connectionEndPoint :: EndPoint
connectionEndPoint = EndPoint
ept
, enqueuePackage :: Package -> EventStore ()
enqueuePackage = ConnectionState -> Package -> EventStore ()
enqueue ConnectionState
state
, dispose :: EventStore ()
dispose = do
ConnectionState -> EventStore ()
closeState ConnectionState
state
Async Connection -> EventStore ()
disposeConnection Async Connection
tcpConnAsync
forall (m :: * -> *) a. MonadBase IO m => Async a -> m ()
cancel Async ()
sendAsync
forall (m :: * -> *) a. MonadBase IO m => Async a -> m ()
cancel Async ()
recvAsync
}
createState :: EventStore ConnectionState
createState :: EventStore ConnectionState
createState = TBMQueue Package -> ConnectionState
ConnectionState forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
500)
closeState :: ConnectionState -> EventStore ()
closeState :: ConnectionState -> EventStore ()
closeState ConnectionState{TBMQueue Package
_sendQueue :: TBMQueue Package
_sendQueue :: ConnectionState -> TBMQueue Package
..} = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue Package
_sendQueue
createConnection :: Settings
-> Network.ConnectionContext
-> EndPoint
-> EventStore Network.Connection
createConnection :: Settings -> ConnectionContext -> EndPoint -> EventStore Connection
createConnection Settings
setts ConnectionContext
ctx EndPoint
ept = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ ConnectionContext -> ConnectionParams -> IO Connection
Network.connectTo ConnectionContext
ctx ConnectionParams
params
where
host :: String
host = EndPoint -> String
endPointIp EndPoint
ept
port :: PortNumber
port = forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ EndPoint -> Int
endPointPort EndPoint
ept
params :: ConnectionParams
params = String
-> PortNumber
-> Maybe TLSSettings
-> Maybe ProxySettings
-> ConnectionParams
Network.ConnectionParams String
host PortNumber
port (Settings -> Maybe TLSSettings
s_ssl Settings
setts) forall a. Maybe a
Nothing
disposeConnection :: Async Network.Connection -> EventStore ()
disposeConnection :: Async Connection -> EventStore ()
disposeConnection Async Connection
as = forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Either SomeException Connection -> EventStore ()
tryDisposing forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m (Maybe (Either SomeException a))
poll Async Connection
as
where
tryDisposing :: Either SomeException Connection -> EventStore ()
tryDisposing = forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Connection -> EventStore ()
disposing
disposing :: Connection -> EventStore ()
disposing = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> IO ()
Network.connectionClose
receivePackage :: Connection -> Network.Connection -> EventStore Package
receivePackage :: Connection -> Connection -> EventStore Package
receivePackage Connection
self Connection
conn =
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Connection -> Int -> IO ByteString
Network.connectionGetExact Connection
conn Int
4) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SomeException
e -> do
forall a. Typeable a => a -> EventStore ()
publish (Connection -> SomeException -> ConnectionClosed
ConnectionClosed Connection
self SomeException
e)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw SomeException
e
Right ByteString
frame ->
case forall a. Get a -> ByteString -> Either String a
runGet Get Int
getLengthPrefix ByteString
frame of
Left String
reason -> do
let cause :: ProtocolError
cause = String -> ProtocolError
WrongFramingError String
reason
forall a. Typeable a => a -> EventStore ()
publish (forall e. Exception e => Connection -> e -> ConnectionError
connectionError Connection
self ProtocolError
cause)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw ProtocolError
cause
Right Int
prefix -> do
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Connection -> Int -> IO ByteString
Network.connectionGetExact Connection
conn Int
prefix) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SomeException
e -> do
forall a. Typeable a => a -> EventStore ()
publish (Connection -> SomeException -> ConnectionClosed
ConnectionClosed Connection
self SomeException
e)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw SomeException
e
Right ByteString
payload ->
case forall a. Get a -> ByteString -> Either String a
runGet Get Package
getPackage ByteString
payload of
Left String
reason -> do
let cause :: ProtocolError
cause = String -> ProtocolError
PackageParsingError String
reason
forall a. Typeable a => a -> EventStore ()
publish (forall e. Exception e => Connection -> e -> ConnectionError
connectionError Connection
self ProtocolError
cause)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw ProtocolError
cause
Right Package
pkg -> forall (m :: * -> *) a. Monad m => a -> m a
return Package
pkg
receiving :: ConnectionState
-> Connection
-> Async Network.Connection
-> EventStore ()
receiving :: ConnectionState -> Connection -> Async Connection -> EventStore ()
receiving ConnectionState{TBMQueue Package
_sendQueue :: TBMQueue Package
_sendQueue :: ConnectionState -> TBMQueue Package
..} Connection
self Async Connection
tcpConnAsync =
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> EventStore ()
go forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait Async Connection
tcpConnAsync
where
go :: Connection -> EventStore ()
go Connection
conn =
forall a. Typeable a => a -> EventStore ()
publish forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> Package -> PackageArrived
PackageArrived Connection
self forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Connection -> Connection -> EventStore Package
receivePackage Connection
self Connection
conn
enqueue :: ConnectionState -> Package -> EventStore ()
enqueue :: ConnectionState -> Package -> EventStore ()
enqueue ConnectionState{TBMQueue Package
_sendQueue :: TBMQueue Package
_sendQueue :: ConnectionState -> TBMQueue Package
..} pkg :: Package
pkg@Package{Maybe Credentials
ByteString
ConnectionId
Command
packageCred :: Package -> Maybe Credentials
packageData :: Package -> ByteString
packageCorrelation :: Package -> ConnectionId
packageCmd :: Package -> Command
packageCred :: Maybe Credentials
packageData :: ByteString
packageCorrelation :: ConnectionId
packageCmd :: Command
..} = do
$(logDebug) [i|Package enqueued: #{pkg}|]
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue Package
_sendQueue Package
pkg
sending :: ConnectionState
-> Connection
-> Async Network.Connection
-> EventStore ()
sending :: ConnectionState -> Connection -> Async Connection -> EventStore ()
sending ConnectionState{TBMQueue Package
_sendQueue :: TBMQueue Package
_sendQueue :: ConnectionState -> TBMQueue Package
..} Connection
self Async Connection
tcpConnAsync = Connection -> EventStore ()
go forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait Async Connection
tcpConnAsync
where
go :: Connection -> EventStore ()
go Connection
conn =
let loop :: EventStore ()
loop = forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Package -> EventStore ()
send forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue Package
_sendQueue)
send :: Package -> EventStore ()
send Package
pkg =
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Connection -> ByteString -> IO ()
Network.connectionPut Connection
conn ByteString
bytes) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SomeException
e -> forall a. Typeable a => a -> EventStore ()
publish (Connection -> SomeException -> ConnectionClosed
ConnectionClosed Connection
self SomeException
e)
Right ()
_ -> do
Int -> EventStore ()
monitorAddDataTransmitted (forall mono. MonoFoldable mono => mono -> Int
length ByteString
bytes)
EventStore ()
loop
where
bytes :: ByteString
bytes = Put -> ByteString
runPut forall a b. (a -> b) -> a -> b
$ Package -> Put
putPackage Package
pkg in
EventStore ()
loop
putPackage :: Package -> Put
putPackage :: Package -> Put
putPackage Package
pkg = do
Putter Word32
putWord32le Word32
length_prefix
Putter Word8
putWord8 (Command -> Word8
cmdWord8 forall a b. (a -> b) -> a -> b
$ Package -> Command
packageCmd Package
pkg)
Putter Word8
putWord8 Word8
flag_word8
Putter ByteString
putLazyByteString ByteString
corr_bytes
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe Credentials
cred_m forall a b. (a -> b) -> a -> b
$ \(Credentials ByteString
login ByteString
passw) -> do
Putter Word8
putWord8 forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ forall mono. MonoFoldable mono => mono -> Int
length ByteString
login
Putter ByteString
putByteString ByteString
login
Putter Word8
putWord8 forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ forall mono. MonoFoldable mono => mono -> Int
length ByteString
passw
Putter ByteString
putByteString ByteString
passw
Putter ByteString
putByteString ByteString
pack_data
where
pack_data :: ByteString
pack_data = Package -> ByteString
packageData Package
pkg
cred_len :: Int
cred_len = forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
0 Credentials -> Int
credSize Maybe Credentials
cred_m
length_prefix :: Word32
length_prefix = forall a b. (Integral a, Num b) => a -> b
fromIntegral (forall mono. MonoFoldable mono => mono -> Int
length ByteString
pack_data forall a. Num a => a -> a -> a
+ Int
mandatorySize forall a. Num a => a -> a -> a
+ Int
cred_len)
cred_m :: Maybe Credentials
cred_m = Package -> Maybe Credentials
packageCred Package
pkg
flag_word8 :: Word8
flag_word8 = forall b a. b -> (a -> b) -> Maybe a -> b
maybe Word8
0x00 (forall a b. a -> b -> a
const Word8
0x01) Maybe Credentials
cred_m
corr_bytes :: ByteString
corr_bytes = ConnectionId -> ByteString
toByteString forall a b. (a -> b) -> a -> b
$ Package -> ConnectionId
packageCorrelation Package
pkg
credSize :: Credentials -> Int
credSize :: Credentials -> Int
credSize (Credentials ByteString
login ByteString
passw) = forall mono. MonoFoldable mono => mono -> Int
length ByteString
login forall a. Num a => a -> a -> a
+ forall mono. MonoFoldable mono => mono -> Int
length ByteString
passw forall a. Num a => a -> a -> a
+ Int
2
mandatorySize :: Int
mandatorySize :: Int
mandatorySize = Int
18
getLengthPrefix :: Get Int
getLengthPrefix :: Get Int
getLengthPrefix = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (Integral a, Num b) => a -> b
fromIntegral Get Word32
getWord32le
getPackage :: Get Package
getPackage :: Get Package
getPackage = do
Word8
cmd <- Get Word8
getWord8
Flag
flg <- Get Flag
getFlag
ConnectionId
col <- Get ConnectionId
getUUID
Maybe Credentials
cred <- Flag -> Get (Maybe Credentials)
getCredentials Flag
flg
Int
rest <- Get Int
remaining
ByteString
dta <- Int -> Get ByteString
getBytes Int
rest
let pkg :: Package
pkg = Package
{ packageCmd :: Command
packageCmd = Word8 -> Command
getCommand Word8
cmd
, packageCorrelation :: ConnectionId
packageCorrelation = ConnectionId
col
, packageData :: ByteString
packageData = ByteString
dta
, packageCred :: Maybe Credentials
packageCred = Maybe Credentials
cred
}
forall (m :: * -> *) a. Monad m => a -> m a
return Package
pkg
getFlag :: Get Flag
getFlag :: Get Flag
getFlag = do
Word8
wd <- Get Word8
getWord8
case Word8
wd of
Word8
0x00 -> forall (m :: * -> *) a. Monad m => a -> m a
return Flag
None
Word8
0x01 -> forall (m :: * -> *) a. Monad m => a -> m a
return Flag
Authenticated
Word8
_ -> forall (m :: * -> *) a. MonadFail m => String -> m a
fail forall a b. (a -> b) -> a -> b
$ forall r. PrintfType r => String -> r
printf String
"TCP: Unhandled flag value 0x%x" Word8
wd
getCredEntryLength :: Get Int
getCredEntryLength :: Get Int
getCredEntryLength = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (Integral a, Num b) => a -> b
fromIntegral Get Word8
getWord8
getCredentials :: Flag -> Get (Maybe Credentials)
getCredentials :: Flag -> Get (Maybe Credentials)
getCredentials Flag
None = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
getCredentials Flag
_ = do
Int
loginLen <- Get Int
getCredEntryLength
ByteString
login <- Int -> Get ByteString
getBytes Int
loginLen
Int
passwLen <- Get Int
getCredEntryLength
ByteString
passw <- Int -> Get ByteString
getBytes Int
passwLen
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString -> Credentials
credentials ByteString
login ByteString
passw
getUUID :: Get UUID
getUUID :: Get ConnectionId
getUUID = do
ByteString
bs <- Int64 -> Get ByteString
getLazyByteString Int64
16
case ByteString -> Maybe ConnectionId
fromByteString ByteString
bs of
Just ConnectionId
uuid -> forall (m :: * -> *) a. Monad m => a -> m a
return ConnectionId
uuid
Maybe ConnectionId
_ -> forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"TCP: Wrong UUID format"