{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE RecordWildCards #-}
{-# OPTIONS -Wno-orphans #-}
module Database.EventStore.Internal.Subscription.Persistent where
import Data.UUID
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation.Persist
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Packages
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
-- | Client-side handle on a persistent subscription. It bundles everything
--   the functions of this module need to read subscription actions and to
--   publish unsubscribe / ack / nak packages.
data PersistentSubscription = PersistentSubscription
  { _perExec :: Exec
    -- ^ Handle through which outgoing packages are published.
  , _perSubId :: UUID
    -- ^ Identifier used when building unsubscribe, ack and nak packages.
  , _perStream :: StreamName
    -- ^ Stream this subscription was created on.
  , _perCred :: Maybe Credentials
    -- ^ Optional credentials attached to ack and nak packages.
  , _perSubKey :: TVar (Maybe Text)
    -- ^ Subscription key; 'persistentGetSubKey' waits until it holds a
    --   'Just'. NOTE(review): presumably filled in once the server confirms
    --   the subscription — confirm against 'persist'.
  , _perChan :: Chan SubAction
    -- ^ Channel from which 'nextSubEvent' reads subscription actions.
  }
-- | Lets a 'PersistentSubscription' be used through the generic
--   'Subscription' interface.
instance Subscription PersistentSubscription where
  -- Reads the next 'SubAction' from the subscription's channel.
  nextSubEvent sub = readChan actions
    where
      actions = _perChan sub

  -- Publishes an unsubscribe package built from the subscription id.
  unsubscribe sub =
    publishWith exec (SendPackage (createUnsubscribePackage subId))
    where
      exec  = _perExec sub
      subId = _perSubId sub
-- | Exposes the stream stored in a 'PersistentSubscription'.
instance SubscriptionStream PersistentSubscription EventNumber where
  subscriptionStream sub = _perStream sub
-- | Builds a 'PersistentSubscription' by calling 'persist' and wrapping the
--   subscription id, subscription-key variable and action channel it returns
--   together with the supplied 'Exec', stream name and credentials.
newPersistentSubscription
  :: Exec              -- ^ Handle used by 'persist' and kept for later publishes.
  -> Text              -- ^ Forwarded to 'persist' (presumably the group name — TODO confirm).
  -> StreamName        -- ^ Stream to subscribe to; also stored in the result.
  -> Int32             -- ^ Forwarded to 'persist' (presumably a buffer size — TODO confirm).
  -> Maybe Credentials -- ^ Optional credentials; forwarded and stored.
  -> IO PersistentSubscription
newPersistentSubscription exec grp name@(StreamName stream) bufSize cred = do
  (ident, keyVar, actions) <- persist exec grp stream bufSize cred
  pure
    ( PersistentSubscription
        { _perExec   = exec
        , _perSubId  = ident
        , _perStream = name
        , _perCred   = cred
        , _perSubKey = keyVar
        , _perChan   = actions
        }
    )
-- | Returns the subscription key stored in the subscription's '_perSubKey'
--   variable. While the variable holds 'Nothing' the transaction is retried
--   through 'retrySTM', so the key is only returned once it is available.
persistentGetSubKey :: PersistentSubscription -> IO Text
persistentGetSubKey sub = atomically awaitKey
  where
    -- Single transaction: read the variable, then either yield or retry.
    awaitKey = do
      current <- readTVar (_perSubKey sub)
      orRetry current

    orRetry (Just key) = pure key
    orRetry Nothing    = retrySTM
-- | Publishes an ack package for the given event ids. The subscription key
--   is obtained first with 'persistentGetSubKey'; the package is then built
--   from the subscription's credentials, its id, that key and the ids.
notifyEventsProcessed :: PersistentSubscription -> [UUID] -> IO ()
notifyEventsProcessed sub evts = do
  subKey <- persistentGetSubKey sub
  publishWith (_perExec sub) (SendPackage (ackFor subKey))
  where
    ackFor key = createAckPackage (_perCred sub) (_perSubId sub) key evts
-- | Acknowledges a single event: its original id (as given by
--   'resolvedEventOriginalId') is passed to 'notifyEventsProcessed'.
acknowledge :: PersistentSubscription -> ResolvedEvent -> IO ()
acknowledge sub evt = notifyEventsProcessed sub [eventId]
  where
    eventId = resolvedEventOriginalId evt
-- | Acknowledges several events at once: the original id of every event is
--   collected and the resulting list is passed to 'notifyEventsProcessed'.
acknowledgeEvents :: PersistentSubscription -> [ResolvedEvent] -> IO ()
acknowledgeEvents sub evts = notifyEventsProcessed sub eventIds
  where
    eventIds = fmap resolvedEventOriginalId evts
-- | Reports a single event as failed: its original id is passed to
--   'notifyEventsFailed' together with the given 'NakAction' and optional
--   text.
failed
  :: PersistentSubscription
  -> ResolvedEvent -- ^ Event being reported.
  -> NakAction     -- ^ Action forwarded to the nak package.
  -> Maybe Text    -- ^ Optional text forwarded to the nak package
                   --   (presumably a reason — TODO confirm).
  -> IO ()
failed sub evt action reason = notifyEventsFailed sub action reason [eventId]
  where
    eventId = resolvedEventOriginalId evt
-- | Reports several events as failed: the original id of every event is
--   collected and passed to 'notifyEventsFailed' together with the given
--   'NakAction' and optional text.
eventsFailed
  :: PersistentSubscription
  -> [ResolvedEvent] -- ^ Events being reported.
  -> NakAction       -- ^ Action forwarded to the nak package.
  -> Maybe Text      -- ^ Optional text forwarded to the nak package
                     --   (presumably a reason — TODO confirm).
  -> IO ()
eventsFailed sub evts action reason =
  notifyEventsFailed sub action reason eventIds
  where
    eventIds = fmap resolvedEventOriginalId evts
-- | Publishes a nak package for the given event ids. The subscription key
--   is obtained first with 'persistentGetSubKey'; the package is then built
--   from the subscription's credentials, its id, that key, the 'NakAction',
--   the optional text and the ids.
notifyEventsFailed
  :: PersistentSubscription
  -> NakAction
  -> Maybe Text
  -> [UUID]
  -> IO ()
notifyEventsFailed sub act res evts = do
  subKey <- persistentGetSubKey sub
  publishWith (_perExec sub) (SendPackage (nakFor subKey))
  where
    nakFor key =
      createNakPackage (_perCred sub) (_perSubId sub) key act res evts