{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.EventStore.Internal.Operation.Catchup
( catchup ) where
import Data.Int
import Data.Maybe
import Data.ProtocolBuffers
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import qualified Database.EventStore.Internal.Operation.ReadAllEvents.Message as ReadAll
import qualified Database.EventStore.Internal.Operation.ReadStreamEvents.Message as ReadStream
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
defaultBatchSize :: Int32
defaultBatchSize = 500
data State s
= Init s
| Catchup UUID UUID s
| Live UUID s
createReadPkg
:: Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
createReadPkg setts (StreamName stream) evtNum batch tos cred
= let
req =
ReadStream.newRequest
stream
(eventNumberToInt64 evtNum)
batch
tos
(s_requireMaster setts) in
createPkg readStreamEventsForwardCmd cred req
createReadPkg setts All pos batch tos cred
= let
req =
ReadAll.newRequest
(positionCommit pos)
(positionPrepare pos)
batch
tos
(s_requireMaster setts) in
createPkg readAllEventsForwardCmd cred req
catchup
:: Settings
-> Exec
-> StreamId t
-> t
-> Bool
-> Maybe Int32
-> Maybe Credentials
-> IO (TVar (Maybe UUID), Chan SubAction)
catchup setts exec streamId from tos batchSiz cred
= do m <- mailboxNew
subM <- newChan
var <- newTVarIO Nothing
_ <- async $ keepLoopingS (Init from) $ \case
Init pos
-> do let subReq = subscribeToStream stream tos
subPkg <- createPkg subscribeToStreamCmd cred subReq
readPkg <- createReadPkg setts streamId pos batch tos cred
publishWith exec (Transmit m (KeepAlive subscriptionDroppedCmd) subPkg)
publishWith exec (Transmit m OneTime readPkg)
let theSubId = packageCorrelation subPkg
atomically $ writeTVar var (Just theSubId)
pure $ LoopS (Catchup theSubId (packageCorrelation readPkg) pos)
unchanged@(Catchup theSubId readId pos)
-> do outcome <- mailboxRead m
case outcome of
Left e
-> case e of
ConnectionHasDropped
-> pure $ LoopS (Init pos)
_ -> BreakS () <$ writeChan subM (Dropped SubAborted)
Right respPkg
| theSubId == packageCorrelation respPkg
&& packageCmd respPkg == subscriptionDroppedCmd
-> let Right resp = decodePkg respPkg
reason = fromMaybe D_Unsubscribed (getField $ dropReason resp)
subReason = toSubDropReason reason in
BreakS () <$ writeChan subM (Dropped subReason)
| theSubId == packageCorrelation respPkg
&& packageCmd respPkg == subscriptionConfirmationCmd
-> let Right resp = decodePkg respPkg
lcp = getField $ subscribeLastCommitPos resp
len = getField $ subscribeLastEventNumber resp
details =
SubDetails
{ subId = theSubId
, subCommitPos = lcp
, subLastEventNum = len
, subSubId = Nothing
} in
LoopS unchanged <$ writeChan subM (Confirmed details)
| theSubId == packageCorrelation respPkg
-> pure $ LoopS unchanged
| readId == packageCorrelation respPkg
-> case streamId of
StreamName _
-> do let
Right resp = decodePkg respPkg
r = getField $ ReadStream._result resp
es = getField $ ReadStream._events resp
evts = fmap newResolvedEvent es
eos = getField $ ReadStream._endOfStream resp
nxt = getField $ ReadStream._nextNumber resp
case r of
ReadStream.NO_STREAM
-> pure $ LoopS (Live theSubId pos)
ReadStream.SUCCESS
-> do traverse_ (writeChan subM . Submit) evts
if eos
then
pure $ LoopS (Live theSubId (rawEventNumber nxt))
else
do newReadPkg <- createReadPkg setts streamId (rawEventNumber nxt) batch tos cred
let newReadId = packageCorrelation newReadPkg
publishWith exec (Transmit m OneTime newReadPkg)
pure $ LoopS (Catchup theSubId newReadId (rawEventNumber nxt))
_ -> BreakS () <$ writeChan subM (Dropped SubAborted)
All
-> do let
Right resp = decodePkg respPkg
r = getField $ ReadAll._Result resp
nc_pos = getField $ ReadAll._NextCommitPosition resp
np_pos = getField $ ReadAll._NextPreparePosition resp
es = getField $ ReadAll._Events resp
evts = fmap newResolvedEventFromBuf es
eos = null evts
n_pos = Position nc_pos np_pos
case fromMaybe ReadAll.SUCCESS r of
ReadAll.SUCCESS
-> do traverse_ (writeChan subM . Submit) evts
if eos
then
pure $ LoopS (Live theSubId n_pos)
else
do newReadPkg <- createReadPkg setts streamId n_pos batch tos cred
let newReadId = packageCorrelation newReadPkg
publishWith exec (Transmit m OneTime newReadPkg)
pure $ LoopS (Catchup theSubId newReadId n_pos)
_ -> BreakS () <$ writeChan subM (Dropped SubAborted)
| otherwise
-> pure $ LoopS unchanged
unchanged@(Live theSubId pos)
-> do outcome <- mailboxRead m
case outcome of
Left e
-> case e of
ConnectionHasDropped
-> pure $ LoopS (Init pos)
_ -> BreakS () <$ writeChan subM (Dropped SubAborted)
Right respPkg
| theSubId == packageCorrelation respPkg
&& packageCmd respPkg == subscriptionDroppedCmd
-> let Right resp = decodePkg respPkg
reason = fromMaybe D_Unsubscribed (getField $ dropReason resp)
subReason = toSubDropReason reason in
BreakS () <$ writeChan subM (Dropped subReason)
| theSubId == packageCorrelation respPkg
&& packageCmd respPkg == subscriptionConfirmationCmd
-> let Right resp = decodePkg respPkg
lcp = getField $ subscribeLastCommitPos resp
len = getField $ subscribeLastEventNumber resp
details =
SubDetails
{ subId = theSubId
, subCommitPos = lcp
, subLastEventNum = len
, subSubId = Nothing
} in
LoopS unchanged <$ writeChan subM (Confirmed details)
| theSubId == packageCorrelation respPkg
&& packageCmd respPkg == streamEventAppearedCmd
-> let
Right resp = decodePkg respPkg
evt = newResolvedEventFromBuf $ getField $ streamResolvedEvent resp
nextState =
case streamId of
StreamName _
-> let nxt = resolvedEventOriginalEventNumber evt
in Live theSubId (rawEventNumber nxt)
All
-> let Just nxtPos = resolvedEventPosition evt
in Live theSubId nxtPos in
LoopS nextState <$ writeChan subM (Submit evt)
| otherwise
-> pure $ LoopS unchanged
pure (var, subM)
where
batch = fromMaybe defaultBatchSize batchSiz
stream = streamIdRaw streamId