{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.EventStore
(
Connection
, ConnectionType(..)
, Credentials
, Settings(..)
, Retry
, atMost
, keepRetrying
, credentials
, defaultSettings
, defaultSSLSettings
, connect
, shutdown
, waitTillClosed
, connectionSettings
, ClusterSettings(..)
, DnsServer(..)
, GossipSeed
, gossipSeed
, gossipSeedWithHeader
, gossipSeedHost
, gossipSeedHeader
, gossipSeedPort
, gossipSeedClusterSettings
, dnsClusterSettings
, Event
, EventData
, EventType(..)
, createEvent
, withJson
, withJsonAndMetadata
, withBinary
, withBinaryAndMetadata
, OperationMaxAttemptReached(..)
, StreamMetadataResult(..)
, readEvent
, readAllEventsBackward
, readAllEventsForward
, readStreamEventsBackward
, readStreamEventsForward
, getStreamMetadata
, StreamACL(..)
, StreamMetadata(..)
, getCustomPropertyValue
, getCustomProperty
, emptyStreamACL
, emptyStreamMetadata
, deleteStream
, sendEvent
, sendEvents
, setStreamMetadata
, Builder
, StreamACLBuilder
, buildStreamACL
, modifyStreamACL
, setReadRoles
, setReadRole
, setWriteRoles
, setWriteRole
, setDeleteRoles
, setDeleteRole
, setMetaReadRoles
, setMetaReadRole
, setMetaWriteRoles
, setMetaWriteRole
, StreamMetadataBuilder
, buildStreamMetadata
, modifyStreamMetadata
, setMaxCount
, setMaxAge
, setTruncateBefore
, setCacheControl
, setACL
, modifyACL
, setCustomProperty
, Transaction
, TransactionId
, startTransaction
, transactionId
, transactionCommit
, transactionRollback
, transactionWrite
, SubscriptionClosed(..)
, SubscriptionId
, Subscription
, SubDropReason(..)
, SubDetails
, waitConfirmation
, unsubscribeConfirmed
, unsubscribeConfirmedSTM
, waitUnsubscribeConfirmed
, nextEventMaybeSTM
, getSubscriptionDetailsSTM
, unsubscribe
, subscriptionStream
, RegularSubscription
, subscribe
, subscribeToAll
, getSubscriptionId
, isSubscribedToAll
, nextEvent
, nextEventMaybe
, CatchupSubscription
, subscribeFrom
, subscribeToAllFrom
, waitTillCatchup
, hasCaughtUp
, hasCaughtUpSTM
, PersistentSubscription
, PersistentSubscriptionSettings(..)
, SystemConsumerStrategy(..)
, NakAction(..)
, PersistActionException(..)
, acknowledge
, acknowledgeEvents
, failed
, eventsFailed
, notifyEventsProcessed
, notifyEventsFailed
, defaultPersistentSubscriptionSettings
, createPersistentSubscription
, updatePersistentSubscription
, deletePersistentSubscription
, connectToPersistentSubscription
, Slice(..)
, AllSlice
, Op.DeleteResult(..)
, WriteResult(..)
, ReadResult(..)
, RecordedEvent(..)
, Op.ReadEvent(..)
, StreamType(..)
, StreamSlice
, Position(..)
, ReadDirection(..)
, ResolvedEvent(..)
, OperationError(..)
, StreamName(..)
, isEventResolvedLink
, resolvedEventOriginal
, resolvedEventDataAsJson
, resolvedEventOriginalStreamId
, resolvedEventOriginalId
, resolvedEventOriginalEventNumber
, recordedEventDataAsJson
, positionStart
, positionEnd
, LogLevel(..)
, LogType(..)
, LoggerFilter(..)
, Command
, DropReason(..)
, ExpectedVersion
, anyVersion
, noStreamVersion
, emptyStreamVersion
, exactEventVersion
, streamExists
, msDiffTime
, (<>)
, NonEmpty(..)
, nonEmpty
, TLSSettings
, NominalDiffTime
) where
import Prelude (String)
import Data.Int
import Data.Maybe
import Data.Time (NominalDiffTime)
import Data.List.NonEmpty(NonEmpty(..), nonEmpty)
import Network.Connection (TLSSettings)
import Database.EventStore.Internal.Callback
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Connection (connectionBuilder)
import Database.EventStore.Internal.Control hiding (subscribe)
import Database.EventStore.Internal.Discovery
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Catchup
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Persistent
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Subscription.Regular
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Manager.Operation.Registry
import Database.EventStore.Internal.Operation (OperationError(..))
import qualified Database.EventStore.Internal.Operations as Op
import Database.EventStore.Internal.Operation.Read.Common
import Database.EventStore.Internal.Operation.Write.Common
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Types
data ConnectionType
= Static String Int
| Cluster ClusterSettings
| Dns ByteString (Maybe DnsServer) Int
data Connection
= Connection
{ _exec :: Exec
, _settings :: Settings
, _type :: ConnectionType
}
connect :: Settings -> ConnectionType -> IO Connection
connect settings@Settings{..} tpe = do
disc <- case tpe of
Static host port -> return $ staticEndPointDiscovery host port
Cluster setts -> clusterDnsEndPointDiscovery setts
Dns dom srv port -> return $ simpleDnsEndPointDiscovery dom srv port
logRef <- newLoggerRef s_loggerType s_loggerFilter s_loggerDetailed
mainBus <- newBus logRef settings
builder <- connectionBuilder settings
exec <- newExec settings mainBus builder disc
return $ Connection exec settings tpe
waitTillClosed :: Connection -> IO ()
waitTillClosed Connection{..} = execWaitTillClosed _exec
connectionSettings :: Connection -> Settings
connectionSettings = _settings
shutdown :: Connection -> IO ()
shutdown Connection{..} = publishWith _exec SystemShutdown
sendEvent :: Connection
-> StreamName
-> ExpectedVersion
-> Event
-> Maybe Credentials
-> IO (Async WriteResult)
sendEvent mgr evt_stream exp_ver evt cred =
sendEvents mgr evt_stream exp_ver [evt] cred
sendEvents :: Connection
-> StreamName
-> ExpectedVersion
-> [Event]
-> Maybe Credentials
-> IO (Async WriteResult)
sendEvents Connection{..} evt_stream exp_ver evts cred = do
p <- newPromise
let op = Op.writeEvents _settings (streamNameRaw evt_stream) exp_ver cred evts
publishWith _exec (SubmitOperation p op)
async (retrieve p)
deleteStream :: Connection
-> StreamName
-> ExpectedVersion
-> Maybe Bool
-> Maybe Credentials
-> IO (Async Op.DeleteResult)
deleteStream Connection{..} evt_stream exp_ver hard_del cred = do
p <- newPromise
let op = Op.deleteStream _settings (streamNameRaw evt_stream) exp_ver hard_del cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
data Transaction =
Transaction
{ _tStream :: Text
, _tTransId :: TransactionId
, _tExpVer :: ExpectedVersion
, _tConn :: Connection
}
newtype TransactionId =
TransactionId { _unTransId :: Int64 }
deriving (Eq, Ord, Show)
transactionId :: Transaction -> TransactionId
transactionId = _tTransId
startTransaction :: Connection
-> StreamName
-> ExpectedVersion
-> Maybe Credentials
-> IO (Async Transaction)
startTransaction conn@Connection{..} evt_stream exp_ver cred = do
p <- newPromise
let op = Op.transactionStart _settings (streamNameRaw evt_stream) exp_ver cred
publishWith _exec (SubmitOperation p op)
async $ do
tid <- retrieve p
return Transaction
{ _tStream = streamNameRaw evt_stream
, _tTransId = TransactionId tid
, _tExpVer = exp_ver
, _tConn = conn
}
transactionWrite :: Transaction
-> [Event]
-> Maybe Credentials
-> IO (Async ())
transactionWrite Transaction{..} evts cred = do
p <- newPromise
let Connection{..} = _tConn
raw_id = _unTransId _tTransId
op = Op.transactionWrite _settings _tStream _tExpVer raw_id evts cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
transactionCommit :: Transaction -> Maybe Credentials -> IO (Async WriteResult)
transactionCommit Transaction{..} cred = do
p <- newPromise
let Connection{..} = _tConn
raw_id = _unTransId _tTransId
op = Op.transactionCommit _settings _tStream _tExpVer raw_id cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
transactionRollback :: Transaction -> IO ()
transactionRollback _ = return ()
readEvent :: Connection
-> StreamName
-> Int32
-> Bool
-> Maybe Credentials
-> IO (Async (ReadResult 'RegularStream Op.ReadEvent))
readEvent Connection{..} stream_id evt_num res_link_tos cred = do
p <- newPromise
let op = Op.readEvent _settings (streamNameRaw stream_id) evt_num res_link_tos cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
readStreamEventsForward :: Connection
-> StreamName
-> Int64
-> Int32
-> Bool
-> Maybe Credentials
-> IO (Async (ReadResult 'RegularStream StreamSlice))
readStreamEventsForward mgr =
readStreamEventsCommon mgr Forward
readStreamEventsBackward :: Connection
-> StreamName
-> Int64
-> Int32
-> Bool
-> Maybe Credentials
-> IO (Async (ReadResult 'RegularStream StreamSlice))
readStreamEventsBackward mgr =
readStreamEventsCommon mgr Backward
readStreamEventsCommon :: Connection
-> ReadDirection
-> StreamName
-> Int64
-> Int32
-> Bool
-> Maybe Credentials
-> IO (Async (ReadResult 'RegularStream StreamSlice))
readStreamEventsCommon Connection{..} dir stream_id start cnt res_link_tos cred = do
p <- newPromise
let name = streamNameRaw stream_id
op = Op.readStreamEvents _settings dir name start cnt res_link_tos cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
readAllEventsForward :: Connection
-> Position
-> Int32
-> Bool
-> Maybe Credentials
-> IO (Async AllSlice)
readAllEventsForward mgr =
readAllEventsCommon mgr Forward
readAllEventsBackward :: Connection
-> Position
-> Int32
-> Bool
-> Maybe Credentials
-> IO (Async AllSlice)
readAllEventsBackward mgr =
readAllEventsCommon mgr Backward
readAllEventsCommon :: Connection
-> ReadDirection
-> Position
-> Int32
-> Bool
-> Maybe Credentials
-> IO (Async AllSlice)
readAllEventsCommon Connection{..} dir pos max_c res_link_tos cred = do
p <- newPromise
let op = Op.readAllEvents _settings c_pos p_pos max_c res_link_tos dir cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
where
Position c_pos p_pos = pos
subscribe :: Connection
-> StreamName
-> Bool
-> Maybe Credentials
-> IO RegularSubscription
subscribe Connection{..} stream resLnkTos cred =
newRegularSubscription _exec stream resLnkTos cred
subscribeToAll :: Connection
-> Bool
-> Maybe Credentials
-> IO RegularSubscription
subscribeToAll conn resLnkTos cred = subscribe conn AllStream resLnkTos cred
subscribeFrom :: Connection
-> StreamName
-> Bool
-> Maybe Int64
-> Maybe Int32
-> Maybe Credentials
-> IO CatchupSubscription
subscribeFrom conn streamId resLnkTos lastChkPt batch cred =
subscribeFromCommon conn resLnkTos batch cred tpe
where
tpe = Op.RegularCatchup (streamNameRaw streamId) (fromMaybe 0 lastChkPt)
subscribeToAllFrom :: Connection
-> Bool
-> Maybe Position
-> Maybe Int32
-> Maybe Credentials
-> IO CatchupSubscription
subscribeToAllFrom conn resLnkTos lastChkPt batch cred =
subscribeFromCommon conn resLnkTos batch cred tpe
where
Position cPos pPos = fromMaybe positionStart lastChkPt
tpe = Op.AllCatchup (Position cPos pPos)
subscribeFromCommon :: Connection
-> Bool
-> Maybe Int32
-> Maybe Credentials
-> Op.CatchupState
-> IO CatchupSubscription
subscribeFromCommon Connection{..} resLnkTos batch cred tpe =
newCatchupSubscription _exec resLnkTos batch cred tpe
setStreamMetadata :: Connection
-> StreamName
-> ExpectedVersion
-> StreamMetadata
-> Maybe Credentials
-> IO (Async WriteResult)
setStreamMetadata Connection{..} evt_stream exp_ver metadata cred = do
p <- newPromise
let name = streamNameRaw evt_stream
op = Op.setMetaStream _settings name exp_ver cred metadata
publishWith _exec (SubmitOperation p op)
async (retrieve p)
getStreamMetadata :: Connection
-> StreamName
-> Maybe Credentials
-> IO (Async StreamMetadataResult)
getStreamMetadata Connection{..} evt_stream cred = do
p <- newPromise
let op = Op.readMetaStream _settings (streamNameRaw evt_stream) cred
publishWith _exec (SubmitOperation p op)
async (retrieve p)
createPersistentSubscription :: Connection
-> Text
-> StreamName
-> PersistentSubscriptionSettings
-> Maybe Credentials
-> IO (Async (Maybe PersistActionException))
createPersistentSubscription Connection{..} grp stream sett cred = do
p <- newPromise
let op = Op.createPersist grp (streamNameRaw stream) sett cred
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
updatePersistentSubscription :: Connection
-> Text
-> StreamName
-> PersistentSubscriptionSettings
-> Maybe Credentials
-> IO (Async (Maybe PersistActionException))
updatePersistentSubscription Connection{..} grp stream sett cred = do
p <- newPromise
let op = Op.updatePersist grp (streamNameRaw stream) sett cred
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
deletePersistentSubscription :: Connection
-> Text
-> StreamName
-> Maybe Credentials
-> IO (Async (Maybe PersistActionException))
deletePersistentSubscription Connection{..} grp stream cred = do
p <- newPromise
let op = Op.deletePersist grp (streamNameRaw stream) cred
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
connectToPersistentSubscription :: Connection
-> Text
-> StreamName
-> Int32
-> Maybe Credentials
-> IO PersistentSubscription
connectToPersistentSubscription Connection{..} group stream bufSize cred =
newPersistentSubscription _exec group stream bufSize cred
persistAsync :: Callback (Maybe PersistActionException)
-> IO (Maybe PersistActionException)
persistAsync = either throw return <=< tryRetrieve