{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.EventStore.Internal where
import Prelude (String)
import Data.Int
import Data.Maybe
import Database.EventStore.Internal.Callback
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Connection (connectionBuilder)
import Database.EventStore.Internal.Control hiding (subscribe)
import Database.EventStore.Internal.Discovery
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Subscription.Catchup
import Database.EventStore.Internal.Subscription.Persistent
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Subscription.Regular
import Database.EventStore.Internal.Logger
import qualified Database.EventStore.Internal.Operations as Op
import Database.EventStore.Internal.Operation.Read.Common
import Database.EventStore.Internal.Operation.Write.Common
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Types
data ConnectionType
= Static String Int
| Cluster ClusterSettings
| Dns ByteString (Maybe DnsServer) Int
data Connection
= Connection
{ _exec :: Exec
, _settings :: Settings
, _type :: ConnectionType
}
connect :: Settings -> ConnectionType -> IO Connection
connect settings@Settings{..} tpe = do
disc <- case tpe of
Static host port -> return $ staticEndPointDiscovery host port
Cluster setts -> clusterDnsEndPointDiscovery setts
Dns dom srv port -> return $ simpleDnsEndPointDiscovery dom srv port
logRef <- newLoggerRef s_loggerType s_loggerFilter s_loggerDetailed
mainBus <- newBus logRef settings
builder <- connectionBuilder settings
exec <- newExec settings mainBus builder disc
return $ Connection exec settings tpe
waitTillClosed :: Connection -> IO ()
waitTillClosed Connection{..} = execWaitTillClosed _exec
connectionSettings :: Connection -> Settings
connectionSettings = _settings
shutdown :: Connection -> IO ()
shutdown Connection{..} = publishWith _exec SystemShutdown
sendEvent :: Connection
-> StreamName
-> ExpectedVersion
-> Event
-> Maybe Credentials
-> IO (Async WriteResult)
sendEvent mgr evt_stream exp_ver evt cred =
sendEvents mgr evt_stream exp_ver [evt] cred
sendEvents :: Connection
-> StreamName
-> ExpectedVersion
-> [Event]
-> Maybe Credentials
-> IO (Async WriteResult)
sendEvents Connection{..} evt_stream exp_ver evts cred = do
p <- newPromise
let op = Op.writeEvents _settings (streamIdRaw evt_stream) exp_ver cred evts
publishWith _exec (SubmitOperation p op)
async (retrieve p)
deleteStream :: Connection
-> StreamName
-> ExpectedVersion
-> Maybe Bool
-> Maybe Credentials
-> IO (Async Op.DeleteResult)
deleteStream Connection{..} evt_stream exp_ver hard_del cred = do
p <- newPromise
let op = Op.deleteStream _settings (streamIdRaw evt_stream) exp_ver hard_del cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
data Transaction =
Transaction
{ _tStream :: Text
, _tTransId :: TransactionId
, _tExpVer :: ExpectedVersion
, _tConn :: Connection
}
newtype TransactionId =
TransactionId { _unTransId :: Int64 }
deriving (Eq, Ord, Show)
transactionId :: Transaction -> TransactionId
transactionId = _tTransId
startTransaction :: Connection
-> StreamName
-> ExpectedVersion
-> Maybe Credentials
-> IO (Async Transaction)
startTransaction conn@Connection{..} evt_stream exp_ver cred = do
p <- newPromise
let op = Op.transactionStart _settings (streamIdRaw evt_stream) exp_ver cred
publishWith _exec (SubmitOperation p op)
async $ do
tid <- retrieve p
return Transaction
{ _tStream = streamIdRaw evt_stream
, _tTransId = TransactionId tid
, _tExpVer = exp_ver
, _tConn = conn
}
transactionWrite :: Transaction
-> [Event]
-> Maybe Credentials
-> IO (Async ())
transactionWrite Transaction{..} evts cred = do
p <- newPromise
let Connection{..} = _tConn
raw_id = _unTransId _tTransId
op = Op.transactionWrite _settings _tStream _tExpVer raw_id evts cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
transactionCommit :: Transaction -> Maybe Credentials -> IO (Async WriteResult)
transactionCommit Transaction{..} cred = do
p <- newPromise
let Connection{..} = _tConn
raw_id = _unTransId _tTransId
op = Op.transactionCommit _settings _tStream _tExpVer raw_id cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
transactionRollback :: Transaction -> IO ()
transactionRollback _ = return ()
readEvent :: Connection
-> StreamName
-> EventNumber
-> ResolveLink
-> Maybe Credentials
-> IO (Async (ReadResult EventNumber Op.ReadEvent))
readEvent Connection{..} stream_id evtNum resLinkTos cred = do
p <- newPromise
let evt_num = eventNumberToInt64 evtNum
res_link_tos = resolveLinkToBool resLinkTos
op = Op.readEvent _settings (streamIdRaw stream_id) evt_num res_link_tos cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
type family BatchResult t where
BatchResult EventNumber = ReadResult EventNumber StreamSlice
BatchResult Position = AllSlice
readEventsForward :: Connection
-> StreamId t
-> t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> IO (Async (BatchResult t))
readEventsForward conn = readEventsCommon conn Forward
readEventsBackward :: Connection
-> StreamId t
-> t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> IO (Async (BatchResult t))
readEventsBackward conn = readEventsCommon conn Backward
readEventsCommon :: Connection
-> ReadDirection
-> StreamId t
-> t
-> Int32
-> ResolveLink
-> Maybe Credentials
-> IO (Async (BatchResult t))
readEventsCommon Connection{..} dir streamId start cnt resLinkTos cred = do
p <- newPromise
let res_link_tos = resolveLinkToBool resLinkTos
op =
case streamId of
StreamName{} ->
let name = streamIdRaw streamId
evtNum = eventNumberToInt64 start in
Op.readStreamEvents _settings dir name evtNum cnt res_link_tos cred
All ->
let Position c_pos p_pos = start in
Op.readAllEvents _settings c_pos p_pos cnt res_link_tos dir cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
subscribe :: Connection
-> StreamId t
-> ResolveLink
-> Maybe Credentials
-> IO (RegularSubscription t)
subscribe Connection{..} stream resLinkTos cred =
newRegularSubscription _exec stream resLnkTos cred
where
resLnkTos = resolveLinkToBool resLinkTos
subscribeFrom :: Connection
-> StreamId t
-> ResolveLink
-> Maybe t
-> Maybe Int32
-> Maybe Credentials
-> IO (CatchupSubscription t)
subscribeFrom Connection{..} streamId resLinkTos lastChkPt batch cred =
newCatchupSubscription _exec resLnkTos batch cred streamId $
case streamId of
StreamName{} -> fromMaybe streamStart lastChkPt
All -> fromMaybe positionStart lastChkPt
where
resLnkTos = resolveLinkToBool resLinkTos
subscribeFromCommon :: Connection
-> ResolveLink
-> Maybe Int32
-> Maybe Credentials
-> StreamId t
-> t
-> IO (CatchupSubscription t)
subscribeFromCommon Connection{..} resLinkTos batch cred kind seed =
newCatchupSubscription _exec resLnkTos batch cred kind seed
where
resLnkTos = resolveLinkToBool resLinkTos
setStreamMetadata :: Connection
-> StreamName
-> ExpectedVersion
-> StreamMetadata
-> Maybe Credentials
-> IO (Async WriteResult)
setStreamMetadata Connection{..} evt_stream exp_ver metadata cred = do
p <- newPromise
let name = streamIdRaw evt_stream
op = Op.setMetaStream _settings name exp_ver cred metadata
publishWith _exec (SubmitOperation p op)
async (retrieve p)
getStreamMetadata :: Connection
-> StreamName
-> Maybe Credentials
-> IO (Async StreamMetadataResult)
getStreamMetadata Connection{..} evt_stream cred = do
p <- newPromise
let op = Op.readMetaStream _settings (streamIdRaw evt_stream) cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
createPersistentSubscription :: Connection
-> Text
-> StreamName
-> PersistentSubscriptionSettings
-> Maybe Credentials
-> IO (Async (Maybe PersistActionException))
createPersistentSubscription Connection{..} grp stream sett cred = do
p <- newPromise
let op = Op.createPersist grp (streamIdRaw stream) sett cred
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
updatePersistentSubscription :: Connection
-> Text
-> StreamName
-> PersistentSubscriptionSettings
-> Maybe Credentials
-> IO (Async (Maybe PersistActionException))
updatePersistentSubscription Connection{..} grp stream sett cred = do
p <- newPromise
let op = Op.updatePersist grp (streamIdRaw stream) sett cred
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
deletePersistentSubscription :: Connection
-> Text
-> StreamName
-> Maybe Credentials
-> IO (Async (Maybe PersistActionException))
deletePersistentSubscription Connection{..} grp stream cred = do
p <- newPromise
let op = Op.deletePersist grp (streamIdRaw stream) cred
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
connectToPersistentSubscription :: Connection
-> Text
-> StreamName
-> Int32
-> Maybe Credentials
-> IO PersistentSubscription
connectToPersistentSubscription Connection{..} group stream bufSize cred =
newPersistentSubscription _exec group stream bufSize cred
persistAsync :: Callback (Maybe PersistActionException)
-> IO (Maybe PersistActionException)
persistAsync = either throw return <=< tryRetrieve