module Database.EventStore.Internal.Operation.Volatile (volatile) where
import Data.ProtocolBuffers
import Data.UUID
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
-- | Sends a @SubscribeToStream@ request for the given stream and spawns a
-- background worker that turns the response packages read from a fresh
-- mailbox into 'SubAction's written to a fresh channel.
--
-- The worker emits:
--
-- * @'Confirmed' details@ when a subscription-confirmation package arrives,
-- * @'Submit' evt@ for each stream-event-appeared package,
-- * @'Dropped' reason@ when a subscription-dropped package arrives (the
--   reason defaults to 'D_Unsubscribed' when the field is absent),
-- * @'Dropped' 'SubAborted'@ when reading the mailbox yields an error, or
--   when a response payload cannot be decoded.
--
-- Packages carrying any other command are ignored. The worker stops after
-- the first 'Dropped' action has been written.
--
-- Returns the correlation id of the request package, used as the
-- subscription id, together with the channel the actions are written to.
volatile
  :: Exec               -- ^ Publisher the subscription request is sent through.
  -> StreamId t         -- ^ Stream to subscribe to.
  -> Bool               -- ^ Forwarded as the second argument of
                        --   'subscribeToStream' (presumably the
                        --   "resolve link tos" flag -- confirm).
  -> Maybe Credentials  -- ^ Optional credentials attached to the request.
  -> IO (UUID, Chan SubAction)
volatile exec streamId tos cred = do
  m    <- mailboxNew
  subM <- newChan

  let req = subscribeToStream stream tos
  pkg <- createPkg subscribeToStreamCmd cred req
  let theSubId = packageCorrelation pkg

  publishWith exec (Transmit m (KeepAlive subscriptionDroppedCmd) pkg)

  -- The handle of the worker is deliberately discarded: the worker ends on
  -- its own once it has reported a 'Dropped' action.
  _ <- async $ keepLooping $ do
    outcome <- mailboxRead m
    case outcome of
      Left _ -> abort subM
      Right respPkg
        | packageCmd respPkg == subscriptionDroppedCmd ->
            -- Decoding is checked explicitly instead of using an
            -- irrefutable @Right@ pattern, so a malformed payload is
            -- reported as an aborted subscription rather than hiding a
            -- pattern-match failure inside the value handed to the consumer.
            case decodePkg respPkg of
              Left _ -> abort subM
              Right resp ->
                let reason    = fromMaybe D_Unsubscribed
                                  (getField $ dropReason resp)
                    subReason = toSubDropReason reason
                in Break () <$ writeChan subM (Dropped subReason)
        | packageCmd respPkg == subscriptionConfirmationCmd ->
            case decodePkg respPkg of
              Left _ -> abort subM
              Right resp ->
                let lastCommitPos = getField $ subscribeLastCommitPos resp
                    lastEventNum  = getField $ subscribeLastEventNumber resp
                    details =
                      SubDetails
                        { subId           = theSubId
                        , subCommitPos    = lastCommitPos
                        , subLastEventNum = lastEventNum
                        , subSubId        = Nothing
                        }
                in Loop <$ writeChan subM (Confirmed details)
        | packageCmd respPkg == streamEventAppearedCmd ->
            case decodePkg respPkg of
              Left _ -> abort subM
              Right resp ->
                let evt = newResolvedEventFromBuf
                            $ getField
                            $ streamResolvedEvent resp
                in Loop <$ writeChan subM (Submit evt)
        | otherwise -> pure Loop

  pure (theSubId, subM)
  where
    stream = streamIdRaw streamId

    -- Reports the subscription as aborted on the given channel and stops
    -- the worker loop.
    abort chan = Break () <$ writeChan chan (Dropped SubAborted)