{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE StandaloneDeriving #-}
module Database.EventStore.Streaming
( ReadError(..)
, Fetch(..)
, ReadResultHandler(..)
, readThroughForward
, readThroughBackward
, throwOnError
, defaultReadResultHandler
, onRegularStream
, readThrough
) where
import Control.Exception (Exception, throwIO)
import Data.Int (Int32)
import Data.Maybe (fromMaybe)
import Data.Typeable (Typeable)
import Prelude
import Control.Concurrent.Async.Lifted (wait)
import Control.Monad.Except (ExceptT, throwError, runExceptT)
import Data.Text (Text)
import Streaming
import qualified Streaming.Prelude as Streaming
import qualified Database.EventStore as ES
-- | Errors that can end a streamed read.
--
-- The parameter @t@ is the position type of the stream being read.
-- 'StreamDeleted' and 'NoStream' only exist at 'ES.EventNumber'.
data ReadError t where
  -- | The named stream was deleted ('defaultReadResultHandler' builds this
  -- from 'ES.ReadStreamDeleted').
  StreamDeleted :: ES.StreamName -> ReadError ES.EventNumber
  -- | A read error carrying an optional message (built from 'ES.ReadError').
  ReadError :: Maybe Text -> ReadError t
  -- | Access to the given stream was denied (built from
  -- 'ES.ReadAccessDenied').
  AccessDenied :: ES.StreamId t -> ReadError t
  -- | The stream does not exist. 'defaultReadResultHandler' never produces
  -- this (it maps 'ES.ReadNoStream' to an empty slice); it is available to
  -- custom handlers.
  NoStream :: ReadError ES.EventNumber
-- | 'ReadError' is a GADT, so its 'Show' instance is derived standalone.
deriving instance Show (ReadError t)
-- | Allows a 'ReadError' to be thrown with 'throwIO' (see 'throwOnError').
instance (Show t, Typeable t) => Exception (ReadError t)
-- | Outcome of fetching one batch: either a 'ReadError' or the slice that
-- was read. Both fields are strict.
data Fetch t
  = FetchError !(ReadError t)
  | Fetch !(ES.Slice t)
-- | Turns the raw result of a batch read into a 'Fetch'.
--
-- The wrapped function is polymorphic in @t@, so a single handler serves
-- every kind of 'ES.StreamId'.
--
-- NOTE(review): the rank-2 field needs @RankNTypes@; no such pragma is
-- visible in this file's header, so confirm it is enabled by the build
-- configuration.
newtype ReadResultHandler =
  ReadResultHandler
    { runReadResultHandler
        :: forall t. ES.StreamId t -> ES.BatchResult t -> Fetch t
    }
-- | The handler used by 'readThroughForward' and 'readThroughBackward'.
--
-- For a named stream ('ES.StreamName'):
--
-- * 'ES.ReadNoStream' and 'ES.ReadNotModified' yield an empty slice;
-- * 'ES.ReadStreamDeleted', 'ES.ReadError' and 'ES.ReadAccessDenied' become
--   the corresponding 'ReadError' wrapped in 'FetchError';
-- * 'ES.ReadSuccess' passes its slice through.
--
-- For 'ES.All' the batch result is wrapped in 'Fetch' as is.
defaultReadResultHandler :: ReadResultHandler
defaultReadResultHandler = ReadResultHandler go
  where
    -- Matching on the stream identifier fixes the shape of the batch result.
    go :: ES.StreamId t -> ES.BatchResult t -> Fetch t
    go ES.StreamName{} = toFetch
    go ES.All = Fetch

    toFetch :: ES.ReadResult t (ES.Slice t) -> Fetch t
    toFetch ES.ReadNoStream = Fetch ES.emptySlice
    toFetch ES.ReadNotModified = Fetch ES.emptySlice
    toFetch (ES.ReadStreamDeleted n) = FetchError (StreamDeleted n)
    toFetch (ES.ReadError e) = FetchError (ReadError e)
    toFetch (ES.ReadAccessDenied n) = FetchError (AccessDenied n)
    toFetch (ES.ReadSuccess s) = Fetch s
-- | Build a 'ReadResultHandler' from a callback for named streams.
--
-- Reads of a named stream ('ES.StreamName') are passed to @callback@; reads
-- of 'ES.All' are wrapped in 'Fetch' unchanged.
onRegularStream
  :: (ES.ReadResult ES.EventNumber (ES.Slice ES.EventNumber) -> Fetch ES.EventNumber)
  -- ^ how to interpret the result of reading a named stream
  -> ReadResultHandler
onRegularStream callback = ReadResultHandler go
  where
    go :: ES.StreamId t -> ES.BatchResult t -> Fetch t
    go ES.StreamName{} = callback
    go ES.All = Fetch
-- | Unfold state used by 'streaming'.
--
-- 'Need' holds the position to fetch from next; 'Fetched' holds the events
-- still to be emitted plus the optional position of the following batch.
data State t
  = Need t
  | Fetched ![ES.ResolvedEvent] !(Maybe t)
-- | Turn a batch-fetching action into a stream of events.
--
-- Starting at the given position, @iteratee@ is called to fetch a batch and
-- the batch's events are emitted one by one. When the buffer runs dry, the
-- next batch is fetched from the position carried by the previous slice. The
-- stream ends on 'ES.SliceEndOfStream' or when a slice carries no next
-- position. A 'FetchError' is raised in the 'ExceptT' layer.
streaming
  :: (t -> IO (Fetch t))
  -- ^ fetches the batch that starts at the given position
  -> t
  -- ^ position of the first batch
  -> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
streaming iteratee = Streaming.unfoldr go . Need
  where
    go (Fetched buffer next) =
      case buffer of
        e : rest -> pure (Right (e, Fetched rest next))
        -- Buffer exhausted: fetch the next batch, or finish if there is none.
        [] -> maybe stop (go . Need) next
    go (Need pos) =
      liftIO (iteratee pos) >>= \case
        FetchError e -> throwError e
        Fetch s ->
          case s of
            ES.SliceEndOfStream -> stop
            ES.Slice xs next -> go (Fetched xs next)

    -- 'Left' tells 'Streaming.unfoldr' to end the stream.
    stop = pure (Left ())
-- | Read a stream forward, batch by batch, as a stream of events.
--
-- Equivalent to 'readThrough' with 'defaultReadResultHandler' and
-- 'ES.Forward'.
readThroughForward
  :: ES.Connection
  -> ES.StreamId t
  -> ES.ResolveLink
  -> t
  -- ^ position to start reading from
  -> Maybe Int32
  -- ^ batch size; 'readThrough' uses 500 when this is 'Nothing'
  -> Maybe ES.Credentials
  -> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThroughForward conn = readThrough conn defaultReadResultHandler ES.Forward
-- | Read a stream backward, batch by batch, as a stream of events.
--
-- Equivalent to 'readThrough' with 'defaultReadResultHandler' and
-- 'ES.Backward'.
readThroughBackward
  :: ES.Connection
  -> ES.StreamId t
  -> ES.ResolveLink
  -> t
  -- ^ position to start reading from
  -> Maybe Int32
  -- ^ batch size; 'readThrough' uses 500 when this is 'Nothing'
  -> Maybe ES.Credentials
  -> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThroughBackward conn = readThrough conn defaultReadResultHandler ES.Backward
-- | Remove the 'ExceptT' layer from a stream by throwing its error.
--
-- Each step of the underlying monad is run with 'runExceptT'; a 'Left' is
-- rethrown in 'IO' with 'throwIO', a 'Right' is returned as is.
throwOnError
  :: (Show t, Typeable t)
  => Stream (Of a) (ExceptT (ReadError t) IO) ()
  -> Stream (Of a) IO ()
throwOnError = hoist go
  where
    go :: Exception e => ExceptT e IO b -> IO b
    go action =
      runExceptT action >>= \case
        Left e -> throwIO e
        Right a -> pure a
-- | Read a stream batch by batch, in the given direction, as a stream of
-- events.
--
-- Each batch is fetched with 'readForward' or 'readBackward' (chosen by the
-- direction), interpreted by the supplied 'ReadResultHandler', and fed to
-- 'streaming'. Errors reported by the handler surface in the 'ExceptT'
-- layer.
readThrough
  :: ES.Connection
  -> ReadResultHandler
  -- ^ interprets the raw result of each batch read
  -> ES.ReadDirection
  -> ES.StreamId t
  -> ES.ResolveLink
  -> t
  -- ^ position to start reading from
  -> Maybe Int32
  -- ^ batch size; defaults to 500 when 'Nothing'
  -> Maybe ES.Credentials
  -> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThrough conn handler dir streamId lnk from sizMay cred =
  streaming iteratee from
  where
    batchSize :: Int32
    batchSize = fromMaybe 500 sizMay

    -- Partially applied reader; only the start position is still missing.
    iteratee =
      case dir of
        ES.Forward -> readForward conn handler streamId batchSize lnk cred
        ES.Backward -> readBackward conn handler streamId batchSize lnk cred
-- | Fetch a single batch reading forward.
--
-- Issues 'ES.readEventsForward', waits for the asynchronous result, and
-- interprets it with the given handler.
readForward
  :: ES.Connection
  -> ReadResultHandler
  -> ES.StreamId t
  -> Int32
  -- ^ batch size passed to 'ES.readEventsForward'
  -> ES.ResolveLink
  -> Maybe ES.Credentials
  -> t
  -- ^ position the batch starts at
  -> IO (Fetch t)
readForward conn handler streamId siz lnk creds start =
  fmap (runReadResultHandler handler streamId) . wait
    =<< ES.readEventsForward conn streamId start siz lnk creds
-- | Fetch a single batch reading backward.
--
-- Issues 'ES.readEventsBackward', waits for the asynchronous result, and
-- interprets it with the given handler.
readBackward
  :: ES.Connection
  -> ReadResultHandler
  -> ES.StreamId t
  -> Int32
  -- ^ batch size passed to 'ES.readEventsBackward'
  -> ES.ResolveLink
  -> Maybe ES.Credentials
  -> t
  -- ^ position the batch starts at
  -> IO (Fetch t)
readBackward conn handler streamId siz lnk creds start =
  fmap (runReadResultHandler handler streamId) . wait
    =<< ES.readEventsBackward conn streamId start siz lnk creds