{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Asapo.Either.Consumer
( FilesystemFlag (..),
ServerName (..),
IncludeIncompleteFlag (..),
SourcePath (..),
Dataset (..),
MessageMeta (..),
DeleteFlag (..),
ErrorOnNotExistFlag (..),
Consumer,
NetworkConnectionType (..),
StreamFilter (..),
MessageMetaHandle,
GroupId,
Error (..),
ErrorType (..),
withConsumer,
retrieveDataForMessageMeta,
queryMessages,
resendNacs,
getNextDataset,
retrieveDataFromMeta,
setStreamPersistent,
getLastMessageMetaAndData,
getLastMessageMeta,
getLastMessageData,
getLastInGroupMessageMetaAndData,
getLastInGroupMessageMeta,
getLastInGroupMessageData,
getNextMessageMetaAndData,
getNextMessageMeta,
getNextMessageData,
getCurrentDatasetCount,
getBeamtimeMeta,
getCurrentSize,
acknowledge,
negativeAcknowledge,
getMessageMetaAndDataById,
getMessageMetaById,
getMessageDataById,
getUnacknowledgedMessages,
withGroupId,
queryMessagesHandles,
setTimeout,
getLastDataset,
getLastDatasetInGroup,
resetLastReadMarker,
setLastReadMarker,
getCurrentConnectionType,
getStreamList,
deleteStream,
resolveMetadata,
)
where
import Asapo.Either.Common (MessageId (MessageId), SourceCredentials, StreamInfo, StreamName (StreamName), nominalDiffToMillis, peekConstCStringText, retrieveStreamInfoFromC, stringHandleToText, timespecToUTC, withCStringNToText, withConstText, withCredentials, withPtr)
import Asapo.Raw.Common (AsapoErrorHandle, AsapoMessageDataHandle (AsapoMessageDataHandle), AsapoStringHandle, ConstCString, asapo_error_explain, asapo_free_error_handle, asapo_free_stream_infos_handle, asapo_free_string_handle, asapo_is_error, asapo_new_error_handle, asapo_stream_infos_get_item)
import Asapo.Raw.Consumer (AsapoConsumerErrorType, AsapoConsumerHandle, AsapoDataSetHandle, AsapoIdListHandle, AsapoMessageMetaHandle (AsapoMessageMetaHandle), AsapoNetworkConnectionType, AsapoStreamFilter, asapo_consumer_acknowledge, asapo_consumer_current_connection_type, asapo_consumer_delete_stream, asapo_consumer_generate_new_group_id, asapo_consumer_get_beamtime_meta, asapo_consumer_get_by_id, asapo_consumer_get_current_dataset_count, asapo_consumer_get_current_size, asapo_consumer_get_last, asapo_consumer_get_last_dataset, asapo_consumer_get_last_dataset_ingroup, asapo_consumer_get_last_ingroup, asapo_consumer_get_next, asapo_consumer_get_next_dataset, asapo_consumer_get_stream_list, asapo_consumer_get_unacknowledged_messages, asapo_consumer_negative_acknowledge, asapo_consumer_query_messages, asapo_consumer_reset_last_read_marker, asapo_consumer_retrieve_data, asapo_consumer_set_last_read_marker, asapo_consumer_set_resend_nacs, asapo_consumer_set_stream_persistent, asapo_consumer_set_timeout, asapo_create_consumer, asapo_dataset_get_expected_size, asapo_dataset_get_id, asapo_dataset_get_item, asapo_dataset_get_size, asapo_error_get_type, asapo_free_consumer_handle, asapo_free_id_list_handle, asapo_free_message_metas_handle, asapo_id_list_get_item, asapo_id_list_get_size, asapo_message_data_get_as_chars, asapo_message_meta_get_buf_id, asapo_message_meta_get_dataset_substream, asapo_message_meta_get_id, asapo_message_meta_get_metadata, asapo_message_meta_get_name, asapo_message_meta_get_size, asapo_message_meta_get_source, asapo_message_meta_get_timestamp, asapo_message_metas_get_item, asapo_message_metas_get_size, asapo_stream_infos_get_size, kAllStreams, kAsapoTcp, kDataNotInCache, kEndOfStream, kFinishedStreams, kInterruptedTransaction, kLocalIOError, kNoData, kPartialData, kStreamFinished, kUnavailableService, kUndefined, kUnfinishedStreams, kUnsupportedClient, kWrongInput)
import Asapo.Raw.FreeHandleHack (p_asapo_free_handle)
import Control.Applicative (Applicative ((<*>)), pure)
import Control.Exception (bracket)
import Control.Monad (Monad ((>>=)), (>=>))
import Data.Bool (Bool, otherwise)
import qualified Data.ByteString as BS
import Data.Either (Either (Left, Right))
import Data.Eq (Eq ((==)))
import Data.Function (($), (.))
import Data.Functor ((<$>))
import Data.Int (Int)
import Data.Maybe (Maybe (Just, Nothing))
import Data.Ord ((>))
import Data.Text (Text)
import Data.Time (NominalDiffTime, UTCTime)
import Data.Traversable (Traversable (traverse))
import Data.Word (Word64)
import Foreign (Storable (peek), alloca)
import Foreign.C (CInt)
import Foreign.C.ConstPtr (ConstPtr (unConstPtr))
import Foreign.ForeignPtr (ForeignPtr, newForeignPtr, withForeignPtr)
import Foreign.Ptr (Ptr)
import System.IO (IO)
import Text.Show (Show)
import Prelude (Enum, Num ((-)), fromIntegral)
-- | Name of the server to connect to, as text. It is handed (as a C string) to
-- the raw consumer constructor when a consumer is created.
newtype ServerName = ServerName Text
-- | Source path, as text. It is handed (as a C string) to the raw consumer
-- constructor when a consumer is created.
newtype SourcePath = SourcePath Text
-- | Filesystem switch used when creating a consumer. 'WithFilesystem' is
-- forwarded to the raw constructor as @1@, 'WithoutFilesystem' as @0@.
data FilesystemFlag = WithFilesystem | WithoutFilesystem deriving (Eq)
-- | Wrapper around the raw consumer handle. The module exports the type
-- without its constructor, so callers obtain values via 'withConsumer'.
newtype Consumer = Consumer AsapoConsumerHandle
-- | Haskell-side classification of an error reported by the consumer C API.
-- Every constructor except 'ErrorUnknownError' corresponds to one raw error
-- constant; 'ErrorUnknownError' is the fallback for any other raw value.
data ErrorType
  = ErrorNoData
  | ErrorEndOfStream
  | ErrorStreamFinished
  | ErrorUnavailableService
  | ErrorInterruptedTransaction
  | ErrorLocalIOError
  | ErrorWrongInput
  | ErrorPartialData
  | ErrorUnsupportedClient
  | ErrorDataNotInCache
  | ErrorUnknownError
  deriving (Show)
-- | Map a raw consumer error type onto 'ErrorType'.
--
-- The raw constants are ordinary values rather than constructors, so they are
-- compared with '==' in guards instead of being pattern matched. Any value
-- not listed here becomes 'ErrorUnknownError'.
convertErrorType :: AsapoConsumerErrorType -> ErrorType
convertErrorType x
  | x == kNoData = ErrorNoData
  | x == kEndOfStream = ErrorEndOfStream
  | x == kStreamFinished = ErrorStreamFinished
  | x == kUnavailableService = ErrorUnavailableService
  | x == kInterruptedTransaction = ErrorInterruptedTransaction
  | x == kLocalIOError = ErrorLocalIOError
  | x == kWrongInput = ErrorWrongInput
  | x == kPartialData = ErrorPartialData
  | x == kUnsupportedClient = ErrorUnsupportedClient
  | x == kDataNotInCache = ErrorDataNotInCache
  | otherwise = ErrorUnknownError
-- | An error reported by the consumer C API.
data Error = Error
  { -- | Explanation text retrieved from the error handle.
    errorMessage :: Text,
    -- | Classification of the error (see 'ErrorType').
    errorType :: ErrorType
  }
  deriving (Show)
-- | Inspect an error handle after a C call and turn the outcome into an
-- 'Either'.
--
-- If the handle signals an error, the explanation text and error type are
-- read from it and returned as 'Left'; otherwise the supplied @result@ is
-- returned unchanged as 'Right'.
checkErrorWithGivenHandle :: AsapoErrorHandle -> b -> IO (Either Error b)
checkErrorWithGivenHandle errorHandle result = do
  isError <- asapo_is_error errorHandle
  if isError > 0
    then do
      -- Size of the buffer offered to the C side for the explanation text.
      let explanationLength :: Int
          explanationLength = 1024
      explanation <- withCStringNToText explanationLength \explanationPtr ->
        asapo_error_explain
          errorHandle
          explanationPtr
          (fromIntegral explanationLength)
      errorType' <- asapo_error_get_type errorHandle
      pure (Left (Error explanation (convertErrorType errorType')))
    else pure (Right result)
-- | Run an action with a freshly created error handle. 'bracket' guarantees
-- the handle is freed afterwards, even if the action throws.
withErrorHandle :: (AsapoErrorHandle -> IO c) -> IO c
withErrorHandle = bracket asapo_new_error_handle asapo_free_error_handle
-- | Run a C call that reports failure through an error-handle out-parameter
-- and convert its outcome into an 'Either'.
--
-- A fresh error handle is created, exposed to @f@ through a pointer, and the
-- handle value coming back from 'withPtr' is then examined with
-- 'checkErrorWithGivenHandle'.
checkError :: (Ptr AsapoErrorHandle -> IO b) -> IO (Either Error b)
checkError f =
  withErrorHandle \errorHandle -> do
    -- NOTE(review): 'withErrorHandle' frees the handle it created, while the
    -- handle inspected below is the one returned by 'withPtr'. Presumably
    -- both are the same handle unless the C call replaces it; confirm no
    -- handle is leaked in that case.
    (errorHandlePtr, result) <- withPtr errorHandle f
    checkErrorWithGivenHandle errorHandlePtr result
-- | Run an error-checked C call and, only if it succeeded, continue with a
-- follow-up action on its result. A failure of the first call is returned as
-- 'Left' and the follow-up action is not run.
withSuccess :: (Ptr AsapoErrorHandle -> IO t) -> (t -> IO (Either Error b)) -> IO (Either Error b)
withSuccess toCheck onSuccess =
  checkError toCheck >>= \case
    Left e -> pure (Left e)
    Right success -> onSuccess success
-- | Create a raw consumer handle.
--
-- The credentials, server name and source path are marshalled for the
-- duration of the C call only. The caller is responsible for freeing the
-- returned handle (see 'withConsumer').
create :: ServerName -> SourcePath -> FilesystemFlag -> SourceCredentials -> IO (Either Error AsapoConsumerHandle)
create (ServerName serverName) (SourcePath sourcePath) fsFlag creds =
  withCredentials creds \creds' ->
    withConstText serverName \serverNameC ->
      withConstText sourcePath \sourcePathC ->
        checkError
          ( asapo_create_consumer
              serverNameC
              sourcePathC
              -- The C API takes the filesystem flag as an integer boolean.
              (if fsFlag == WithFilesystem then 1 else 0)
              creds'
          )
-- | Create a consumer, run an action with it, and free it afterwards.
--
-- If creation fails, @onError@ is run with the error and nothing is freed.
-- Otherwise @onSuccess@ is run with the consumer, and 'bracket' frees the
-- underlying handle even if that action throws.
withConsumer :: forall a. ServerName -> SourcePath -> FilesystemFlag -> SourceCredentials -> (Error -> IO a) -> (Consumer -> IO a) -> IO a
withConsumer serverName sourcePath filesystemFlag creds onError onSuccess =
  bracket (create serverName sourcePath filesystemFlag creds) freeConsumer handle
  where
    -- Only a successfully created handle needs to be released.
    freeConsumer :: Either Error AsapoConsumerHandle -> IO ()
    freeConsumer (Right h) = asapo_free_consumer_handle h
    freeConsumer _ = pure ()
    -- Dispatch to the caller's continuations.
    handle :: Either Error AsapoConsumerHandle -> IO a
    handle (Left e) = onError e
    handle (Right v) = onSuccess (Consumer v)
-- | Consumer group identifier, stored as a raw string handle. The module
-- exports the type without its constructor; values are obtained via
-- 'withGroupId'.
newtype GroupId = GroupId AsapoStringHandle
-- | Generate a new group id for the given consumer, run an action with it,
-- and free the underlying string handle afterwards.
--
-- If generation fails, @onError@ is run with the error. Otherwise
-- @onSuccess@ is run with the group id, and 'bracket' frees the handle even
-- if that action throws.
withGroupId :: forall a. Consumer -> (Error -> IO a) -> (GroupId -> IO a) -> IO a
withGroupId (Consumer consumerHandle) onError onSuccess =
  bracket createGroupId destroy handle
  where
    createGroupId :: IO (Either Error GroupId)
    createGroupId =
      withSuccess
        (asapo_consumer_generate_new_group_id consumerHandle)
        (pure . Right . GroupId)
    -- Only a successfully generated id owns a string handle to release.
    destroy :: Either Error GroupId -> IO ()
    destroy (Right (GroupId stringHandle)) = asapo_free_string_handle stringHandle
    destroy _ = pure ()
    -- Dispatch to the caller's continuations.
    handle :: Either Error GroupId -> IO a
    handle (Right v) = onSuccess v
    handle (Left e) = onError e
-- | Set the consumer's timeout. The duration is converted with
-- 'nominalDiffToMillis' before being passed to the C API.
setTimeout :: Consumer -> NominalDiffTime -> IO ()
setTimeout (Consumer consumerHandle) timeout =
  asapo_consumer_set_timeout consumerHandle (nominalDiffToMillis timeout)
-- | Reset the last-read marker of a group on a stream.
--
-- On success, the integer returned by the C call is returned as 'Right'; an
-- error reported by the C call is returned as 'Left'.
resetLastReadMarker :: Consumer -> GroupId -> StreamName -> IO (Either Error Int)
resetLastReadMarker (Consumer consumer) (GroupId groupId) (StreamName streamName) =
  withConstText streamName \streamNameC ->
    -- Outer '<$>' maps over IO, inner one over the 'Either' result.
    (fromIntegral <$>)
      <$> checkError (asapo_consumer_reset_last_read_marker consumer groupId streamNameC)
-- | Set the last-read marker of a group on a stream to the given message id.
--
-- On success, the integer returned by the C call is returned as 'Right'; an
-- error reported by the C call is returned as 'Left'.
setLastReadMarker :: Consumer -> GroupId -> StreamName -> MessageId -> IO (Either Error Int)
setLastReadMarker (Consumer consumer) (GroupId groupId) (StreamName streamName) (MessageId value) =
  withConstText streamName \streamNameC ->
    -- Outer '<$>' maps over IO, inner one over the 'Either' result.
    (fromIntegral <$>)
      <$> checkError (asapo_consumer_set_last_read_marker consumer groupId value streamNameC)
-- | Acknowledge a message of a stream on behalf of a group.
--
-- On success, the integer returned by the C call is returned as 'Right'; an
-- error reported by the C call is returned as 'Left'.
acknowledge :: Consumer -> GroupId -> StreamName -> MessageId -> IO (Either Error Int)
acknowledge (Consumer consumer) (GroupId groupId) (StreamName streamName) (MessageId messageId) =
  withConstText streamName \streamNameC ->
    -- Outer '<$>' maps over IO, inner one over the 'Either' result.
    (fromIntegral <$>)
      <$> checkError (asapo_consumer_acknowledge consumer groupId messageId streamNameC)
-- | Negatively acknowledge a message of a stream on behalf of a group.
--
-- The delay is converted with 'nominalDiffToMillis' before being passed to
-- the C API. On success, the integer returned by the C call is returned as
-- 'Right'; an error reported by the C call is returned as 'Left'.
negativeAcknowledge ::
  Consumer ->
  GroupId ->
  StreamName ->
  MessageId ->
  NominalDiffTime ->
  IO (Either Error Int)
negativeAcknowledge (Consumer consumer) (GroupId groupId) (StreamName streamName) (MessageId messageId) delay =
  withConstText streamName \streamNameC ->
    -- Outer '<$>' maps over IO, inner one over the 'Either' result.
    (fromIntegral <$>)
      <$> checkError
        ( asapo_consumer_negative_acknowledge
            consumer
            groupId
            messageId
            (nominalDiffToMillis delay)
            streamNameC
        )
-- | List the ids of unacknowledged messages of a stream for a group.
--
-- The pair of message ids is passed to the C API as its two range arguments,
-- in that order. The id list returned by the C call is copied into a Haskell
-- list and then freed ('bracket' frees it even if copying throws).
getUnacknowledgedMessages :: Consumer -> GroupId -> StreamName -> (MessageId, MessageId) -> IO (Either Error [MessageId])
getUnacknowledgedMessages (Consumer consumer) (GroupId groupId) (StreamName streamName) (MessageId from, MessageId to) =
  bracket init destroy handle
  where
    -- Ask the C API for the id list handle.
    init :: IO (Either Error AsapoIdListHandle)
    init =
      withConstText streamName $
        checkError . asapo_consumer_get_unacknowledged_messages consumer groupId from to
    -- Only a successfully obtained list handle needs to be released.
    destroy :: Either Error AsapoIdListHandle -> IO ()
    destroy (Left _) = pure ()
    destroy (Right handle') = asapo_free_id_list_handle handle'
    -- Copy every item out of the list handle while it is still alive.
    handle :: Either Error AsapoIdListHandle -> IO (Either Error [MessageId])
    handle (Left e) = pure (Left e)
    handle (Right idListHandle) = do
      numberOfIds <- asapo_id_list_get_size idListHandle
      Right
        <$> repeatGetterWithSizeLimit
          ((MessageId <$>) . asapo_id_list_get_item idListHandle)
          numberOfIds
-- | Network connection type reported for a consumer (see
-- 'getCurrentConnectionType').
data NetworkConnectionType
  = ConnectionUndefined
  | ConnectionTcp
  | ConnectionFabric
-- | Map a raw connection type onto 'NetworkConnectionType'.
--
-- Only the "undefined" and TCP constants are compared explicitly; every
-- other raw value is reported as 'ConnectionFabric'.
convertConnectionType :: AsapoNetworkConnectionType -> NetworkConnectionType
convertConnectionType x
  | x == kUndefined = ConnectionUndefined
  | x == kAsapoTcp = ConnectionTcp
  -- NOTE(review): catch-all, so an unexpected raw value is also reported as
  -- fabric; confirm that is intended.
  | otherwise = ConnectionFabric
-- | Query the consumer's current network connection type and convert the raw
-- value with 'convertConnectionType'.
getCurrentConnectionType :: Consumer -> IO NetworkConnectionType
getCurrentConnectionType (Consumer consumerHandle) =
  convertConnectionType <$> asapo_consumer_current_connection_type consumerHandle
-- | Which streams to include when listing streams: all of them, only
-- finished ones, or only unfinished ones.
data StreamFilter = FilterAllStreams | FilterFinishedStreams | FilterUnfinishedStreams
-- | Map a 'StreamFilter' onto the corresponding raw stream filter constant.
convertStreamFilter :: StreamFilter -> AsapoStreamFilter
convertStreamFilter = \case
  FilterAllStreams -> kAllStreams
  FilterFinishedStreams -> kFinishedStreams
  FilterUnfinishedStreams -> kUnfinishedStreams
-- | Call an indexed getter for every index from @0@ to @n - 1@ and collect
-- the results in index order.
--
-- A size of @0@ yields an empty list without calling the getter.
repeatGetterWithSizeLimit :: (Eq a1, Num a1, Applicative f, Enum a1) => (a1 -> f a2) -> a1 -> f [a2]
repeatGetterWithSizeLimit f n
  -- The zero case is handled separately because @n - 1@ would wrap around
  -- for unsigned index types instead of producing an empty range.
  | n == 0 = pure []
  | otherwise = traverse f [0 .. n - 1]
-- | List the streams known to the consumer.
--
-- Calls @asapo_consumer_get_stream_list@ with the given stream name (the
-- empty string when 'Nothing' is passed) and the converted 'StreamFilter',
-- then converts every entry of the returned list with
-- 'retrieveStreamInfoFromC'. The raw list handle is released with
-- @asapo_free_stream_infos_handle@ through 'bracket', so it is freed even
-- when reading the entries throws.
getStreamList ::
  Consumer ->
  Maybe StreamName ->
  StreamFilter ->
  IO (Either Error [StreamInfo])
getStreamList (Consumer consumer) streamName filter =
  bracket acquire release collect
  where
    -- 'Nothing' is handed to the C call as the empty string.
    startName = maybe "" (\(StreamName name) -> name) streamName
    acquire =
      withConstText startName $ \startNameC ->
        checkError (asapo_consumer_get_stream_list consumer startNameC (convertStreamFilter filter))
    -- Only a successfully obtained handle has to be freed.
    release = either (const (pure ())) asapo_free_stream_infos_handle
    -- An error is passed through untouched; a handle is read entry by entry.
    collect = traverse $ \infosHandle -> do
      streamCount <- asapo_stream_infos_get_size infosHandle
      repeatGetterWithSizeLimit
        (\index -> asapo_stream_infos_get_item infosHandle index >>= retrieveStreamInfoFromC)
        streamCount
-- | Flag accepted by 'deleteStream'. 'DeleteMeta' is sent to
-- @asapo_consumer_delete_stream@ as @1@, 'DontDeleteMeta' as @0@.
data DeleteFlag
  = DeleteMeta
  | DontDeleteMeta
  deriving (Eq)
-- | Flag accepted by 'deleteStream'. 'ErrorOnNotExist' is sent to
-- @asapo_consumer_delete_stream@ as @1@, 'NoErrorOnNotExist' as @0@.
data ErrorOnNotExistFlag
  = ErrorOnNotExist
  | NoErrorOnNotExist
  deriving (Eq)
-- | Invoke @asapo_consumer_delete_stream@ for the named stream.
--
-- Both flags are translated to the @1@/@0@ values the C call expects. The
-- value returned by the C call is converted to 'Int' with 'fromIntegral';
-- whatever 'checkError' reports as an error is returned unchanged as 'Left'.
deleteStream :: Consumer -> StreamName -> DeleteFlag -> ErrorOnNotExistFlag -> IO (Either Error Int)
deleteStream (Consumer consumer) (StreamName streamName) deleteFlag errorOnNotExistFlag =
  withConstText streamName $ \streamNameC -> do
    outcome <-
      checkError
        (asapo_consumer_delete_stream consumer streamNameC deleteMetaC errorOnNotExistC)
    pure (fmap fromIntegral outcome)
  where
    deleteMetaC = case deleteFlag of
      DeleteMeta -> 1
      DontDeleteMeta -> 0
    errorOnNotExistC = case errorOnNotExistFlag of
      ErrorOnNotExist -> 1
      NoErrorOnNotExist -> 0
-- | Invoke @asapo_consumer_set_stream_persistent@ for the named stream.
--
-- The value returned by the C call is converted to 'Int' with
-- 'fromIntegral'; an error reported by 'checkError' is returned as 'Left'.
setStreamPersistent :: Consumer -> StreamName -> IO (Either Error Int)
setStreamPersistent (Consumer consumer) (StreamName streamName) =
  withConstText streamName $ \streamNameC -> do
    outcome <- checkError (asapo_consumer_set_stream_persistent consumer streamNameC)
    pure (fmap fromIntegral outcome)
-- | Query @asapo_consumer_get_current_size@ for the named stream.
--
-- The 64-bit value returned by the C call is converted to 'Int' with
-- 'fromIntegral'; an error reported by 'checkError' is returned as 'Left'.
getCurrentSize :: Consumer -> StreamName -> IO (Either Error Int)
getCurrentSize (Consumer consumer) (StreamName streamName) =
  withConstText streamName $ \streamNameC -> do
    outcome <- checkError (asapo_consumer_get_current_size consumer streamNameC)
    pure (fmap fromIntegral outcome)
-- | Flag accepted by 'getCurrentDatasetCount'. 'IncludeIncomplete' is sent to
-- @asapo_consumer_get_current_dataset_count@ as @1@, 'ExcludeIncomplete' as
-- @0@.
data IncludeIncompleteFlag
  = IncludeIncomplete
  | ExcludeIncomplete
  deriving (Eq)
-- | Query @asapo_consumer_get_current_dataset_count@ for the named stream.
--
-- The flag is translated to the @1@/@0@ value the C call expects. The 64-bit
-- value returned by the C call is converted to 'Int' with 'fromIntegral'; an
-- error reported by 'checkError' is returned as 'Left'.
getCurrentDatasetCount :: Consumer -> StreamName -> IncludeIncompleteFlag -> IO (Either Error Int)
getCurrentDatasetCount (Consumer consumer) (StreamName streamName) includeIncomplete =
  withConstText streamName $ \streamNameC -> do
    outcome <-
      checkError
        (asapo_consumer_get_current_dataset_count consumer streamNameC includeIncompleteC)
    pure (fmap fromIntegral outcome)
  where
    includeIncompleteC = case includeIncomplete of
      IncludeIncomplete -> 1
      ExcludeIncomplete -> 0
-- | Fetch the beamtime metadata through @asapo_consumer_get_beamtime_meta@.
--
-- On success the returned string handle is converted with
-- 'stringHandleToText'; an error reported by 'checkError' is returned as
-- 'Left' without touching any handle.
getBeamtimeMeta :: Consumer -> IO (Either Error (Maybe Text))
getBeamtimeMeta (Consumer consumer) = do
  fetched <- checkError (asapo_consumer_get_beamtime_meta consumer)
  traverse stringHandleToText fetched
-- | Managed reference to a raw message-meta handle, kept as a 'ForeignPtr'.
-- Values are created by 'newMessageMetaHandle', which attaches the finalizer.
newtype MessageMetaHandle
  = MessageMetaHandle (ForeignPtr ())
  deriving (Show)
-- | Run an action on the raw 'AsapoMessageMetaHandle' behind a
-- 'MessageMetaHandle'. The pointer is obtained with 'withForeignPtr', so it
-- is kept alive for the duration of the action.
withMessageMetaHandle :: MessageMetaHandle -> (AsapoMessageMetaHandle -> IO a) -> IO a
withMessageMetaHandle (MessageMetaHandle ownedPtr) action =
  withForeignPtr ownedPtr $ \rawPtr -> action (AsapoMessageMetaHandle rawPtr)
-- | Take a raw message-meta handle and wrap it in a 'ForeignPtr' whose
-- finalizer is @p_asapo_free_handle@.
newMessageMetaHandle :: AsapoMessageMetaHandle -> IO MessageMetaHandle
newMessageMetaHandle (AsapoMessageMetaHandle rawPtr) =
  fmap MessageMetaHandle (newForeignPtr p_asapo_free_handle rawPtr)
-- | Managed reference to a raw message-data handle, kept as a 'ForeignPtr'.
-- Values are created by 'newMessageDataHandle', which attaches the finalizer.
newtype MessageDataHandle
  = MessageDataHandle (ForeignPtr ())
  deriving (Show)
-- | Run an action on the raw 'AsapoMessageDataHandle' behind a
-- 'MessageDataHandle'. The pointer is obtained with 'withForeignPtr', so it
-- is kept alive for the duration of the action.
withMessageDataHandle :: MessageDataHandle -> (AsapoMessageDataHandle -> IO a) -> IO a
withMessageDataHandle (MessageDataHandle ownedPtr) action =
  withForeignPtr ownedPtr $ \rawPtr -> action (AsapoMessageDataHandle rawPtr)
-- | Take a raw message-data handle and wrap it in a 'ForeignPtr' whose
-- finalizer is @p_asapo_free_handle@.
newMessageDataHandle :: AsapoMessageDataHandle -> IO MessageDataHandle
newMessageDataHandle (AsapoMessageDataHandle rawPtr) =
  fmap MessageDataHandle (newForeignPtr p_asapo_free_handle rawPtr)
-- | Alias for 'newMessageMetaHandle': wrap a raw message-meta handle in a
-- managed 'MessageMetaHandle'.
wrapMessageMetaHandle :: AsapoMessageMetaHandle -> IO MessageMetaHandle
wrapMessageMetaHandle rawHandle = newMessageMetaHandle rawHandle
-- | Haskell-side copy of the values readable from a message-meta handle, as
-- filled in by 'resolveMetadata'.
data MessageMeta = MessageMeta
  { -- | Read via @asapo_message_meta_get_name@.
    messageMetaName :: Text,
    -- | Read via @asapo_message_meta_get_timestamp@ and converted with
    -- 'timespecToUTC'.
    messageMetaTimestamp :: UTCTime,
    -- | Read via @asapo_message_meta_get_size@.
    messageMetaSize :: Word64,
    -- | Read via @asapo_message_meta_get_id@.
    messageMetaId :: MessageId,
    -- | Read via @asapo_message_meta_get_source@.
    messageMetaSource :: Text,
    -- | Read via @asapo_message_meta_get_metadata@.
    messageMetaMetadata :: Text,
    -- | Read via @asapo_message_meta_get_buf_id@.
    messageMetaBufId :: Word64,
    -- | Read via @asapo_message_meta_get_dataset_substream@.
    messageMetaDatasetSubstream :: Word64,
    -- | The handle these values were read from; used by
    -- 'retrieveDataForMessageMeta' to fetch the payload.
    messageMetaHandle :: MessageMetaHandle
  }
  deriving (Show)
-- | Fetch the payload belonging to an already resolved 'MessageMeta' by
-- passing its stored handle to 'retrieveDataFromMeta'.
retrieveDataForMessageMeta :: Consumer -> MessageMeta -> IO (Either Error BS.ByteString)
retrieveDataForMessageMeta consumer =
  retrieveDataFromMeta consumer . messageMetaHandle
-- | Read every field of a message-meta handle into a 'MessageMeta' record.
--
-- The timestamp is read first into a temporary buffer via 'withPtr'; the
-- remaining getters are then called in record-field order. The given handle
-- is stored in the result unchanged.
resolveMetadata :: MessageMetaHandle -> IO MessageMeta
resolveMetadata metaHandle =
  withMessageMetaHandle metaHandle $ \rawMeta -> do
    -- The C getter writes into the pointer; the initial value 0 is a placeholder.
    (rawTimestamp, _) <- withPtr 0 (asapo_message_meta_get_timestamp rawMeta)
    name <- asapo_message_meta_get_name rawMeta >>= peekConstCStringText
    size <- asapo_message_meta_get_size rawMeta
    rawId <- asapo_message_meta_get_id rawMeta
    source <- asapo_message_meta_get_source rawMeta >>= peekConstCStringText
    metadata <- asapo_message_meta_get_metadata rawMeta >>= peekConstCStringText
    bufId <- asapo_message_meta_get_buf_id rawMeta
    substream <- asapo_message_meta_get_dataset_substream rawMeta
    pure $
      MessageMeta
        { messageMetaName = name,
          messageMetaTimestamp = timespecToUTC rawTimestamp,
          messageMetaSize = size,
          messageMetaId = MessageId rawId,
          messageMetaSource = source,
          messageMetaMetadata = metadata,
          messageMetaBufId = bufId,
          messageMetaDatasetSubstream = substream,
          messageMetaHandle = metaHandle
        }
-- | Haskell-side view of a dataset handle, as filled in by
-- 'retrieveDatasetFromC'.
data Dataset = Dataset
  { -- | Read via @asapo_dataset_get_id@.
    datasetId :: Word64,
    -- | Read via @asapo_dataset_get_expected_size@.
    datasetExpectedSize :: Word64,
    -- | One managed handle per item returned by @asapo_dataset_get_item@.
    datasetItems :: [MessageMetaHandle]
  }
-- | Convert a raw dataset handle into a 'Dataset'.
--
-- All items (as many as @asapo_dataset_get_size@ reports) are fetched and
-- wrapped with 'wrapMessageMetaHandle' first; the id and expected size are
-- read afterwards.
--
-- NOTE(review): the dataset handle itself is not released here — confirm who
-- owns it.
retrieveDatasetFromC :: AsapoDataSetHandle -> IO Dataset
retrieveDatasetFromC handle = do
  itemCount <- asapo_dataset_get_size handle
  wrappedItems <-
    repeatGetterWithSizeLimit
      (\index -> asapo_dataset_get_item handle index >>= wrapMessageMetaHandle)
      itemCount
  identifier <- asapo_dataset_get_id handle
  expectedSize <- asapo_dataset_get_expected_size handle
  pure $
    Dataset
      { datasetId = identifier,
        datasetExpectedSize = expectedSize,
        datasetItems = wrappedItems
      }
-- | Call @asapo_consumer_get_next_dataset@ with the group id, minimum size
-- and stream name, and convert a successfully returned handle with
-- 'retrieveDatasetFromC'. An error reported by 'checkError' is returned as
-- 'Left'.
getNextDataset ::
  Consumer ->
  GroupId ->
  Word64 ->
  StreamName ->
  IO (Either Error Dataset)
getNextDataset (Consumer consumer) (GroupId groupId) minSize (StreamName streamName) =
  withConstText streamName $ \streamNameC -> do
    fetched <- checkError (asapo_consumer_get_next_dataset consumer groupId minSize streamNameC)
    traverse retrieveDatasetFromC fetched
-- | Call @asapo_consumer_get_last_dataset@ with the minimum size and stream
-- name, and convert a successfully returned handle with
-- 'retrieveDatasetFromC'. An error reported by 'checkError' is returned as
-- 'Left'.
getLastDataset ::
  Consumer ->
  Word64 ->
  StreamName ->
  IO (Either Error Dataset)
getLastDataset (Consumer consumer) minSize (StreamName streamName) =
  withConstText streamName $ \streamNameC -> do
    fetched <- checkError (asapo_consumer_get_last_dataset consumer minSize streamNameC)
    traverse retrieveDatasetFromC fetched
-- | Call @asapo_consumer_get_last_dataset_ingroup@ with the group id, minimum
-- size and stream name, and convert a successfully returned handle with
-- 'retrieveDatasetFromC'. An error reported by 'checkError' is returned as
-- 'Left'.
getLastDatasetInGroup ::
  Consumer ->
  GroupId ->
  Word64 ->
  StreamName ->
  IO (Either Error Dataset)
getLastDatasetInGroup (Consumer consumer) (GroupId groupId) minSize (StreamName streamName) =
  withConstText streamName $ \streamNameC -> do
    fetched <-
      checkError (asapo_consumer_get_last_dataset_ingroup consumer groupId minSize streamNameC)
    traverse retrieveDatasetFromC fetched
-- | Copy the bytes behind a message-data handle into a strict 'BS.ByteString'.
--
-- The pointer returned by @asapo_message_data_get_as_chars@ is copied with
-- 'BS.packCString', i.e. up to (and excluding) the first NUL byte.
--
-- NOTE(review): for payloads that may contain NUL bytes this truncates the
-- data; a length-aware copy would need the payload size, which is not
-- available from the data handle alone here.
retrieveDataFromHandle :: MessageDataHandle -> IO BS.ByteString
retrieveDataFromHandle dataHandle =
  withMessageDataHandle dataHandle $ \rawData ->
    asapo_message_data_get_as_chars rawData >>= BS.packCString . unConstPtr
-- | Fetch the payload for a message-meta handle through
-- @asapo_consumer_retrieve_data@.
--
-- On success the returned data handle is wrapped with 'newMessageDataHandle'
-- (so it is freed by its finalizer) and its bytes are copied into a strict
-- 'BS.ByteString'. An error reported by 'withSuccess' is returned as 'Left'.
--
-- The copy is length-based ('BS.packCStringLen') rather than NUL-terminated
-- ('BS.packCString'), so payloads containing NUL bytes are not truncated.
-- NOTE(review): this assumes @asapo_message_meta_get_size@ reports the
-- payload length in bytes — confirm against the ASAPO C API.
retrieveDataFromMeta :: Consumer -> MessageMetaHandle -> IO (Either Error BS.ByteString)
retrieveDataFromMeta (Consumer consumer) metaHandle =
  withMessageMetaHandle metaHandle $ \metaHandlePtr ->
    alloca $ \dataHandlePtrPtr ->
      withSuccess (asapo_consumer_retrieve_data consumer metaHandlePtr dataHandlePtrPtr) $ \_result -> do
        dataHandle <- peek dataHandlePtrPtr >>= newMessageDataHandle
        payloadSize <- asapo_message_meta_get_size metaHandlePtr
        withMessageDataHandle dataHandle $ \dataHandlePtr -> do
          payloadPtr <- asapo_message_data_get_as_chars dataHandlePtr
          -- Copy exactly payloadSize bytes; the buffer is binary, not a C string.
          Right <$> BS.packCStringLen (unConstPtr payloadPtr, fromIntegral payloadSize)
-- | Shared plumbing for C calls that return a meta handle and a data handle
-- through out-pointers.
--
-- Temporary storage for both out-pointers and the C copy of the stream name
-- are set up, the supplied call is run through 'withSuccess', and on success
-- both raw handles are wrapped ('newMessageMetaHandle',
-- 'newMessageDataHandle') and passed to the continuation. The value returned
-- by the C call itself is ignored.
withMessageHandles ::
  StreamName ->
  (Ptr AsapoMessageMetaHandle -> Ptr AsapoMessageDataHandle -> ConstCString -> Ptr AsapoErrorHandle -> IO CInt) ->
  (MessageMetaHandle -> MessageDataHandle -> IO (Either Error a)) ->
  IO (Either Error a)
withMessageHandles (StreamName streamName) g f =
  alloca $ \metaOut ->
    alloca $ \dataOut ->
      withConstText streamName $ \streamNameC ->
        withSuccess (g metaOut dataOut streamNameC) (const (wrapAndRun metaOut dataOut))
  where
    -- Read both out-pointers first, then take ownership of the raw handles.
    wrapAndRun metaOut dataOut = do
      rawMeta <- peek metaOut
      rawData <- peek dataOut
      metaHandle <- newMessageMetaHandle rawMeta
      dataHandle <- newMessageDataHandle rawData
      f metaHandle dataHandle
-- | Run a continuation on the meta and data handles of the message that
-- 'asapo_consumer_get_by_id' returns for the given stream and message ID.
--
-- This is 'withMessageHandles' specialised to 'asapo_consumer_get_by_id',
-- which is partially applied to the raw consumer handle and the unwrapped
-- numeric ID. The continuation is the remaining (point-free) argument and is
-- forwarded untouched.
withMessageHandlesById :: Consumer -> StreamName -> MessageId -> (MessageMetaHandle -> MessageDataHandle -> IO (Either Error a)) -> IO (Either Error a)
withMessageHandlesById (Consumer consumer) streamName (MessageId messageId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_by_id consumer messageId)
-- | Continuation for 'withMessageHandles' that yields both the resolved
-- metadata and the payload bytes of a message.
--
-- The payload is read first (via 'retrieveDataFromHandle'), then the metadata
-- is resolved (via 'resolveMetadata'). The result is always 'Right'; the error
-- type is left polymorphic because this function never produces a 'Left'.
retrieveMessageMetaAndData :: MessageMetaHandle -> MessageDataHandle -> IO (Either a (MessageMeta, BS.ByteString))
retrieveMessageMetaAndData metaHandle dataHandle = do
  payload <- retrieveDataFromHandle dataHandle
  meta <- resolveMetadata metaHandle
  pure (Right (meta, payload))
-- | Continuation for 'withMessageHandles' that yields only the resolved
-- metadata of a message.
--
-- The second argument (the data handle at the call sites in this module) is
-- ignored; it exists so the function has the two-argument continuation shape.
-- The result is always 'Right'.
retrieveMessageMeta :: MessageMetaHandle -> p -> IO (Either a MessageMeta)
retrieveMessageMeta metaHandle _dataHandle = Right <$> resolveMetadata metaHandle
-- | Continuation for 'withMessageHandles' that yields only the payload bytes
-- of a message.
--
-- The first argument (the meta handle at the call sites in this module) is
-- ignored; it exists so the function has the two-argument continuation shape.
-- The result is always 'Right'.
retrieveMessageData :: p -> MessageDataHandle -> IO (Either a BS.ByteString)
retrieveMessageData _metaHandle dataHandle = Right <$> retrieveDataFromHandle dataHandle
-- | Look a message up by ID (through 'withMessageHandlesById') and return its
-- metadata together with its payload.
--
-- Failures of the underlying lookup are returned as 'Left'.
getMessageMetaAndDataById :: Consumer -> StreamName -> MessageId -> IO (Either Error (MessageMeta, BS.ByteString))
getMessageMetaAndDataById consumer streamName messageId =
  withMessageHandlesById consumer streamName messageId retrieveMessageMetaAndData
-- | Look a message up by ID (through 'withMessageHandlesById') and return
-- only its metadata.
--
-- Failures of the underlying lookup are returned as 'Left'.
getMessageMetaById :: Consumer -> StreamName -> MessageId -> IO (Either Error MessageMeta)
getMessageMetaById consumer streamName messageId =
  withMessageHandlesById consumer streamName messageId retrieveMessageMeta
-- | Look a message up by ID (through 'withMessageHandlesById') and return
-- only its payload bytes.
--
-- Failures of the underlying lookup are returned as 'Left'.
getMessageDataById :: Consumer -> StreamName -> MessageId -> IO (Either Error BS.ByteString)
getMessageDataById consumer streamName messageId =
  withMessageHandlesById consumer streamName messageId retrieveMessageData
-- | Call 'asapo_consumer_get_last' for the given stream and return the
-- resulting message's metadata together with its payload.
--
-- Failures of the underlying call are returned as 'Left'.
getLastMessageMetaAndData :: Consumer -> StreamName -> IO (Either Error (MessageMeta, BS.ByteString))
getLastMessageMetaAndData (Consumer consumer) streamName =
  withMessageHandles
    streamName
    (asapo_consumer_get_last consumer)
    retrieveMessageMetaAndData
-- | Call 'asapo_consumer_get_last' for the given stream and return only the
-- resulting message's metadata.
--
-- Failures of the underlying call are returned as 'Left'.
getLastMessageMeta :: Consumer -> StreamName -> IO (Either Error MessageMeta)
getLastMessageMeta (Consumer consumer) streamName =
  withMessageHandles
    streamName
    (asapo_consumer_get_last consumer)
    retrieveMessageMeta
-- | Call 'asapo_consumer_get_last' for the given stream and return only the
-- resulting message's payload bytes.
--
-- Failures of the underlying call are returned as 'Left'.
getLastMessageData :: Consumer -> StreamName -> IO (Either Error BS.ByteString)
getLastMessageData (Consumer consumer) streamName =
  withMessageHandles
    streamName
    (asapo_consumer_get_last consumer)
    retrieveMessageData
-- | Call 'asapo_consumer_get_last_ingroup' for the given stream and consumer
-- group and return the resulting message's metadata together with its payload.
--
-- Failures of the underlying call are returned as 'Left'.
getLastInGroupMessageMetaAndData :: Consumer -> StreamName -> GroupId -> IO (Either Error (MessageMeta, BS.ByteString))
getLastInGroupMessageMetaAndData (Consumer consumer) streamName (GroupId groupId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_last_ingroup consumer groupId)
    retrieveMessageMetaAndData
-- | Call 'asapo_consumer_get_last_ingroup' for the given stream and consumer
-- group and return only the resulting message's metadata.
--
-- Failures of the underlying call are returned as 'Left'.
getLastInGroupMessageMeta :: Consumer -> StreamName -> GroupId -> IO (Either Error MessageMeta)
getLastInGroupMessageMeta (Consumer consumer) streamName (GroupId groupId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_last_ingroup consumer groupId)
    retrieveMessageMeta
-- | Call 'asapo_consumer_get_last_ingroup' for the given stream and consumer
-- group and return only the resulting message's payload bytes.
--
-- Failures of the underlying call are returned as 'Left'.
getLastInGroupMessageData :: Consumer -> StreamName -> GroupId -> IO (Either Error BS.ByteString)
getLastInGroupMessageData (Consumer consumer) streamName (GroupId groupId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_last_ingroup consumer groupId)
    retrieveMessageData
-- | Call 'asapo_consumer_get_next' for the given stream and consumer group
-- and return the resulting message's metadata together with its payload.
--
-- Failures of the underlying call are returned as 'Left'.
getNextMessageMetaAndData :: Consumer -> StreamName -> GroupId -> IO (Either Error (MessageMeta, BS.ByteString))
getNextMessageMetaAndData (Consumer consumer) streamName (GroupId groupId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_next consumer groupId)
    retrieveMessageMetaAndData
-- | Call 'asapo_consumer_get_next' for the given stream and consumer group
-- and return only the resulting message's metadata.
--
-- Failures of the underlying call are returned as 'Left'.
getNextMessageMeta :: Consumer -> StreamName -> GroupId -> IO (Either Error MessageMeta)
getNextMessageMeta (Consumer consumer) streamName (GroupId groupId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_next consumer groupId)
    retrieveMessageMeta
-- | Call 'asapo_consumer_get_next' for the given stream and consumer group
-- and return only the resulting message's payload bytes.
--
-- Failures of the underlying call are returned as 'Left'.
getNextMessageData :: Consumer -> StreamName -> GroupId -> IO (Either Error BS.ByteString)
getNextMessageData (Consumer consumer) streamName (GroupId groupId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_next consumer groupId)
    retrieveMessageData
-- | Run a query against a stream and return one wrapped meta handle per
-- result, without resolving the metadata.
--
-- The query text and the stream name are marshalled to C strings and passed
-- to 'asapo_consumer_query_messages'. If that call fails, the error is
-- returned as 'Left'. Otherwise every item of the returned collection is
-- fetched with 'asapo_message_metas_get_item' and wrapped with
-- 'wrapMessageMetaHandle'.
--
-- The collection handle itself is released with
-- 'asapo_free_message_metas_handle' when the 'bracket' exits, including on
-- exceptions.
queryMessagesHandles ::
  Consumer ->
  Text ->
  StreamName ->
  IO (Either Error [MessageMetaHandle])
queryMessagesHandles (Consumer consumer) query (StreamName streamName) =
  withConstText streamName \streamNameC ->
    withConstText query \queryC ->
      let -- Acquisition may itself fail, so the bracketed resource is an
          -- 'Either'; there is nothing to free in the 'Left' case.
          acquire = checkError (asapo_consumer_query_messages consumer queryC streamNameC)
          release (Left _) = pure ()
          release (Right metas) = asapo_free_message_metas_handle metas
       in bracket acquire release \case
            Left e -> pure (Left e)
            Right metas -> do
              numberOfMetas <- asapo_message_metas_get_size metas
              Right
                <$> repeatGetterWithSizeLimit
                  (asapo_message_metas_get_item metas >=> wrapMessageMetaHandle)
                  numberOfMetas
-- | Run a query against a stream and return the resolved metadata of every
-- result.
--
-- The query text and the stream name are marshalled to C strings and passed
-- to 'asapo_consumer_query_messages'. If that call fails, the error is
-- returned as 'Left'. Otherwise every item of the returned collection is
-- fetched with 'asapo_message_metas_get_item', wrapped with
-- 'wrapMessageMetaHandle' and resolved with 'resolveMetadata'.
--
-- Resolution happens inside the 'bracket', i.e. before the collection handle
-- is released with 'asapo_free_message_metas_handle'.
queryMessages ::
  Consumer ->
  Text ->
  StreamName ->
  IO (Either Error [MessageMeta])
queryMessages (Consumer consumer) query (StreamName streamName) =
  withConstText streamName \streamNameC ->
    withConstText query \queryC ->
      let -- Acquisition may itself fail, so the bracketed resource is an
          -- 'Either'; there is nothing to free in the 'Left' case.
          acquire = checkError (asapo_consumer_query_messages consumer queryC streamNameC)
          release (Left _) = pure ()
          release (Right metas) = asapo_free_message_metas_handle metas
       in bracket acquire release \case
            Left e -> pure (Left e)
            Right metas -> do
              numberOfMetas <- asapo_message_metas_get_size metas
              Right
                <$> repeatGetterWithSizeLimit
                  (asapo_message_metas_get_item metas >=> wrapMessageMetaHandle >=> resolveMetadata)
                  numberOfMetas
-- | Configure the consumer through 'asapo_consumer_set_resend_nacs'.
--
-- The 'Bool' is converted to the C boolean (@1@ / @0@) and the delay is
-- converted to milliseconds with 'nominalDiffToMillis'. The definition is
-- point-free in its last argument: the trailing 'Word64' is forwarded
-- unchanged as the final parameter of the C function.
-- NOTE(review): that parameter is presumably the number of resend attempts;
-- confirm against the ASAPO C API.
resendNacs ::
  Consumer ->
  Bool ->
  NominalDiffTime ->
  Word64 ->
  IO ()
resendNacs (Consumer consumer) resend delay =
  asapo_consumer_set_resend_nacs
    consumer
    (if resend then 1 else 0)
    (nominalDiffToMillis delay)