module Database.EventStore.Internal.Operation.Persist (persist) where
import Data.ProtocolBuffers
import Data.UUID
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
persist
:: Exec
-> Text
-> Text
-> Int32
-> Maybe Credentials
-> IO (UUID, TVar (Maybe Text), Chan SubAction)
persist exec grp stream bufSize cred
= do m <- mailboxNew
subM <- newChan
var <- newTVarIO Nothing
let req = _connectToPersistentSubscription grp stream bufSize
pkg <- createPkg connectToPersistentSubscriptionCmd cred req
let theSubId = packageCorrelation pkg
publishWith exec (Transmit m (KeepAlive subscriptionDroppedCmd) pkg)
_ <- async $ keepLooping $
do outcome <- mailboxRead m
case outcome of
Left _
-> Break () <$ writeChan subM (Dropped SubAborted)
Right respPkg
| packageCmd respPkg == subscriptionDroppedCmd
-> let Right resp = decodePkg respPkg
reason = fromMaybe D_Unsubscribed (getField $ dropReason resp)
subReason = toSubDropReason reason in
Break () <$ writeChan subM (Dropped subReason)
| packageCmd respPkg == persistentSubscriptionConfirmationCmd
-> do let Right resp = decodePkg respPkg
lcp = getField $ pscLastCommitPos resp
subSubId = getField $ pscId resp
len = getField $ pscLastEvtNumber resp
details =
SubDetails
{ subId = theSubId
, subCommitPos = lcp
, subLastEventNum = len
, subSubId = Just subSubId
}
atomically $ writeTVar var (Just subSubId)
Loop <$ writeChan subM (Confirmed details)
| packageCmd respPkg == persistentSubscriptionStreamEventAppearedCmd
-> let Right resp = decodePkg respPkg
evt = newResolvedEvent $ getField $ psseaEvt resp in
Loop <$ writeChan subM (Submit evt)
| otherwise
-> pure Loop
pure (theSubId, var, subM)