{-# LANGUAGE GADTs #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS -Wno-orphans #-}
module Database.EventStore.Internal.Subscription.Catchup where
import Safe (fromJustNote)
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation.Catchup
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Subscription.Packages
import Database.EventStore.Internal.Types
-- | Tell whether an event sits strictly before a given checkpoint.
--
-- The checkpoint type @t@ is fixed by the 'StreamId' constructor:
--
-- * for a named stream ('StreamName'), the event's original event number
--   (wrapped in 'EventNumber') is compared with the checkpoint;
-- * for the @$all@ stream ('All'), the event's 'Position' is compared with
--   the checkpoint.
--
-- The comparison is strict (@<@): an event located exactly at the checkpoint
-- yields 'False'.
--
-- In the 'All' case the position is extracted with 'fromJustNote', so this
-- function raises an error if 'resolvedEventPosition' returns 'Nothing'.
receivedAlready :: StreamId t -> t -> ResolvedEvent -> Bool
receivedAlready StreamName{} old e =
    EventNumber (resolvedEventOriginalEventNumber e) < old
receivedAlready All old e =
    let pos =
            fromJustNote
                "Position is always defined when reading events from $all stream"
                $ resolvedEventPosition e
    in pos < old
-- | Extract from an event the checkpoint value matching the stream kind.
--
-- * For a named stream ('StreamName'), this is the event's original event
--   number wrapped in 'EventNumber'.
-- * For the @$all@ stream ('All'), this is the event's 'Position'.
--
-- In the 'All' case the position is extracted with 'fromJustNote', so this
-- function raises an error if 'resolvedEventPosition' returns 'Nothing'.
nextTarget :: StreamId t -> ResolvedEvent -> t
nextTarget StreamName{} e =
    EventNumber (resolvedEventOriginalEventNumber e)
nextTarget All e =
    fromJustNote
        "Position is always defined when reading events from $all stream"
        $ resolvedEventPosition e
-- | Handle on a catch-up subscription. The type parameter @t@ is the
-- checkpoint type associated with the subscribed 'StreamId'.
data CatchupSubscription t =
  CatchupSubscription
  { _catchupExec :: Exec
    -- ^ Execution handle; used to derive the settings when the subscription
    -- is created and to publish the unsubscribe package.
  , _catchupStream :: StreamId t
    -- ^ Stream this subscription was created for.
  , _catchupSub :: TVar (Maybe UUID)
    -- ^ Subscription id. 'unsubscribe' blocks while this is 'Nothing'.
  , _catchupChan :: Chan SubAction
    -- ^ Channel from which 'nextSubEvent' reads subscription actions.
  }
-- | 'Subscription' operations for a catch-up subscription.
instance Subscription (CatchupSubscription s) where
  -- Read the next action from the subscription's channel.
  nextSubEvent s = readChan (_catchupChan s)

  -- Wait for the subscription id to be available, then publish an
  -- unsubscribe package for it.
  unsubscribe s = do
    -- The transaction retries for as long as the TVar holds 'Nothing', so
    -- this call blocks until an id has been stored in '_catchupSub'.
    subId <- atomically $ do
      idMay <- readTVar (_catchupSub s)
      case idMay of
        Nothing  -> retrySTM
        Just sid -> pure sid

    let pkg = createUnsubscribePackage subId
    publishWith (_catchupExec s) (SendPackage pkg)
-- | A catch-up subscription exposes the stream it was created for.
instance SubscriptionStream (CatchupSubscription t) t where
  subscriptionStream = _catchupStream
-- | Start a catch-up subscription and wrap the resulting state in a
-- 'CatchupSubscription'.
--
-- All arguments are forwarded to 'catchup', together with the settings
-- obtained from the 'Exec' through 'execSettings'. The 'TVar' and 'Chan'
-- returned by 'catchup' become the '_catchupSub' and '_catchupChan' fields
-- of the result.
--
-- NOTE(review): the 'Bool' and @'Maybe' 'Int32'@ arguments are presumably the
-- resolve-link-tos flag and the read batch size; confirm against 'catchup'.
newCatchupSubscription
  :: Exec
  -> Bool
  -> Maybe Int32
  -> Maybe Credentials
  -> StreamId t
  -> t
  -> IO (CatchupSubscription t)
newCatchupSubscription exec tos batch cred streamId seed = do
  (var, chan) <- catchup (execSettings exec) exec streamId seed tos batch cred

  pure CatchupSubscription
    { _catchupExec   = exec
    , _catchupStream = streamId
    , _catchupSub    = var
    , _catchupChan   = chan
    }