module Database.EventStore.Internal.Operation.Persist (persist) where
import Data.ProtocolBuffers
import Data.UUID
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
-- | Sends a connect-to-persistent-subscription request and starts a
--   background loop that turns the packages received for it into
--   'SubAction's.
--
--   The arguments @grp@, @stream@ and @bufSize@ are forwarded unchanged to
--   '_connectToPersistentSubscription' (presumably the group name, the stream
--   name and the buffer size -- confirm against that constructor). @cred@ is
--   handed to 'createPkg' together with the request.
--
--   The result is made of:
--
--   * the correlation id of the request package;
--   * a 'TVar' that starts as 'Nothing' and is set to 'Just' the id carried
--     by the confirmation message once one is received;
--   * the channel on which 'Confirmed', 'Submit' and 'Dropped' actions are
--     written.
--
--   The loop stops after writing a 'Dropped' action. That happens when the
--   mailbox yields an error, when a subscription-dropped package arrives, or
--   when a received package cannot be decoded (reported as 'SubAborted').
persist :: Exec
        -> Text
        -> Text
        -> Int32
        -> Maybe Credentials
        -> IO (UUID, TVar (Maybe Text), Chan SubAction)
persist exec grp stream bufSize cred = do
    m    <- mailboxNew
    subM <- newChan
    var  <- newTVarIO Nothing

    let req = _connectToPersistentSubscription grp stream bufSize
    pkg <- createPkg connectToPersistentSubscriptionCmd cred req

    let theSubId = packageCorrelation pkg
        -- Shared exit path: tell the consumer the subscription is gone and
        -- stop looping.
        aborted = Break () <$ writeChan subM (Dropped SubAborted)

    publishWith exec (Transmit m (KeepAlive subscriptionDroppedCmd) pkg)

    _ <- async $ keepLooping $ do
        outcome <- mailboxRead m
        case outcome of
            Left _ -> aborted
            Right respPkg
                | packageCmd respPkg == subscriptionDroppedCmd ->
                    -- Decode eagerly so a malformed payload is handled here
                    -- instead of surfacing as a pattern-match failure in
                    -- whoever forces the value read from the channel.
                    case decodePkg respPkg of
                        Left _     -> aborted
                        Right resp ->
                            let reason =
                                    fromMaybe D_Unsubscribed
                                              (getField $ dropReason resp)
                            in Break () <$
                                   writeChan subM
                                             (Dropped (toSubDropReason reason))

                | packageCmd respPkg == persistentSubscriptionConfirmationCmd ->
                    case decodePkg respPkg of
                        Left _     -> aborted
                        Right resp -> do
                            let confirmedId = getField $ pscId resp
                                details =
                                    SubDetails
                                    { subId           = theSubId
                                    , subCommitPos    =
                                          getField $ pscLastCommitPos resp
                                    , subLastEventNum =
                                          getField $ pscLastEvtNumber resp
                                    , subSubId        = Just confirmedId
                                    }
                            -- Publish the confirmed id before notifying the
                            -- consumer, same order as before.
                            atomically $ writeTVar var (Just confirmedId)
                            Loop <$ writeChan subM (Confirmed details)

                | packageCmd respPkg
                      == persistentSubscriptionStreamEventAppearedCmd ->
                    case decodePkg respPkg of
                        Left _     -> aborted
                        Right resp ->
                            let evt = newResolvedEvent
                                    $ getField
                                    $ psseaEvt resp
                            in Loop <$ writeChan subM (Submit evt)

                -- Any other command is ignored; keep waiting.
                | otherwise -> pure Loop

    pure (theSubId, var, subM)