{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.EventStore.Internal where
import Prelude (String)
import Data.Int
import Data.Maybe
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Connection (connectionBuilder)
import Database.EventStore.Internal.Control hiding (subscribe)
import Database.EventStore.Internal.Discovery
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Subscription.Catchup
import Database.EventStore.Internal.Subscription.Persistent
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Subscription.Regular
import Database.EventStore.Internal.Logger
import qualified Database.EventStore.Internal.Operations as Op
import Database.EventStore.Internal.Operation.Read.Common
import Database.EventStore.Internal.Operation.Write.Common
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Types
-- | How a 'Connection' locates the server it talks to. 'connect' turns each
--   alternative into an endpoint discovery strategy.
data ConnectionType
    = Static String Int
      -- ^ Host and port, handed to 'staticEndPointDiscovery'.
    | Cluster ClusterSettings
      -- ^ Cluster settings, handed to 'clusterDnsEndPointDiscovery'.
    | Dns ByteString (Maybe DnsServer) Int
      -- ^ Domain, optional DNS server and port, handed to
      --   'simpleDnsEndPointDiscovery'.
-- | Handle produced by 'connect'. It bundles the command executor with the
--   configuration the connection was created from.
data Connection =
    Connection
        { _exec     :: Exec
          -- ^ Executor created by 'newExec'; passed to the operations issued
          --   from this module.
        , _settings :: Settings
          -- ^ Settings given to 'connect'.
        , _type     :: ConnectionType
          -- ^ Connection type given to 'connect'.
        }
-- | Creates a 'Connection' from the given settings and connection type.
--
--   Steps, in order: choose the endpoint discovery matching the
--   'ConnectionType', create the logger reference from the logger-related
--   settings, create the main bus, create the connection builder and finally
--   the 'Exec' that ties them together.
connect :: Settings -> ConnectionType -> IO Connection
connect settings@Settings{ s_loggerType     = logType
                         , s_loggerFilter   = logFilter
                         , s_loggerDetailed = logDetailed
                         } tpe = do
    disc    <- discoveryFor tpe
    logRef  <- newLoggerRef logType logFilter logDetailed
    mainBus <- newBus logRef settings
    builder <- connectionBuilder settings
    exec    <- newExec settings mainBus builder disc
    return Connection
        { _exec     = exec
        , _settings = settings
        , _type     = tpe
        }
  where
    -- Only the cluster variant needs an effect to build its discovery; the
    -- other two are pure values lifted with 'return'.
    discoveryFor (Static host port) =
        return $ staticEndPointDiscovery host port
    discoveryFor (Cluster setts) =
        clusterDnsEndPointDiscovery setts
    discoveryFor (Dns dom srv port) =
        return $ simpleDnsEndPointDiscovery dom srv port
-- | Delegates to 'execWaitTillClosed' on the connection's 'Exec'.
--   NOTE(review): judging by the name, this returns once the connection is
--   closed -- confirm against the 'Exec' implementation.
waitTillClosed :: Connection -> IO ()
waitTillClosed Connection{ _exec = exec } = execWaitTillClosed exec
-- | Returns the 'Settings' the connection was created with.
connectionSettings :: Connection -> Settings
connectionSettings Connection{ _settings = setts } = setts
-- | Publishes a 'SystemShutdown' message on the connection's 'Exec'. This
--   function only publishes the message; it does not itself wait for anything
--   (see 'waitTillClosed').
shutdown :: Connection -> IO ()
shutdown Connection{ _exec = exec } = publishWith exec SystemShutdown
-- | Writes a single event to a stream. Equivalent to calling 'sendEvents'
--   with a one-element list.
sendEvent :: Connection
          -> StreamName
          -> ExpectedVersion
          -> Event
          -> Maybe Credentials
          -> IO (Async WriteResult)
sendEvent conn stream expVer evt cred =
    sendEvents conn stream expVer single cred
  where
    single = [evt]
-- | Writes a list of events to a stream through 'Op.writeEvents'. The stream
--   is passed on by its raw name ('streamIdRaw'). The result is delivered
--   through the returned 'Async'.
sendEvents :: Connection
           -> StreamName
           -> ExpectedVersion
           -> [Event]
           -> Maybe Credentials
           -> IO (Async WriteResult)
sendEvents Connection{ _settings = setts, _exec = exec }
           stream expVer evts cred =
    Op.writeEvents setts exec rawName expVer cred evts
  where
    rawName = streamIdRaw stream
-- | Deletes a stream through 'Op.deleteStream'. The @Maybe Bool@ argument is
--   forwarded untouched.
--   NOTE(review): the original binder is named @hardDel@, so it presumably
--   selects a hard delete -- confirm in "Operations".
deleteStream :: Connection
             -> StreamName
             -> ExpectedVersion
             -> Maybe Bool
             -> Maybe Credentials
             -> IO (Async Op.DeleteResult)
deleteStream Connection{ _settings = setts, _exec = exec }
             stream expVer hardDel cred =
    Op.deleteStream setts exec rawName expVer hardDel cred
  where
    rawName = streamIdRaw stream
-- | An open transaction, as produced by 'startTransaction'. It remembers
--   everything 'transactionWrite' and 'transactionCommit' need to address the
--   same transaction again.
data Transaction =
    Transaction
        { _tStream  :: Text
          -- ^ Raw name of the stream the transaction was started on.
        , _tTransId :: TransactionId
          -- ^ Identifier obtained from 'Op.transactionStart'.
        , _tExpVer  :: ExpectedVersion
          -- ^ Expected version supplied when the transaction was started.
        , _tConn    :: Connection
          -- ^ Connection the transaction was started on.
        }
-- | Identifier of a 'Transaction'; a wrapper around the raw 'Int64' used by
--   the transaction operations.
newtype TransactionId =
    TransactionId
        { _unTransId :: Int64
          -- ^ Unwraps the raw identifier.
        }
    deriving (Eq, Ord, Show)
-- | Returns the identifier of a 'Transaction'.
transactionId :: Transaction -> TransactionId
transactionId Transaction{ _tTransId = tid } = tid
-- | Starts a transaction on a stream.
--
--   'Op.transactionStart' yields an 'Async' carrying the raw transaction id.
--   A second 'Async' waits for that id and wraps it, together with the stream
--   name, the expected version and the connection, into a 'Transaction'.
startTransaction :: Connection
                 -> StreamName
                 -> ExpectedVersion
                 -> Maybe Credentials
                 -> IO (Async Transaction)
startTransaction conn@Connection{ _settings = setts, _exec = exec }
                 stream expVer cred = do
    pendingId <- Op.transactionStart setts exec rawName expVer cred
    async $ do
        rawId <- wait pendingId
        return Transaction
            { _tStream  = rawName
            , _tTransId = TransactionId rawId
            , _tExpVer  = expVer
            , _tConn    = conn
            }
  where
    rawName = streamIdRaw stream
-- | Writes events within an open 'Transaction' through 'Op.transactionWrite',
--   using the stream, expected version, raw id and connection stored in the
--   transaction.
transactionWrite :: Transaction
                 -> [Event]
                 -> Maybe Credentials
                 -> IO (Async ())
transactionWrite (Transaction stream (TransactionId rawId) expVer conn)
                 evts cred =
    Op.transactionWrite setts exec stream expVer rawId evts cred
  where
    setts = _settings conn
    exec  = _exec conn
-- | Commits an open 'Transaction' through 'Op.transactionCommit', using the
--   stream, expected version, raw id and connection stored in the
--   transaction.
transactionCommit :: Transaction -> Maybe Credentials -> IO (Async WriteResult)
transactionCommit (Transaction stream (TransactionId rawId) expVer conn) cred =
    Op.transactionCommit setts exec stream expVer rawId cred
  where
    setts = _settings conn
    exec  = _exec conn
-- | Deliberate no-op: the transaction argument is ignored and nothing is sent
--   anywhere.
--   NOTE(review): presumably a transaction is abandoned simply by never
--   calling 'transactionCommit' on it -- confirm against the server protocol.
transactionRollback :: Transaction -> IO ()
transactionRollback _ignored = return ()
-- | Reads a single event from a stream through 'Op.readEvent'. The event
--   number and the link-resolution flag are converted to their raw forms
--   ('eventNumberToInt64', 'resolveLinkToBool') before being passed on.
readEvent :: Connection
          -> StreamName
          -> EventNumber
          -> ResolveLink
          -> Maybe Credentials
          -> IO (Async (ReadResult EventNumber Op.ReadEvent))
readEvent Connection{ _settings = setts, _exec = exec }
          stream evtNum resLinkTos cred =
    Op.readEvent setts exec rawName rawNumber resolve cred
  where
    rawName   = streamIdRaw stream
    rawNumber = eventNumberToInt64 evtNum
    resolve   = resolveLinkToBool resLinkTos
-- | Result type of a batch read, selected by the kind of starting point:
--   reading from an 'EventNumber' yields a 'ReadResult' over a 'StreamSlice',
--   reading from a 'Position' yields an 'AllSlice'.
type family BatchResult t where
    BatchResult EventNumber = ReadResult EventNumber StreamSlice
    BatchResult Position    = AllSlice
-- | Batch read in the 'Forward' direction; see 'readEventsCommon'.
readEventsForward :: Connection
                  -> StreamId t
                  -> t
                  -> Int32
                  -> ResolveLink
                  -> Maybe Credentials
                  -> IO (Async (BatchResult t))
readEventsForward conn streamId start cnt resLinkTos cred =
    readEventsCommon conn Forward streamId start cnt resLinkTos cred
-- | Batch read in the 'Backward' direction; see 'readEventsCommon'.
readEventsBackward :: Connection
                   -> StreamId t
                   -> t
                   -> Int32
                   -> ResolveLink
                   -> Maybe Credentials
                   -> IO (Async (BatchResult t))
readEventsBackward conn streamId start cnt resLinkTos cred =
    readEventsCommon conn Backward streamId start cnt resLinkTos cred
-- | Shared implementation of 'readEventsForward' and 'readEventsBackward'.
--
--   The stream identifier decides which operation runs: a named stream is
--   read with 'Op.readStreamEvents' starting at an event number, while 'All'
--   is read with 'Op.readAllEvents' starting at a 'Position'. Matching on the
--   identifier is also what fixes the type of the starting point @t@.
readEventsCommon :: Connection
                 -> ReadDirection
                 -> StreamId t
                 -> t
                 -> Int32
                 -> ResolveLink
                 -> Maybe Credentials
                 -> IO (Async (BatchResult t))
readEventsCommon Connection{ _settings = setts, _exec = exec }
                 dir streamId start cnt resLinkTos cred =
    case streamId of
        StreamName{} ->
            Op.readStreamEvents setts exec dir
                (streamIdRaw streamId)
                (eventNumberToInt64 start)
                cnt resolve cred
        All ->
            -- Kept as a lazy binding so the position is only taken apart when
            -- its components are actually demanded.
            let Position cPos pPos = start
            in Op.readAllEvents setts exec cPos pPos cnt resolve dir cred
  where
    resolve = resolveLinkToBool resLinkTos
-- | Creates a regular subscription on a stream via 'newRegularSubscription',
--   after converting the link-resolution flag with 'resolveLinkToBool'.
subscribe :: Connection
          -> StreamId t
          -> ResolveLink
          -> Maybe Credentials
          -> IO (RegularSubscription t)
subscribe Connection{ _exec = exec } stream resLinkTos cred =
    newRegularSubscription exec stream (resolveLinkToBool resLinkTos) cred
-- | Creates a catch-up subscription on a stream.
--
--   The optional checkpoint is where the subscription starts. When it is
--   'Nothing', the start defaults to 'streamStart' for a named stream and to
--   'positionStart' for 'All'. The optional 'Int32' is forwarded untouched
--   (the original binder is named @batch@).
--
--   The subscription itself is built by 'subscribeFromCommon', so both entry
--   points share one code path.
subscribeFrom :: Connection
              -> StreamId t
              -> ResolveLink
              -> Maybe t
              -> Maybe Int32
              -> Maybe Credentials
              -> IO (CatchupSubscription t)
subscribeFrom conn streamId resLinkTos lastChkPt batch cred =
    subscribeFromCommon conn resLinkTos batch cred streamId $
        -- Matching on the identifier fixes @t@, which is what lets each
        -- branch pick a default of the right type.
        case streamId of
            StreamName{} -> fromMaybe streamStart lastChkPt
            All          -> fromMaybe positionStart lastChkPt
-- | Creates a catch-up subscription from an explicit starting point via
--   'newCatchupSubscription', after converting the link-resolution flag with
--   'resolveLinkToBool'. All other arguments are forwarded as given.
subscribeFromCommon :: Connection
                    -> ResolveLink
                    -> Maybe Int32
                    -> Maybe Credentials
                    -> StreamId t
                    -> t
                    -> IO (CatchupSubscription t)
subscribeFromCommon Connection{ _exec = exec }
                    resLinkTos batch cred kind seed =
    newCatchupSubscription exec resolve batch cred kind seed
  where
    resolve = resolveLinkToBool resLinkTos
-- | Sets the metadata of a stream through 'Op.setMetaStream'. The stream is
--   passed on by its raw name ('streamIdRaw').
setStreamMetadata :: Connection
                  -> StreamName
                  -> ExpectedVersion
                  -> StreamMetadata
                  -> Maybe Credentials
                  -> IO (Async WriteResult)
setStreamMetadata Connection{ _settings = setts, _exec = exec }
                  stream expVer metadata cred =
    Op.setMetaStream setts exec (streamIdRaw stream) expVer cred metadata
-- | Reads the metadata of a stream through 'Op.readMetaStream'. The stream is
--   passed on by its raw name ('streamIdRaw').
getStreamMetadata :: Connection
                  -> StreamName
                  -> Maybe Credentials
                  -> IO (Async StreamMetadataResult)
getStreamMetadata Connection{ _settings = setts, _exec = exec } stream cred =
    Op.readMetaStream setts exec rawName cred
  where
    rawName = streamIdRaw stream
-- | Creates a persistent subscription group on a stream through
--   'Op.createPersist'. The 'Text' argument is the group name (bound as @grp@
--   in the original).
--   NOTE(review): the 'Async' presumably yields 'Nothing' on success and the
--   exception otherwise -- confirm in "Operations".
createPersistentSubscription :: Connection
                             -> Text
                             -> StreamName
                             -> PersistentSubscriptionSettings
                             -> Maybe Credentials
                             -> IO (Async (Maybe PersistActionException))
createPersistentSubscription Connection{ _exec = exec } grp stream sett cred =
    Op.createPersist exec grp rawName sett cred
  where
    rawName = streamIdRaw stream
-- | Updates a persistent subscription group on a stream through
--   'Op.updatePersist'. The 'Text' argument is the group name (bound as @grp@
--   in the original).
--   NOTE(review): the 'Async' presumably yields 'Nothing' on success and the
--   exception otherwise -- confirm in "Operations".
updatePersistentSubscription :: Connection
                             -> Text
                             -> StreamName
                             -> PersistentSubscriptionSettings
                             -> Maybe Credentials
                             -> IO (Async (Maybe PersistActionException))
updatePersistentSubscription Connection{ _exec = exec } grp stream sett cred =
    Op.updatePersist exec grp rawName sett cred
  where
    rawName = streamIdRaw stream
-- | Deletes a persistent subscription group on a stream through
--   'Op.deletePersist'. The 'Text' argument is the group name (bound as @grp@
--   in the original).
--   NOTE(review): the 'Async' presumably yields 'Nothing' on success and the
--   exception otherwise -- confirm in "Operations".
deletePersistentSubscription :: Connection
                             -> Text
                             -> StreamName
                             -> Maybe Credentials
                             -> IO (Async (Maybe PersistActionException))
deletePersistentSubscription Connection{ _exec = exec } grp stream cred =
    Op.deletePersist exec grp rawName cred
  where
    rawName = streamIdRaw stream
-- | Connects to a persistent subscription group on a stream via
--   'newPersistentSubscription'. Unlike the other persistent-subscription
--   functions in this module, the stream is forwarded as a 'StreamName'
--   rather than as its raw name. The 'Int32' is forwarded untouched (the
--   original binder is named @bufSize@).
connectToPersistentSubscription :: Connection
                                -> Text
                                -> StreamName
                                -> Int32
                                -> Maybe Credentials
                                -> IO PersistentSubscription
connectToPersistentSubscription Connection{ _exec = exec }
                                groupName stream bufSize cred =
    newPersistentSubscription exec groupName stream bufSize cred