{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE RecordWildCards #-}
{-# OPTIONS -Wno-orphans #-}
module Database.EventStore.Internal.Subscription.Persistent where
import Data.UUID
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation.Persist
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Packages
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
data PersistentSubscription =
PersistentSubscription
{ PersistentSubscription -> Exec
_perExec :: Exec
, PersistentSubscription -> UUID
_perSubId :: UUID
, PersistentSubscription -> StreamName
_perStream :: StreamName
, PersistentSubscription -> Maybe Credentials
_perCred :: Maybe Credentials
, PersistentSubscription -> TVar (Maybe Text)
_perSubKey :: TVar (Maybe Text)
, PersistentSubscription -> Chan SubAction
_perChan :: Chan SubAction
}
instance Subscription PersistentSubscription where
nextSubEvent :: PersistentSubscription -> IO SubAction
nextSubEvent PersistentSubscription
s = forall (m :: * -> *) a. MonadBase IO m => Chan a -> m a
readChan (PersistentSubscription -> Chan SubAction
_perChan PersistentSubscription
s)
unsubscribe :: PersistentSubscription -> IO ()
unsubscribe PersistentSubscription
s = forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith (PersistentSubscription -> Exec
_perExec PersistentSubscription
s) (Package -> SendPackage
SendPackage Package
pkg)
where
pkg :: Package
pkg = UUID -> Package
createUnsubscribePackage (PersistentSubscription -> UUID
_perSubId PersistentSubscription
s)
instance SubscriptionStream PersistentSubscription EventNumber where
subscriptionStream :: PersistentSubscription -> StreamName
subscriptionStream = PersistentSubscription -> StreamName
_perStream
newPersistentSubscription
:: Exec
-> Text
-> StreamName
-> Int32
-> Maybe Credentials
-> IO PersistentSubscription
newPersistentSubscription :: Exec
-> Text
-> StreamName
-> Int32
-> Maybe Credentials
-> IO PersistentSubscription
newPersistentSubscription Exec
exec Text
grp (StreamName Text
stream) Int32
bufSize Maybe Credentials
cred
= do (UUID
subId, TVar (Maybe Text)
varSubKey, Chan SubAction
chan) <- Exec
-> Text
-> Text
-> Int32
-> Maybe Credentials
-> IO (UUID, TVar (Maybe Text), Chan SubAction)
persist Exec
exec Text
grp Text
stream Int32
bufSize Maybe Credentials
cred
let sub :: PersistentSubscription
sub =
PersistentSubscription
{ _perExec :: Exec
_perExec = Exec
exec
, _perSubId :: UUID
_perSubId = UUID
subId
, _perCred :: Maybe Credentials
_perCred = Maybe Credentials
cred
, _perSubKey :: TVar (Maybe Text)
_perSubKey = TVar (Maybe Text)
varSubKey
, _perChan :: Chan SubAction
_perChan = Chan SubAction
chan
, _perStream :: StreamName
_perStream = Text -> StreamName
StreamName Text
stream
}
forall (f :: * -> *) a. Applicative f => a -> f a
pure PersistentSubscription
sub
persistentGetSubKey
:: PersistentSubscription
-> IO Text
persistentGetSubKey :: PersistentSubscription -> IO Text
persistentGetSubKey PersistentSubscription
sub
= forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$
do Maybe Text
subKeyMay <- forall a. TVar a -> STM a
readTVar (PersistentSubscription -> TVar (Maybe Text)
_perSubKey PersistentSubscription
sub)
case Maybe Text
subKeyMay of
Just Text
key
-> forall (f :: * -> *) a. Applicative f => a -> f a
pure Text
key
Maybe Text
Nothing
-> forall a. STM a
retrySTM
notifyEventsProcessed
:: PersistentSubscription
-> [UUID]
-> IO ()
notifyEventsProcessed :: PersistentSubscription -> [UUID] -> IO ()
notifyEventsProcessed PersistentSubscription
sub [UUID]
evts
= do Text
subKey <- PersistentSubscription -> IO Text
persistentGetSubKey PersistentSubscription
sub
let uuid :: UUID
uuid = PersistentSubscription -> UUID
_perSubId PersistentSubscription
sub
pkg :: Package
pkg = Maybe Credentials -> UUID -> Text -> [UUID] -> Package
createAckPackage (PersistentSubscription -> Maybe Credentials
_perCred PersistentSubscription
sub) UUID
uuid Text
subKey [UUID]
evts
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith (PersistentSubscription -> Exec
_perExec PersistentSubscription
sub) (Package -> SendPackage
SendPackage Package
pkg)
acknowledge :: PersistentSubscription -> ResolvedEvent -> IO ()
acknowledge :: PersistentSubscription -> ResolvedEvent -> IO ()
acknowledge PersistentSubscription
sub ResolvedEvent
e = PersistentSubscription -> [UUID] -> IO ()
notifyEventsProcessed PersistentSubscription
sub [ResolvedEvent -> UUID
resolvedEventOriginalId ResolvedEvent
e]
acknowledgeEvents :: PersistentSubscription -> [ResolvedEvent] -> IO ()
acknowledgeEvents :: PersistentSubscription -> [ResolvedEvent] -> IO ()
acknowledgeEvents PersistentSubscription
sub = PersistentSubscription -> [UUID] -> IO ()
notifyEventsProcessed PersistentSubscription
sub forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ResolvedEvent -> UUID
resolvedEventOriginalId
failed :: PersistentSubscription
-> ResolvedEvent
-> NakAction
-> Maybe Text
-> IO ()
failed :: PersistentSubscription
-> ResolvedEvent -> NakAction -> Maybe Text -> IO ()
failed PersistentSubscription
sub ResolvedEvent
e NakAction
a Maybe Text
r = PersistentSubscription
-> NakAction -> Maybe Text -> [UUID] -> IO ()
notifyEventsFailed PersistentSubscription
sub NakAction
a Maybe Text
r [ResolvedEvent -> UUID
resolvedEventOriginalId ResolvedEvent
e]
eventsFailed :: PersistentSubscription
-> [ResolvedEvent]
-> NakAction
-> Maybe Text
-> IO ()
eventsFailed :: PersistentSubscription
-> [ResolvedEvent] -> NakAction -> Maybe Text -> IO ()
eventsFailed PersistentSubscription
sub [ResolvedEvent]
evts NakAction
a Maybe Text
r =
PersistentSubscription
-> NakAction -> Maybe Text -> [UUID] -> IO ()
notifyEventsFailed PersistentSubscription
sub NakAction
a Maybe Text
r forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ResolvedEvent -> UUID
resolvedEventOriginalId [ResolvedEvent]
evts
notifyEventsFailed
:: PersistentSubscription
-> NakAction
-> Maybe Text
-> [UUID]
-> IO ()
notifyEventsFailed :: PersistentSubscription
-> NakAction -> Maybe Text -> [UUID] -> IO ()
notifyEventsFailed PersistentSubscription
sub NakAction
act Maybe Text
res [UUID]
evts
= do Text
subKey <- PersistentSubscription -> IO Text
persistentGetSubKey PersistentSubscription
sub
let uuid :: UUID
uuid = PersistentSubscription -> UUID
_perSubId PersistentSubscription
sub
pkg :: Package
pkg = Maybe Credentials
-> UUID -> Text -> NakAction -> Maybe Text -> [UUID] -> Package
createNakPackage (PersistentSubscription -> Maybe Credentials
_perCred PersistentSubscription
sub) UUID
uuid Text
subKey NakAction
act Maybe Text
res [UUID]
evts
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith (PersistentSubscription -> Exec
_perExec PersistentSubscription
sub) (Package -> SendPackage
SendPackage Package
pkg)