{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
module Database.EventStore.Internal.Operation.Catchup
( CatchupState(..)
, CatchupOpResult(..)
, Checkpoint(..)
, catchup
) where
import Data.Int
import Data.Maybe
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Operation.Read.Common
import Database.EventStore.Internal.Operation.ReadAllEvents
import Database.EventStore.Internal.Operation.ReadStreamEvents
import Database.EventStore.Internal.Operation.Volatile
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
-- | A checkpoint, expressed either as a number or as a 'Position'.
--
-- NOTE(review): 'CatchupState' carries its event number as an 'Int64' while
-- this type uses 'Int32' — confirm the narrower width is intentional.
data Checkpoint
  = CheckpointNumber Int32
    -- ^ Checkpoint given as a number (presumably an event number of a
    --   regular stream — confirm against callers).
  | CheckpointPosition Position
    -- ^ Checkpoint given as a 'Position' (presumably for reads over all
    --   events — confirm against callers).
-- | Batch size that 'catchup' falls back to when the caller passes 'Nothing'
--   as the batch size.
defaultBatchSize :: Int32
defaultBatchSize = 500
-- | Build the 'OperationError' reported when the named stream does not
--   exist. Thin alias over the 'StreamNotFound' constructor.
streamNotFound :: Text -> OperationError
streamNotFound = StreamNotFound
-- | Where a catchup currently stands, i.e. what the next read starts from.
data CatchupState
  = RegularCatchup Text Int64
    -- ^ Stream name and the event number handed to the next
    --   'readStreamEvents' call.
  | AllCatchup Position
    -- ^ 'Position' whose two components are handed to the next
    --   'readAllEvents' call.
  deriving (Show)
-- | Read a single batch of events for the given catchup state and return it
--   as a 'SomeSlice'.
--
--   * 'RegularCatchup': issues a forward 'readStreamEvents' from the stored
--     event number, then routes the outcome through 'fromReadResult', so
--     every non-success read result becomes a failure.
--   * 'AllCatchup': issues a forward 'readAllEvents' from the two components
--     of the stored 'Position'; the result is converted with 'toSlice'
--     directly.
--
--   The 'Int32' is the batch size; the 'Bool' and the credentials are passed
--   to the read operations unchanged (NOTE(review): the 'Bool' is presumably
--   the resolve-link-tos flag — confirm).
fetch :: Settings
      -> Int32
      -> Bool
      -> CatchupState
      -> Maybe Credentials
      -> Code o SomeSlice
fetch setts batch tos (RegularCatchup stream n) cred = do
    let readStream = readStreamEvents setts Forward stream n batch tos cred
    result <- deconstruct (fmap Left readStream)
    fromReadResult stream result (return . toSlice)
fetch setts batch tos (AllCatchup (Position com pre)) cred =
    deconstruct (fmap (Left . toSlice) readAll)
  where
    readAll = readAllEvents setts com pre batch tos Forward cred
-- | Advance a catchup state to the given location.
--
--   A 'RegularCatchup' keeps its stream name and takes the new event number
--   from a 'StreamEventNumber'; an 'AllCatchup' takes the new position from a
--   'StreamPosition'. Any other pairing of state and location is treated as
--   a programming error and calls 'error'.
updateState :: CatchupState -> Location -> CatchupState
updateState state loc =
    case (state, loc) of
      (RegularCatchup stream _, StreamEventNumber n) -> RegularCatchup stream n
      (AllCatchup _, StreamPosition p)               -> AllCatchup p
      _ -> error $ "Unexpected input updateState: " <> show (state, loc)
-- | Operation that repeatedly reads batches starting from the given
--   'CatchupState' and emits every event of each batch wrapped in 'Submit'.
--
--   Each step fetches one slice, yields all of its events, and then either
--   calls 'stop' when the slice reports end-of-stream or hands the state
--   updated with the slice's next location back to 'unfoldPlan'. Events of
--   the final slice are yielded before 'stop' is reached.
sourceStream :: Settings
             -> Int32
             -> Bool
             -> CatchupState
             -> Maybe Credentials
             -> Operation SubAction
sourceStream setts batch tos start cred = unfoldPlan start step
  where
    step current = do
      chunk <- fetch setts batch tos current cred
      traverse_ (\evt -> yield (Submit evt)) (sliceEvents chunk)
      when (sliceEOS chunk) stop
      return (updateState current (sliceNext chunk))
-- | Stream name associated with a catchup state: the stored name for a
--   'RegularCatchup', the empty text for an 'AllCatchup'.
catchupStreamName :: CatchupState -> Text
catchupStreamName state =
    case state of
      RegularCatchup stream _ -> stream
      AllCatchup _            -> ""
-- | Result record of a catchup read. All fields are strict.
--
-- NOTE(review): nothing in this part of the module builds or consumes this
-- type; the field descriptions below are inferred from their names and
-- types — confirm against the call sites.
data CatchupOpResult = CatchupOpResult
  { catchupReadEvents :: ![ResolvedEvent]
    -- ^ Events carried by this result.
  , catchupEndOfStream :: !Bool
    -- ^ Presumably 'True' once the end of the stream has been reached.
  , catchupCheckpoint :: !Checkpoint
    -- ^ 'Checkpoint' associated with this result.
  }
-- | Catchup operation: 'sourceStream' from the given state, combined via
--   '<>' with a 'volatile' operation on the same stream.
--
--   The batch size defaults to 'defaultBatchSize' when 'Nothing' is given.
--   The stream name handed to 'volatile' comes from 'catchupStreamName', so
--   it is the empty text for an 'AllCatchup' state.
--
--   NOTE(review): '<>' on 'Operation' presumably runs the left operation to
--   completion before the right one — confirm against the 'Operation'
--   instance.
catchup :: Settings
        -> CatchupState
        -> Bool
        -> Maybe Int32
        -> Maybe Credentials
        -> Operation SubAction
catchup setts state tos batchSiz cred =
    let batch    = fromMaybe defaultBatchSize batchSiz
        stream   = catchupStreamName state
        replay   = sourceStream setts batch tos state cred
        followUp = volatile stream tos cred
    in replay <> followUp
-- | Eliminate the 'ReadResult' of a regular-stream read: pass the payload of
--   a 'ReadSuccess' to the continuation and turn every other outcome into a
--   'failure'.
--
--   * 'ReadNoStream'      -> 'streamNotFound' for the supplied stream name
--   * 'ReadStreamDeleted' -> 'StreamDeleted'
--   * 'ReadNotModified'   -> 'ServerError' with no message
--   * 'ReadError'         -> 'ServerError' carrying the reported value
--   * 'ReadAccessDenied'  -> 'AccessDenied'
fromReadResult :: Text
               -> ReadResult 'RegularStream a
               -> (a -> Code o x)
               -> Code o x
fromReadResult stream ReadNoStream          _ = failure (streamNotFound stream)
fromReadResult _      (ReadStreamDeleted s) _ = failure (StreamDeleted s)
fromReadResult _      ReadNotModified       _ = failure (ServerError Nothing)
fromReadResult _      (ReadError e)         _ = failure (ServerError e)
fromReadResult _      (ReadAccessDenied s)  _ = failure (AccessDenied s)
fromReadResult _      (ReadSuccess ss)      k = k ss