{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.EventStore.Internal.Subscription.Api where
import Streaming
import qualified Streaming.Prelude as Streaming
import Database.EventStore.Internal.Types
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Types
class Subscription s where
nextSubEvent :: s -> IO SubAction
unsubscribe :: s -> IO ()
class SubscriptionStream s t | t -> s where
subscriptionStream :: s -> StreamId t
streamSubEvents :: Subscription s => s -> Stream (Of SubAction) IO ()
streamSubEvents :: forall s. Subscription s => s -> Stream (Of SubAction) IO ()
streamSubEvents s
s
= do Stream (Of SubAction) IO Any
rest <- forall (m :: * -> *) a r.
Monad m =>
(a -> Bool)
-> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)
Streaming.span SubAction -> Bool
predicate forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a r. Monad m => m a -> Stream (Of a) m r
Streaming.repeatM (forall s. Subscription s => s -> IO SubAction
nextSubEvent s
s)
Maybe (SubAction, Stream (Of SubAction) IO Any)
outcome <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a r.
Monad m =>
Stream (Of a) m r -> m (Maybe (a, Stream (Of a) m r))
Streaming.uncons Stream (Of SubAction) IO Any
rest
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe (SubAction, Stream (Of SubAction) IO Any)
outcome forall a b. (a -> b) -> a -> b
$ \(SubAction
dropped, Stream (Of SubAction) IO Any
_) -> forall (m :: * -> *) a. Monad m => a -> Stream (Of a) m ()
Streaming.yield SubAction
dropped
where
predicate :: SubAction -> Bool
predicate (Dropped SubDropReason
_) = Bool
False
predicate SubAction
_ = Bool
True
streamSubResolvedEvents :: Subscription s => s -> Stream (Of ResolvedEvent) IO ()
streamSubResolvedEvents :: forall s. Subscription s => s -> Stream (Of ResolvedEvent) IO ()
streamSubResolvedEvents = forall (m :: * -> *) a b r.
Monad m =>
(a -> Maybe b) -> Stream (Of a) m r -> Stream (Of b) m r
Streaming.mapMaybe SubAction -> Maybe ResolvedEvent
go forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall s. Subscription s => s -> Stream (Of SubAction) IO ()
streamSubEvents
where
go :: SubAction -> Maybe ResolvedEvent
go (Submit ResolvedEvent
e) = forall a. a -> Maybe a
Just ResolvedEvent
e
go SubAction
_ = forall a. Maybe a
Nothing