{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.EventStore.Internal where
import Prelude (String)
import Data.Int
import Data.Maybe
import Data.Time (NominalDiffTime)
import Data.List.NonEmpty(NonEmpty(..), nonEmpty)
import Network.Connection (TLSSettings)
import Database.EventStore.Internal.Callback
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Connection (connectionBuilder)
import Database.EventStore.Internal.Control hiding (subscribe)
import Database.EventStore.Internal.Discovery
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Catchup
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Persistent
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Subscription.Regular
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Manager.Operation.Registry
import Database.EventStore.Internal.Operation (OperationError(..))
import qualified Database.EventStore.Internal.Operations as Op
import Database.EventStore.Internal.Operation.Read.Common
import Database.EventStore.Internal.Operation.Write.Common
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Types
data ConnectionType
= Static String Int
| Cluster ClusterSettings
| Dns ByteString (Maybe DnsServer) Int
data Connection
= Connection
{ _exec :: Exec
, _settings :: Settings
, _type :: ConnectionType
}
connect :: Settings -> ConnectionType -> IO Connection
connect settings@Settings{..} tpe = do
disc <- case tpe of
Static host port -> return $ staticEndPointDiscovery host port
Cluster setts -> clusterDnsEndPointDiscovery setts
Dns dom srv port -> return $ simpleDnsEndPointDiscovery dom srv port
logRef <- newLoggerRef s_loggerType s_loggerFilter s_loggerDetailed
mainBus <- newBus logRef settings
builder <- connectionBuilder settings
exec <- newExec settings mainBus builder disc
return $ Connection exec settings tpe
waitTillClosed :: Connection -> IO ()
waitTillClosed Connection{..} = execWaitTillClosed _exec
connectionSettings :: Connection -> Settings
connectionSettings = _settings
shutdown :: Connection -> IO ()
shutdown Connection{..} = publishWith _exec SystemShutdown
sendEvent :: Connection
-> StreamName
-> ExpectedVersion
-> Event
-> Maybe Credentials
-> IO (Async WriteResult)
sendEvent mgr evt_stream exp_ver evt cred =
sendEvents mgr evt_stream exp_ver [evt] cred
sendEvents :: Connection
-> StreamName
-> ExpectedVersion
-> [Event]
-> Maybe Credentials
-> IO (Async WriteResult)
sendEvents Connection{..} evt_stream exp_ver evts cred = do
p <- newPromise
let op = Op.writeEvents _settings (streamIdRaw evt_stream) exp_ver cred evts
publishWith _exec (SubmitOperation p op)
async (retrieve p)
deleteStream :: Connection
-> StreamName
-> ExpectedVersion
-> Maybe Bool
-> Maybe Credentials
-> IO (Async Op.DeleteResult)
deleteStream Connection{..} evt_stream exp_ver hard_del cred = do
p <- newPromise
let op = Op.deleteStream _settings (streamIdRaw evt_stream) exp_ver hard_del cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
data Transaction =
Transaction
{ _tStream :: Text
, _tTransId :: TransactionId
, _tExpVer :: ExpectedVersion
, _tConn :: Connection
}
newtype TransactionId =
TransactionId { _unTransId :: Int64 }
deriving (Eq, Ord, Show)
transactionId :: Transaction -> TransactionId
transactionId = _tTransId
startTransaction :: Connection
-> StreamName
-> ExpectedVersion
-> Maybe Credentials
-> IO (Async Transaction)
startTransaction conn@Connection{..} evt_stream exp_ver cred = do
p <- newPromise
let op = Op.transactionStart _settings (streamIdRaw evt_stream) exp_ver cred
publishWith _exec (SubmitOperation p op)
async $ do
tid <- retrieve p
return Transaction
{ _tStream = streamIdRaw evt_stream
, _tTransId = TransactionId tid
, _tExpVer = exp_ver
, _tConn = conn
}
transactionWrite :: Transaction
-> [Event]
-> Maybe Credentials
-> IO (Async ())
transactionWrite Transaction{..} evts cred = do
p <- newPromise
let Connection{..} = _tConn
raw_id = _unTransId _tTransId
op = Op.transactionWrite _settings _tStream _tExpVer raw_id evts cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
transactionCommit :: Transaction -> Maybe Credentials -> IO (Async WriteResult)
transactionCommit Transaction{..} cred = do
p <- newPromise
let Connection{..} = _tConn
raw_id = _unTransId _tTransId
op = Op.transactionCommit _settings _tStream _tExpVer raw_id cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
transactionRollback :: Transaction -> IO ()
transactionRollback _ = return ()
readEvent :: Connection
-> StreamName
-> EventNumber
-> ResolveLink
-> Maybe Credentials
-> IO (Async (ReadResult EventNumber Op.ReadEvent))
readEvent Connection{..} stream_id evtNum resLinkTos cred = do
p <- newPromise
let evt_num = eventNumberToInt64 evtNum
res_link_tos = resolveLinkToBool resLinkTos
op = Op.readEvent _settings (streamIdRaw stream_id) evt_num res_link_tos cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
type family BatchResult t where
BatchResult EventNumber = ReadResult EventNumber StreamSlice
BatchResult Position = AllSlice
readEventsForward :: Connection
-> StreamId t
-> t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> IO (Async (BatchResult t))
readEventsForward conn = readEventsCommon conn Forward
readEventsBackward :: Connection
-> StreamId t
-> t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> IO (Async (BatchResult t))
readEventsBackward conn = readEventsCommon conn Backward
readEventsCommon :: Connection
-> ReadDirection
-> StreamId t
-> t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> IO (Async (BatchResult t))
readEventsCommon Connection{..} dir streamId start cnt resLinkTos cred = do
p <- newPromise
let res_link_tos = resolveLinkToBool resLinkTos
op =
case streamId of
StreamName{} ->
let name = streamIdRaw streamId
evtNum = eventNumberToInt64 start in
Op.readStreamEvents _settings dir name evtNum cnt res_link_tos cred
All ->
let Position c_pos p_pos = start in
Op.readAllEvents _settings c_pos p_pos cnt res_link_tos dir cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
subscribe :: Connection
-> StreamId t
-> ResolveLink
-> Maybe Credentials
-> IO (RegularSubscription t)
subscribe Connection{..} stream resLinkTos cred =
newRegularSubscription _exec stream resLnkTos cred
where
resLnkTos = resolveLinkToBool resLinkTos
subscribeFrom :: Connection
-> StreamId t
-> ResolveLink
-> Maybe t
-> Maybe Int32
-> Maybe Credentials
-> IO (CatchupSubscription t)
subscribeFrom Connection{..} streamId resLinkTos lastChkPt batch cred =
newCatchupSubscription _exec resLnkTos batch cred streamId $
case streamId of
StreamName{} -> fromMaybe streamStart lastChkPt
All -> fromMaybe positionStart lastChkPt
where
resLnkTos = resolveLinkToBool resLinkTos
subscribeFromCommon :: Connection
-> ResolveLink
-> Maybe Int32
-> Maybe Credentials
-> StreamId t
-> t
-> IO (CatchupSubscription t)
subscribeFromCommon Connection{..} resLinkTos batch cred kind seed =
newCatchupSubscription _exec resLnkTos batch cred kind seed
where
resLnkTos = resolveLinkToBool resLinkTos
setStreamMetadata :: Connection
-> StreamName
-> ExpectedVersion
-> StreamMetadata
-> Maybe Credentials
-> IO (Async WriteResult)
setStreamMetadata Connection{..} evt_stream exp_ver metadata cred = do
p <- newPromise
let name = streamIdRaw evt_stream
op = Op.setMetaStream _settings name exp_ver cred metadata
publishWith _exec (SubmitOperation p op)
async (retrieve p)
getStreamMetadata :: Connection
-> StreamName
-> Maybe Credentials
-> IO (Async StreamMetadataResult)
getStreamMetadata Connection{..} evt_stream cred = do
p <- newPromise
let op = Op.readMetaStream _settings (streamIdRaw evt_stream) cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
createPersistentSubscription :: Connection
-> Text
-> StreamName
-> PersistentSubscriptionSettings
-> Maybe Credentials
-> IO (Async (Maybe PersistActionException))
createPersistentSubscription Connection{..} grp stream sett cred = do
p <- newPromise
let op = Op.createPersist grp (streamIdRaw stream) sett cred
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
updatePersistentSubscription :: Connection
-> Text
-> StreamName
-> PersistentSubscriptionSettings
-> Maybe Credentials
-> IO (Async (Maybe PersistActionException))
updatePersistentSubscription Connection{..} grp stream sett cred = do
p <- newPromise
let op = Op.updatePersist grp (streamIdRaw stream) sett cred
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
deletePersistentSubscription :: Connection
-> Text
-> StreamName
-> Maybe Credentials
-> IO (Async (Maybe PersistActionException))
deletePersistentSubscription Connection{..} grp stream cred = do
p <- newPromise
let op = Op.deletePersist grp (streamIdRaw stream) cred
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
connectToPersistentSubscription :: Connection
-> Text
-> StreamName
-> Int32
-> Maybe Credentials
-> IO PersistentSubscription
connectToPersistentSubscription Connection{..} group stream bufSize cred =
newPersistentSubscription _exec group stream bufSize cred
persistAsync :: Callback (Maybe PersistActionException)
-> IO (Maybe PersistActionException)
persistAsync = either throw return <=< tryRetrieve