{-# LANGUAGE GADTs #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS -Wno-orphans #-}
module Database.EventStore.Internal.Subscription.Catchup where
import Safe (fromJustNote)
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation.Catchup
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Subscription.Packages
import Database.EventStore.Internal.Types
-- | Tells whether an event sits strictly before a given checkpoint.
--
-- For a named stream the event's original event number is compared with the
-- checkpoint; for the @$all@ stream the event's position is compared instead.
-- The result is 'True' only when the event is strictly smaller than the
-- checkpoint.
--
-- On the @$all@ branch the position is extracted with 'fromJustNote', so an
-- event without a position raises an error carrying the note below.
receivedAlready :: StreamId t -> t -> ResolvedEvent -> Bool
receivedAlready StreamName{} checkpoint evt =
  EventNumber (resolvedEventOriginalEventNumber evt) < checkpoint
receivedAlready All checkpoint evt = eventPos < checkpoint
  where
    -- Position carried by the event; expected to always be present here.
    eventPos =
      fromJustNote
        "Position is always defined when reading events from $all stream"
        (resolvedEventPosition evt)
-- | Extracts, from an event, the checkpoint value matching the kind of stream
-- it was read from.
--
-- A named stream yields the event's original event number wrapped in
-- 'EventNumber'; the @$all@ stream yields the event's position.
--
-- On the @$all@ branch the position is extracted with 'fromJustNote', so an
-- event without a position raises an error carrying the note below.
nextTarget :: StreamId t -> ResolvedEvent -> t
nextTarget strm evt =
  case strm of
    StreamName{} ->
      EventNumber (resolvedEventOriginalEventNumber evt)
    All ->
      fromJustNote
        "Position is always defined when reading events from $all stream"
        (resolvedEventPosition evt)
-- | Handle on a catch-up subscription. The type parameter @t@ is the
-- checkpoint type tied to the subscribed stream through 'StreamId'.
data CatchupSubscription t = CatchupSubscription
  { _catchupExec :: Exec
    -- ^ Execution handle used to publish packages (e.g. on unsubscribe).
  , _catchupStream :: StreamId t
    -- ^ Stream this subscription is attached to.
  , _catchupSub :: TVar (Maybe UUID)
    -- ^ Subscription id; 'Nothing' until one is known.
    -- NOTE(review): presumably filled in by the catch-up operation once the
    -- server confirms the subscription — confirm.
  , _catchupChan :: Chan SubAction
    -- ^ Channel from which subscription actions are read.
  }
-- | A catch-up subscription hands out actions from its channel and
-- unsubscribes by publishing an unsubscribe package for its subscription id.
instance Subscription (CatchupSubscription s) where
  -- Next action is whatever is read next from the subscription's channel.
  nextSubEvent sub = readChan $ _catchupChan sub

  -- Waits (in STM) until a subscription id is available, then publishes the
  -- corresponding unsubscribe package through the subscription's 'Exec'.
  unsubscribe sub = do
      subId <- atomically awaitSubId
      publishWith (_catchupExec sub) $
        SendPackage $ createUnsubscribePackage subId
    where
      -- Retries the transaction for as long as no id has been stored.
      awaitSubId = do
        current <- readTVar (_catchupSub sub)
        case current of
          Just known -> pure known
          Nothing -> retrySTM
-- | The stream of a catch-up subscription is the one stored in the handle.
instance SubscriptionStream (CatchupSubscription t) t where
  subscriptionStream sub = _catchupStream sub
-- | Starts a catch-up operation and wraps its results in a
-- 'CatchupSubscription'.
--
-- The 'catchup' call returns the subscription-id variable and the action
-- channel; both are stored in the handle together with the 'Exec' and the
-- stream identifier.
newCatchupSubscription
  :: Exec               -- ^ Execution handle; its settings are passed to 'catchup'.
  -> Bool               -- ^ Forwarded to 'catchup' (presumably "resolve link tos" — confirm).
  -> Maybe Int32        -- ^ Forwarded to 'catchup' (presumably the batch size — confirm).
  -> Maybe Credentials  -- ^ Optional credentials forwarded to 'catchup'.
  -> StreamId t         -- ^ Stream to subscribe to.
  -> t                  -- ^ Seed forwarded to 'catchup' (presumably the starting checkpoint — confirm).
  -> IO (CatchupSubscription t)
newCatchupSubscription exec tos batch cred streamId seed = do
  (subVar, actions) <-
    catchup (execSettings exec) exec streamId seed tos batch cred
  pure $
    CatchupSubscription
      { _catchupExec = exec
      , _catchupStream = streamId
      , _catchupSub = subVar
      , _catchupChan = actions
      }