module Database.EventStore.Internal.Subscription.Regular where
import Database.EventStore.Internal.Callback
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation.Volatile
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
-- | Lifecycle of a regular subscription as tracked in '_regPhase'.
data Phase
  = Pending
    -- ^ Initial phase, set when the subscription is created.
  | Running SubDetails
    -- ^ A confirmation carrying these details has been received.
  | Closed (Either SomeException SubDropReason)
    -- ^ The subscription is over: 'Left' holds the exception reported to the
    --   callback, 'Right' holds the reason the subscription was dropped.
-- | Handle on a regular subscription.
--
-- The constructor is applied positionally elsewhere in this module, so the
-- field order must not change.
data RegularSubscription =
  RegularSubscription
    { _regExec :: Exec
      -- ^ Execution handle the subscription was published with; also used
      --   when unsubscribing.
    , _regStream :: StreamName
      -- ^ Stream this subscription targets.
    , _regPhase :: TVar Phase
      -- ^ Current lifecycle phase.
    , _regNext :: STM (Maybe ResolvedEvent)
      -- ^ Transaction producing the next available event, if any.
    }
-- | 'Subscription' operations for 'RegularSubscription', backed by the
-- record's fields.
instance Subscription RegularSubscription where
  nextEventMaybeSTM = _regNext

  -- Blocks (retries the transaction) while the subscription is still
  -- 'Pending', returns the details once 'Running', and throws once 'Closed'.
  getSubscriptionDetailsSTM sub =
    readTVar (_regPhase sub) >>= \case
      Pending -> retrySTM
      Running details -> return details
      Closed (Right reason) -> throwSTM (SubscriptionClosed $ Just reason)
      Closed (Left err) -> throwSTM err

  subscriptionStream = _regStream

  unsubscribe sub = subUnsubscribe (_regExec sub) sub
-- | Creates a 'RegularSubscription' on the given stream and submits the
-- corresponding 'volatile' operation through the supplied 'Exec'.
--
-- The returned subscription starts in the 'Pending' phase; the callback
-- registered here moves it to 'Running' or 'Closed' and buffers incoming
-- events. The 'Bool' and 'Maybe' 'Credentials' arguments are forwarded
-- untouched to 'volatile' (the flag presumably controls link resolution --
-- TODO confirm against 'volatile').
newRegularSubscription :: Exec
                       -> StreamName
                       -> Bool
                       -> Maybe Credentials
                       -> IO RegularSubscription
newRegularSubscription exec stream tos cred = do
  phaseVar <- newTVarIO Pending
  queue <- newTQueueIO

  let -- Overwrites the current phase in its own transaction.
      setPhase phase = atomically $ writeTVar phaseVar phase

      -- Buffered events are served first; the closing outcome is only
      -- surfaced (as an exception) once the buffer has been drained.
      nextEvent = do
        phase <- readTVar phaseVar
        drained <- isEmptyTQueue queue
        case (drained, phase) of
          (False, _) -> Just <$> readTQueue queue
          (True, Closed (Right reason)) ->
            throwSTM (SubscriptionClosed $ Just reason)
          (True, Closed (Left err)) -> throwSTM err
          (True, _) -> return Nothing

      -- Reacts to everything delivered for the submitted operation.
      onUpdate (Left err) = setPhase (Closed $ Left err)
      onUpdate (Right action) =
        case action of
          -- Events are only buffered while the subscription is running;
          -- anything received in another phase is discarded.
          Submit event -> atomically $ do
            phase <- readTVar phaseVar
            case phase of
              Running{} -> writeTQueue queue event
              _ -> return ()
          Confirmed details -> setPhase (Running details)
          Dropped reason -> setPhase (Closed $ Right reason)
          ConnectionReset -> setPhase (Closed $ Right SubAborted)

  cb <- newCallback onUpdate
  publishWith exec
    (SubmitOperation cb (volatile (streamNameRaw stream) tos cred))
  return (RegularSubscription exec stream phaseVar nextEvent)