{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.EventStore.Internal.Operation.Catchup
( catchup ) where
import Data.Int
import Data.Maybe
import Data.ProtocolBuffers
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import qualified Database.EventStore.Internal.Operation.ReadAllEvents.Message as ReadAll
import qualified Database.EventStore.Internal.Operation.ReadStreamEvents.Message as ReadStream
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
-- | Number of events requested per read while catching up, used when the
--   caller of 'catchup' does not supply a batch size.
defaultBatchSize :: Int32
defaultBatchSize = 500
-- | State of the catch-up subscription loop. The type parameter @s@ is the
--   kind of position tracked for the stream being read.
data State s
  = Init s
    -- ^ Nothing has been sent yet (or the connection was lost): subscribe
    --   and start reading from the given position.
  | Catchup UUID UUID s
    -- ^ Reading past events. Fields: subscription correlation id,
    --   correlation id of the read request currently in flight, and the
    --   position that read started from.
  | Live UUID s
    -- ^ Past events have been consumed. Fields: subscription correlation id
    --   and the last known position.
-- | Builds the forward-read request package matching the kind of stream.
--
--   * For a named stream ('StreamName') the position is an event number and
--     a read-stream-events-forward request is produced.
--   * For 'All' the position is a commit/prepare 'Position' and a
--     read-all-events-forward request is produced.
--
--   The 'Int32' is the batch size and the 'Bool' is forwarded unchanged to
--   the request constructor (presumably the resolve-link-tos flag; confirm
--   against @newRequest@). The require-master flag is read from the
--   'Settings'.
createReadPkg
  :: Settings
  -> StreamId t
  -> t
  -> Int32
  -> Bool
  -> Maybe Credentials
  -> IO Package
createReadPkg setts (StreamName stream) evtNum batch tos cred =
  createPkg readStreamEventsForwardCmd cred req
  where
    req =
      ReadStream.newRequest
        stream
        (eventNumberToInt64 evtNum)
        batch
        tos
        (s_requireMaster setts)
createReadPkg setts All pos batch tos cred =
  createPkg readAllEventsForwardCmd cred req
  where
    req =
      ReadAll.newRequest
        (positionCommit pos)
        (positionPrepare pos)
        batch
        tos
        (s_requireMaster setts)
-- | Tells the consumer the subscription ended for the given reason and stops
--   the catch-up loop.
catchupDropWith :: Chan SubAction -> SubDropReason -> IO (LoopS s ())
catchupDropWith chan reason = BreakS () <$ writeChan chan (Dropped reason)

-- | Handles a subscription-dropped package: forwards the server's reason
--   (defaulting to 'D_Unsubscribed' when the field is absent) and stops the
--   loop. A package that cannot be decoded is reported as 'SubAborted'.
catchupOnDropped :: Chan SubAction -> Package -> IO (LoopS s ())
catchupOnDropped chan pkg =
  catchupDropWith chan $
    case decodePkg pkg of
      Left _ -> SubAborted
      Right resp ->
        toSubDropReason $ fromMaybe D_Unsubscribed (getField $ dropReason resp)

-- | Handles a subscription-confirmation package: publishes the subscription
--   details and keeps the loop in the state it was given. A package that
--   cannot be decoded ends the subscription with 'SubAborted'.
catchupOnConfirmed
  :: Chan SubAction
  -> UUID
  -> Package
  -> s
  -> IO (LoopS s ())
catchupOnConfirmed chan theSubId pkg unchanged =
  case decodePkg pkg of
    Left _ -> catchupDropWith chan SubAborted
    Right resp ->
      let details =
            SubDetails
              { subId = theSubId
              , subCommitPos = getField $ subscribeLastCommitPos resp
              , subLastEventNum = getField $ subscribeLastEventNumber resp
              , subSubId = Nothing
              }
       in LoopS unchanged <$ writeChan chan (Confirmed details)

-- | Starts a catch-up subscription on a background thread.
--
--   The loop subscribes to the stream, reads existing events forward in
--   batches from the given starting position, and pushes every event to the
--   returned channel as 'Submit'. Once the read reports the end of the
--   stream it switches to forwarding live events. On
--   'ConnectionHasDropped' it starts over from the last known position; any
--   other error, an unsuccessful read result, or an undecodable server
--   package ends the subscription with @'Dropped' 'SubAborted'@.
--
--   Arguments, in order: connection settings, the 'Exec' used to publish
--   requests, the stream, the starting position, a flag forwarded to the
--   subscribe and read requests (presumably resolve-link-tos; confirm
--   against those constructors), an optional batch size (defaults to
--   'defaultBatchSize'), and optional credentials.
--
--   Returns a 'TVar' that receives the subscription correlation id once the
--   subscribe request has been sent, and the channel of subscription
--   actions.
catchup
  :: Settings
  -> Exec
  -> StreamId t
  -> t
  -> Bool
  -> Maybe Int32
  -> Maybe Credentials
  -> IO (TVar (Maybe UUID), Chan SubAction)
catchup setts exec streamId from tos batchSiz cred = do
  m <- mailboxNew
  subM <- newChan
  var <- newTVarIO Nothing
  _ <- async $ keepLoopingS (Init from) $ \case
    Init pos -> do
      let subReq = subscribeToStream stream tos
      subPkg <- createPkg subscribeToStreamCmd cred subReq
      readPkg <- createReadPkg setts streamId pos batch tos cred
      -- The subscription stays registered until a dropped command arrives;
      -- the read request is a one-shot exchange.
      publishWith exec (Transmit m (KeepAlive subscriptionDroppedCmd) subPkg)
      publishWith exec (Transmit m OneTime readPkg)
      let theSubId = packageCorrelation subPkg
      atomically $ writeTVar var (Just theSubId)
      pure $ LoopS (Catchup theSubId (packageCorrelation readPkg) pos)

    unchanged@(Catchup theSubId readId pos) -> do
      outcome <- mailboxRead m
      case outcome of
        Left ConnectionHasDropped ->
          -- Resubscribe and resume reading from where this batch started.
          pure $ LoopS (Init pos)
        Left _ ->
          catchupDropWith subM SubAborted
        Right respPkg
          | theSubId == packageCorrelation respPkg
              && packageCmd respPkg == subscriptionDroppedCmd ->
              catchupOnDropped subM respPkg
          | theSubId == packageCorrelation respPkg
              && packageCmd respPkg == subscriptionConfirmationCmd ->
              catchupOnConfirmed subM theSubId respPkg unchanged
          | theSubId == packageCorrelation respPkg ->
              -- Any other subscription message is ignored while catching up.
              pure $ LoopS unchanged
          | readId == packageCorrelation respPkg ->
              case streamId of
                StreamName _ ->
                  case decodePkg respPkg of
                    Left _ -> catchupDropWith subM SubAborted
                    Right resp -> do
                      let r = getField $ ReadStream._result resp
                          es = getField $ ReadStream._events resp
                          evts = fmap newResolvedEvent es
                          eos = getField $ ReadStream._endOfStream resp
                          nxt = getField $ ReadStream._nextNumber resp
                      case r of
                        ReadStream.NO_STREAM ->
                          pure $ LoopS (Live theSubId pos)
                        ReadStream.SUCCESS -> do
                          traverse_ (writeChan subM . Submit) evts
                          if eos
                            then pure $ LoopS (Live theSubId (rawEventNumber nxt))
                            else do
                              newReadPkg <-
                                createReadPkg
                                  setts
                                  streamId
                                  (rawEventNumber nxt)
                                  batch
                                  tos
                                  cred
                              let newReadId = packageCorrelation newReadPkg
                              publishWith exec (Transmit m OneTime newReadPkg)
                              pure $
                                LoopS (Catchup theSubId newReadId (rawEventNumber nxt))
                        _ ->
                          catchupDropWith subM SubAborted
                All ->
                  case decodePkg respPkg of
                    Left _ -> catchupDropWith subM SubAborted
                    Right resp -> do
                      let r = getField $ ReadAll._Result resp
                          nc_pos = getField $ ReadAll._NextCommitPosition resp
                          np_pos = getField $ ReadAll._NextPreparePosition resp
                          es = getField $ ReadAll._Events resp
                          evts = fmap newResolvedEventFromBuf es
                          -- An empty batch is treated as the end of $all.
                          eos = null evts
                          n_pos = Position nc_pos np_pos
                      case fromMaybe ReadAll.SUCCESS r of
                        ReadAll.SUCCESS -> do
                          traverse_ (writeChan subM . Submit) evts
                          if eos
                            then pure $ LoopS (Live theSubId n_pos)
                            else do
                              newReadPkg <-
                                createReadPkg setts streamId n_pos batch tos cred
                              let newReadId = packageCorrelation newReadPkg
                              publishWith exec (Transmit m OneTime newReadPkg)
                              pure $ LoopS (Catchup theSubId newReadId n_pos)
                        _ ->
                          catchupDropWith subM SubAborted
          | otherwise ->
              pure $ LoopS unchanged

    unchanged@(Live theSubId pos) -> do
      outcome <- mailboxRead m
      case outcome of
        Left ConnectionHasDropped ->
          pure $ LoopS (Init pos)
        Left _ ->
          catchupDropWith subM SubAborted
        Right respPkg
          | theSubId == packageCorrelation respPkg
              && packageCmd respPkg == subscriptionDroppedCmd ->
              catchupOnDropped subM respPkg
          | theSubId == packageCorrelation respPkg
              && packageCmd respPkg == subscriptionConfirmationCmd ->
              catchupOnConfirmed subM theSubId respPkg unchanged
          | theSubId == packageCorrelation respPkg
              && packageCmd respPkg == streamEventAppearedCmd ->
              case decodePkg respPkg of
                Left _ -> catchupDropWith subM SubAborted
                Right resp ->
                  let evt =
                        newResolvedEventFromBuf $
                          getField $ streamResolvedEvent resp
                      nextState =
                        case streamId of
                          StreamName _ ->
                            Live theSubId $
                              rawEventNumber (resolvedEventOriginalEventNumber evt)
                          All ->
                            -- Keep the previous position if the event carries
                            -- none, rather than storing a value that fails
                            -- when it is later forced on reconnection.
                            Live theSubId $
                              fromMaybe pos (resolvedEventPosition evt)
                   in LoopS nextState <$ writeChan subM (Submit evt)
          | otherwise ->
              pure $ LoopS unchanged

  pure (var, subM)
  where
    batch = fromMaybe defaultBatchSize batchSiz
    stream = streamIdRaw streamId