{-# LANGUAGE RecordWildCards #-}
module Database.EventStore.Internal.Subscription.Persistent where
import Data.UUID
import Database.EventStore.Internal.Callback
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation.Persist
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Packages
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
data Phase
= Pending
| Running SubDetails
| Closed (Either SomeException SubDropReason)
data PersistentSubscription =
PersistentSubscription { _perExec :: Exec
, _perStream :: StreamName
, _perCred :: Maybe Credentials
, _perPhase :: TVar Phase
, _perNext :: STM (Maybe ResolvedEvent)
}
instance Subscription PersistentSubscription where
nextEventMaybeSTM = _perNext
getSubscriptionDetailsSTM s = do
p <- readTVar (_perPhase s)
case p of
Pending -> retrySTM
Running details -> return details
Closed outcome ->
case outcome of
Right r -> throwSTM (SubscriptionClosed $ Just r)
Left e -> throwSTM e
subscriptionStream = _perStream
unsubscribe s = subUnsubscribe (_perExec s) s
newPersistentSubscription :: Exec
-> Text
-> StreamName
-> Int32
-> Maybe Credentials
-> IO PersistentSubscription
newPersistentSubscription exec grp stream bufSize cred = do
phaseVar <- newTVarIO Pending
queue <- newTQueueIO
let name = streamNameRaw stream
sub = PersistentSubscription exec stream cred phaseVar $ do
p <- readTVar phaseVar
isEmpty <- isEmptyTQueue queue
if isEmpty
then
case p of
Closed outcome ->
case outcome of
Right r -> throwSTM (SubscriptionClosed $ Just r)
Left e -> throwSTM e
_ -> return Nothing
else Just <$> readTQueue queue
callback (Left e) = atomically $
writeTVar phaseVar (Closed $ Left e)
callback (Right action) =
case action of
Confirmed details -> atomically $
writeTVar phaseVar (Running details)
Dropped r -> atomically $
writeTVar phaseVar (Closed $ Right r)
Submit e -> atomically $ do
readTVar phaseVar >>= \case
Running{} -> writeTQueue queue e
_ -> return ()
ConnectionReset -> atomically $
writeTVar phaseVar (Closed $ Right SubAborted)
cb <- newCallback callback
publishWith exec (SubmitOperation cb (persist grp name bufSize cred))
return sub
notifyEventsProcessed :: PersistentSubscription -> [UUID] -> IO ()
notifyEventsProcessed PersistentSubscription{..} evts = do
details <- atomically $ do
p <- readTVar _perPhase
case p of
Closed outcome ->
case outcome of
Right r -> throwSTM (SubscriptionClosed $ Just r)
Left e -> throwSTM e
Pending -> retrySTM
Running d -> return d
let uuid = subId details
Just sid = subSubId details
pkg = createAckPackage _perCred uuid sid evts
publishWith _perExec (SendPackage pkg)
acknowledge :: PersistentSubscription -> ResolvedEvent -> IO ()
acknowledge sub e = notifyEventsProcessed sub [resolvedEventOriginalId e]
acknowledgeEvents :: PersistentSubscription -> [ResolvedEvent] -> IO ()
acknowledgeEvents sub = notifyEventsProcessed sub . fmap resolvedEventOriginalId
failed :: PersistentSubscription
-> ResolvedEvent
-> NakAction
-> Maybe Text
-> IO ()
failed sub e a r = notifyEventsFailed sub a r [resolvedEventOriginalId e]
eventsFailed :: PersistentSubscription
-> [ResolvedEvent]
-> NakAction
-> Maybe Text
-> IO ()
eventsFailed sub evts a r =
notifyEventsFailed sub a r $ fmap resolvedEventOriginalId evts
notifyEventsFailed :: PersistentSubscription
-> NakAction
-> Maybe Text
-> [UUID]
-> IO ()
notifyEventsFailed PersistentSubscription{..} act res evts = do
details <- atomically $ do
p <- readTVar _perPhase
case p of
Closed outcome ->
case outcome of
Right r -> throwSTM (SubscriptionClosed $ Just r)
Left e -> throwSTM e
Pending -> retrySTM
Running d -> return d
let uuid = subId details
Just sid = subSubId details
pkg = createNakPackage _perCred uuid sid act res evts
publishWith _perExec (SendPackage pkg)