{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- |
-- Description : High-level interface for all consumer-related functions
--
-- To implement an ASAP:O consumer, you should only need this interface.
-- It exposes no memory-management functions (like free) or pointers, and
-- is thus safe to use.
module Asapo.Either.Consumer
  ( FilesystemFlag (..),
    ServerName (..),
    IncludeIncompleteFlag (..),
    SourcePath (..),
    Dataset (..),
    MessageMeta (..),
    DeleteFlag (..),
    ErrorOnNotExistFlag (..),
    Consumer,
    NetworkConnectionType (..),
    StreamFilter (..),
    MessageMetaHandle,
    GroupId,
    Error (..),
    ErrorType (..),
    withConsumer,
    retrieveDataForMessageMeta,
    queryMessages,
    resendNacs,
    getNextDataset,
    retrieveDataFromMeta,
    setStreamPersistent,
    getLastMessageMetaAndData,
    getLastMessageMeta,
    getLastMessageData,
    getLastInGroupMessageMetaAndData,
    getLastInGroupMessageMeta,
    getLastInGroupMessageData,
    getNextMessageMetaAndData,
    getNextMessageMeta,
    getNextMessageData,
    getCurrentDatasetCount,
    getBeamtimeMeta,
    getCurrentSize,
    acknowledge,
    negativeAcknowledge,
    getMessageMetaAndDataById,
    getMessageMetaById,
    getMessageDataById,
    getUnacknowledgedMessages,
    withGroupId,
    queryMessagesHandles,
    setTimeout,
    getLastDataset,
    getLastDatasetInGroup,
    resetLastReadMarker,
    setLastReadMarker,
    getCurrentConnectionType,
    getStreamList,
    deleteStream,
    resolveMetadata,
  )
where

import Asapo.Either.Common (MessageId (MessageId), SourceCredentials, StreamInfo, StreamName (StreamName), nominalDiffToMillis, peekConstCStringText, retrieveStreamInfoFromC, stringHandleToText, timespecToUTC, withCStringNToText, withConstText, withCredentials, withPtr)
import Asapo.Raw.Common (AsapoErrorHandle, AsapoMessageDataHandle (AsapoMessageDataHandle), AsapoStringHandle, ConstCString, asapo_error_explain, asapo_free_error_handle, asapo_free_stream_infos_handle, asapo_free_string_handle, asapo_is_error, asapo_new_error_handle, asapo_stream_infos_get_item)
import Asapo.Raw.Consumer (AsapoConsumerErrorType, AsapoConsumerHandle, AsapoDataSetHandle, AsapoIdListHandle, AsapoMessageMetaHandle (AsapoMessageMetaHandle), AsapoNetworkConnectionType, AsapoStreamFilter, asapo_consumer_acknowledge, asapo_consumer_current_connection_type, asapo_consumer_delete_stream, asapo_consumer_generate_new_group_id, asapo_consumer_get_beamtime_meta, asapo_consumer_get_by_id, asapo_consumer_get_current_dataset_count, asapo_consumer_get_current_size, asapo_consumer_get_last, asapo_consumer_get_last_dataset, asapo_consumer_get_last_dataset_ingroup, asapo_consumer_get_last_ingroup, asapo_consumer_get_next, asapo_consumer_get_next_dataset, asapo_consumer_get_stream_list, asapo_consumer_get_unacknowledged_messages, asapo_consumer_negative_acknowledge, asapo_consumer_query_messages, asapo_consumer_reset_last_read_marker, asapo_consumer_retrieve_data, asapo_consumer_set_last_read_marker, asapo_consumer_set_resend_nacs, asapo_consumer_set_stream_persistent, asapo_consumer_set_timeout, asapo_create_consumer, asapo_dataset_get_expected_size, asapo_dataset_get_id, asapo_dataset_get_item, asapo_dataset_get_size, asapo_error_get_type, asapo_free_consumer_handle, asapo_free_id_list_handle, asapo_free_message_metas_handle, asapo_id_list_get_item, asapo_id_list_get_size, asapo_message_data_get_as_chars, asapo_message_meta_get_buf_id, asapo_message_meta_get_dataset_substream, asapo_message_meta_get_id, asapo_message_meta_get_metadata, asapo_message_meta_get_name, asapo_message_meta_get_size, asapo_message_meta_get_source, asapo_message_meta_get_timestamp, asapo_message_metas_get_item, asapo_message_metas_get_size, asapo_stream_infos_get_size, kAllStreams, kAsapoTcp, kDataNotInCache, kEndOfStream, kFinishedStreams, kInterruptedTransaction, kLocalIOError, kNoData, kPartialData, kStreamFinished, kUnavailableService, kUndefined, kUnfinishedStreams, kUnsupportedClient, kWrongInput)
import Asapo.Raw.FreeHandleHack (p_asapo_free_handle)
import Control.Applicative (Applicative ((<*>)), pure)
import Control.Exception (bracket)
import Control.Monad (Monad ((>>=)), (>=>))
import Data.Bool (Bool, otherwise)
import qualified Data.ByteString as BS
import Data.Either (Either (Left, Right))
import Data.Eq (Eq ((==)))
import Data.Function (($), (.))
import Data.Functor ((<$>))
import Data.Int (Int)
import Data.Maybe (Maybe (Just, Nothing))
import Data.Ord ((>))
import Data.Text (Text)
import Data.Time (NominalDiffTime, UTCTime)
import Data.Traversable (Traversable (traverse))
import Data.Word (Word64)
import Foreign (Storable (peek), alloca)
import Foreign.C (CInt)
import Foreign.C.ConstPtr (ConstPtr (unConstPtr))
import Foreign.ForeignPtr (ForeignPtr, newForeignPtr, withForeignPtr)
import Foreign.Ptr (Ptr)
import System.IO (IO)
import Text.Show (Show)
import Prelude (Enum, Num ((-)), fromIntegral)

-- | Wrapper for a server name (something like "host:port").
--
-- Keeping it as a distinct type prevents mixing it up with the other 'Text'
-- arguments (for example 'SourcePath') when creating a consumer.
newtype ServerName = ServerName Text

-- | Wrapper for a source path.
--
-- NOTE(review): this is stored as 'Text' rather than @FilePath@; the original
-- author flagged that choice as dubious, so it may be revisited.
newtype SourcePath = SourcePath Text

-- | Whether to use the filesystem or do it in-memory
data FilesystemFlag
  = -- | Use the filesystem
    WithFilesystem
  | -- | Do not use the filesystem (in-memory)
    WithoutFilesystem
  deriving (Eq)

-- | Wrapper around a raw consumer handle ('AsapoConsumerHandle').
--
-- Create one with the @withConsumer@ function(s); the constructor is not
-- exported from this module.
newtype Consumer = Consumer AsapoConsumerHandle

-- | Classification of a consumer error.
--
-- Each constructor corresponds to one of the raw @k...@ error constants of
-- the C API; 'ErrorUnknownError' is used for any raw value that matches none
-- of them.
data ErrorType
  = ErrorNoData
  | ErrorEndOfStream
  | ErrorStreamFinished
  | ErrorUnavailableService
  | ErrorInterruptedTransaction
  | ErrorLocalIOError
  | ErrorWrongInput
  | ErrorPartialData
  | ErrorUnsupportedClient
  | ErrorDataNotInCache
  | ErrorUnknownError
  deriving (Show)

-- | Translate a raw consumer error type from the C API into an 'ErrorType'.
--
-- The raw value is compared against each known @k...@ constant in turn; a
-- value that matches none of them is mapped to 'ErrorUnknownError'.
convertErrorType :: AsapoConsumerErrorType -> ErrorType
convertErrorType x
  | x == kNoData = ErrorNoData
  | x == kEndOfStream = ErrorEndOfStream
  | x == kStreamFinished = ErrorStreamFinished
  | x == kUnavailableService = ErrorUnavailableService
  | x == kInterruptedTransaction = ErrorInterruptedTransaction
  | x == kLocalIOError = ErrorLocalIOError
  | x == kWrongInput = ErrorWrongInput
  | x == kPartialData = ErrorPartialData
  | x == kUnsupportedClient = ErrorUnsupportedClient
  | x == kDataNotInCache = ErrorDataNotInCache
  | otherwise = ErrorUnknownError

-- | Wrapper around an ASAP:O consumer error, with an additional error code
data Error = Error
  { -- | Explanation text obtained from the error handle
    errorMessage :: Text,
    -- | Classification of the error
    errorType :: ErrorType
  }
  deriving (Show)

-- | Inspect an error handle after a C call and turn the outcome into an
-- 'Either'.
--
-- If 'asapo_is_error' yields a value greater than zero, the explanation text
-- and the error type are read from the handle and returned as 'Left'.
-- Otherwise the supplied result is passed through unchanged as 'Right'.
checkErrorWithGivenHandle :: AsapoErrorHandle -> b -> IO (Either Error b)
checkErrorWithGivenHandle errorHandle result = do
  isError <- asapo_is_error errorHandle
  if isError > 0
    then do
      -- Length limit handed to both 'withCStringNToText' and the C function
      -- when fetching the explanation text.
      let explanationLength :: Int
          explanationLength = 1024
      explanation <- withCStringNToText explanationLength \explanationPtr ->
        asapo_error_explain
          errorHandle
          explanationPtr
          (fromIntegral explanationLength)
      errorType' <- asapo_error_get_type errorHandle
      pure (Left (Error explanation (convertErrorType errorType')))
    else pure (Right result)

-- | Run an action with a freshly allocated error handle.
--
-- The handle is created with 'asapo_new_error_handle' and released with
-- 'asapo_free_error_handle' once the action finishes, including when it
-- throws an exception ('bracket').
withErrorHandle :: (AsapoErrorHandle -> IO c) -> IO c
withErrorHandle = bracket asapo_new_error_handle asapo_free_error_handle

-- | Run a C call that reports failure through an error-handle out-parameter
-- and convert the outcome into an 'Either'.
--
-- A fresh error handle is allocated, the call receives a pointer to it, and
-- afterwards the handle value read back from that pointer is inspected with
-- 'checkErrorWithGivenHandle'.
checkError :: (Ptr AsapoErrorHandle -> IO b) -> IO (Either Error b)
checkError f =
  withErrorHandle \errorHandle -> do
    -- 'withPtr' yields the handle value stored behind the pointer after the
    -- call, together with the call's own result.
    -- NOTE(review): 'withErrorHandle' releases the handle it allocated, not
    -- the value read back here - confirm the C API never swaps the handle.
    (updatedHandle, result) <- withPtr errorHandle f
    checkErrorWithGivenHandle updatedHandle result

-- | Run an error-checked C call and continue only if it succeeded.
--
-- The first argument is executed through 'checkError'. A 'Left' is returned
-- as-is without invoking the continuation; on 'Right' the continuation is
-- applied to the successful value.
withSuccess :: (Ptr AsapoErrorHandle -> IO t) -> (t -> IO (Either Error b)) -> IO (Either Error b)
withSuccess toCheck onSuccess =
  checkError toCheck >>= \case
    Left e -> pure (Left e)
    Right success -> onSuccess success

-- | Create a raw consumer handle.
--
-- The credentials, server name and source path are marshalled for the C side
-- and passed to 'asapo_create_consumer'. The filesystem flag is passed as @1@
-- for 'WithFilesystem' and @0@ otherwise. Failure is reported as 'Left'.
--
-- The returned handle is not freed here; see 'withConsumer' for the variant
-- that releases it.
create :: ServerName -> SourcePath -> FilesystemFlag -> SourceCredentials -> IO (Either Error AsapoConsumerHandle)
create (ServerName serverName) (SourcePath sourcePath) fsFlag creds =
  withCredentials creds \creds' ->
    withConstText serverName \serverNameC ->
      withConstText sourcePath \sourcePathC ->
        checkError
          ( asapo_create_consumer
              serverNameC
              sourcePathC
              (if fsFlag == WithFilesystem then 1 else 0)
              creds'
          )

-- | Create a consumer and do something with it. This is the main entrypoint into the consumer
--
-- If creation fails, the error callback is run with the 'Error'; otherwise
-- the success callback is run with the new 'Consumer'. A successfully
-- created consumer handle is released with 'asapo_free_consumer_handle' when
-- the callback finishes, including when it throws an exception ('bracket').
withConsumer :: forall a. ServerName -> SourcePath -> FilesystemFlag -> SourceCredentials -> (Error -> IO a) -> (Consumer -> IO a) -> IO a
withConsumer serverName sourcePath filesystemFlag creds onError onSuccess =
  bracket (create serverName sourcePath filesystemFlag creds) freeConsumer handle
  where
    -- Only a successfully created handle needs to be released.
    freeConsumer :: Either Error AsapoConsumerHandle -> IO ()
    freeConsumer (Right h) = asapo_free_consumer_handle h
    freeConsumer _ = pure ()
    -- Dispatch to the caller-supplied callbacks.
    handle :: Either Error AsapoConsumerHandle -> IO a
    handle (Left e) = onError e
    handle (Right v) = onSuccess (Consumer v)

-- | Wrapper around a group ID, stored as a raw 'AsapoStringHandle'.
--
-- Obtain one via @withGroupId@; the constructor is not exported from this
-- module.
newtype GroupId = GroupId AsapoStringHandle

-- | Allocate a group ID and call a callback
--
-- The ID is generated with 'asapo_consumer_generate_new_group_id'. If that
-- fails, the error callback is run; otherwise the success callback receives
-- the 'GroupId'. A successfully generated ID is released with
-- 'asapo_free_string_handle' when the callback finishes, including when it
-- throws an exception ('bracket').
withGroupId :: forall a. Consumer -> (Error -> IO a) -> (GroupId -> IO a) -> IO a
withGroupId (Consumer consumerHandle) onError onSuccess =
  bracket createGroupId destroy handle
  where
    createGroupId :: IO (Either Error GroupId)
    createGroupId =
      withSuccess
        (asapo_consumer_generate_new_group_id consumerHandle)
        (pure . Right . GroupId)
    -- Only a successfully generated ID owns a string handle to free.
    destroy :: Either Error GroupId -> IO ()
    destroy (Right (GroupId stringHandle)) = asapo_free_string_handle stringHandle
    destroy _ = pure ()
    -- Dispatch to the caller-supplied callbacks.
    handle :: Either Error GroupId -> IO a
    handle (Right v) = onSuccess v
    handle (Left e) = onError e

-- | Set the global consumer timeout
--
-- The duration is converted with 'nominalDiffToMillis' before being handed
-- to 'asapo_consumer_set_timeout'.
setTimeout :: Consumer -> NominalDiffTime -> IO ()
setTimeout (Consumer consumerHandle) timeout =
  asapo_consumer_set_timeout consumerHandle (nominalDiffToMillis timeout)

-- | Reset the last read marker for a specific group
--
-- On success, the integer returned by
-- 'asapo_consumer_reset_last_read_marker' is converted to 'Int' and returned
-- as 'Right'; a failure reported through the error handle becomes 'Left'.
resetLastReadMarker :: Consumer -> GroupId -> StreamName -> IO (Either Error Int)
resetLastReadMarker (Consumer consumer) (GroupId groupId) (StreamName streamName) =
  withConstText streamName \streamNameC ->
    -- The outer '<$>' maps over 'IO', the inner one over 'Either'.
    (fromIntegral <$>)
      <$> checkError (asapo_consumer_reset_last_read_marker consumer groupId streamNameC)

-- | Set the last read marker for the stream
--
-- The marker is set to the given message ID for the given group. On success,
-- the integer returned by 'asapo_consumer_set_last_read_marker' is converted
-- to 'Int' and returned as 'Right'; a failure reported through the error
-- handle becomes 'Left'.
setLastReadMarker :: Consumer -> GroupId -> StreamName -> MessageId -> IO (Either Error Int)
setLastReadMarker (Consumer consumer) (GroupId groupId) (StreamName streamName) (MessageId value) =
  withConstText streamName \streamNameC ->
    -- The outer '<$>' maps over 'IO', the inner one over 'Either'.
    (fromIntegral <$>)
      <$> checkError (asapo_consumer_set_last_read_marker consumer groupId value streamNameC)

-- | Acknowledge a specific message
--
-- On success, the integer returned by 'asapo_consumer_acknowledge' is
-- converted to 'Int' and returned as 'Right'; a failure reported through the
-- error handle becomes 'Left'.
acknowledge :: Consumer -> GroupId -> StreamName -> MessageId -> IO (Either Error Int)
acknowledge (Consumer consumer) (GroupId groupId) (StreamName streamName) (MessageId messageId) =
  withConstText streamName \streamNameC ->
    -- The outer '<$>' maps over 'IO', the inner one over 'Either'.
    (fromIntegral <$>)
      <$> checkError (asapo_consumer_acknowledge consumer groupId messageId streamNameC)

-- | Negatively acknowledge a specific message
--
-- The delay is converted with 'nominalDiffToMillis' before being passed to
-- the C API. On success, the integer returned by
-- 'asapo_consumer_negative_acknowledge' is converted to 'Int' and returned as
-- 'Right'; a failure reported through the error handle becomes 'Left'.
negativeAcknowledge ::
  Consumer ->
  GroupId ->
  StreamName ->
  MessageId ->
  -- | delay
  NominalDiffTime ->
  IO (Either Error Int)
negativeAcknowledge (Consumer consumer) (GroupId groupId) (StreamName streamName) (MessageId messageId) delay =
  withConstText streamName \streamNameC ->
    -- The outer '<$>' maps over 'IO', the inner one over 'Either'.
    (fromIntegral <$>)
      <$> checkError
        ( asapo_consumer_negative_acknowledge
            consumer
            groupId
            messageId
            (nominalDiffToMillis delay)
            streamNameC
        )

-- | Get a list of all unacknowledged message IDs in a range
--
-- The range is given as a @(from, to)@ pair of message IDs and passed to
-- 'asapo_consumer_get_unacknowledged_messages'. A failure reported through
-- the error handle is returned as 'Left'. On success, every item of the
-- resulting ID list is read out, and the list handle is released with
-- 'asapo_free_id_list_handle' afterwards, including when reading throws an
-- exception ('bracket').
getUnacknowledgedMessages :: Consumer -> GroupId -> StreamName -> (MessageId, MessageId) -> IO (Either Error [MessageId])
getUnacknowledgedMessages (Consumer consumer) (GroupId groupId) (StreamName streamName) (MessageId from, MessageId to) =
  bracket init destroy handle
  where
    -- Ask the C API for the ID list handle.
    init :: IO (Either Error AsapoIdListHandle)
    init =
      withConstText streamName \streamNameC ->
        checkError (asapo_consumer_get_unacknowledged_messages consumer groupId from to streamNameC)
    -- Only a successfully obtained list handle needs to be released.
    destroy :: Either Error AsapoIdListHandle -> IO ()
    destroy (Left _) = pure ()
    destroy (Right handle') = asapo_free_id_list_handle handle'
    -- Copy the IDs out of the list handle while it is still alive.
    handle :: Either Error AsapoIdListHandle -> IO (Either Error [MessageId])
    handle (Left e) = pure (Left e)
    handle (Right idListHandle) = do
      numberOfIds <- asapo_id_list_get_size idListHandle
      Right
        <$> repeatGetterWithSizeLimit
          ((MessageId <$>) . asapo_id_list_get_item idListHandle)
          numberOfIds

-- | Network connection type, as reported by 'getCurrentConnectionType'
data NetworkConnectionType
  = ConnectionUndefined -- ^ the native value equals @kUndefined@ (exact meaning not documented here)
  | ConnectionTcp -- ^ TCP (the native value equals @kAsapoTcp@)
  | ConnectionFabric -- ^ any other native value (exact meaning not documented here)

-- Translate the native connection type constant into its Haskell
-- counterpart. Every value that is neither @kUndefined@ nor @kAsapoTcp@ is
-- reported as 'ConnectionFabric'.
convertConnectionType :: AsapoNetworkConnectionType -> NetworkConnectionType
convertConnectionType x
  | x == kUndefined = ConnectionUndefined
  | x == kAsapoTcp = ConnectionTcp
  | otherwise = ConnectionFabric

-- | Retrieve the current consumer connection type
getCurrentConnectionType :: Consumer -> IO NetworkConnectionType
getCurrentConnectionType (Consumer consumerHandle) =
  convertConnectionType <$> asapo_consumer_current_connection_type consumerHandle

-- | Which streams 'getStreamList' should return
data StreamFilter
  = FilterAllStreams -- ^ passed to the native library as @kAllStreams@
  | FilterFinishedStreams -- ^ passed to the native library as @kFinishedStreams@
  | FilterUnfinishedStreams -- ^ passed to the native library as @kUnfinishedStreams@

-- Translate the Haskell-side stream filter into the native constant.
convertStreamFilter :: StreamFilter -> AsapoStreamFilter
convertStreamFilter = \case
  FilterAllStreams -> kAllStreams
  FilterFinishedStreams -> kFinishedStreams
  FilterUnfinishedStreams -> kUnfinishedStreams

-- An often-repeating pattern: getting a length value and then calling a
-- per-index getter for every index from @0@ up to @length - 1@, collecting
-- the results in order.
--
-- The explicit zero check is not redundant: with an unsigned index type
-- such as @CSize@, @n - 1@ wraps around for @n == 0@, so @[0 .. n - 1]@ would
-- no longer be the empty range.
repeatGetterWithSizeLimit :: (Eq a1, Num a1, Applicative f, Enum a1) => (a1 -> f a2) -> a1 -> f [a2]
repeatGetterWithSizeLimit f n
  | n == 0 = pure []
  | otherwise = traverse f [0 .. n - 1]

-- | Retrieve the list of streams with metadata
--
-- The native stream-info list is managed with 'bracket', so it is released
-- again once all entries have been converted, even if conversion throws.
getStreamList ::
  Consumer ->
  -- | Pass @Nothing@ to get all streams
  Maybe StreamName ->
  StreamFilter ->
  IO (Either Error [StreamInfo])
getStreamList (Consumer consumer) streamName streamFilter =
  bracket acquire release (traverse collectInfos)
  where
    -- No stream name is passed to the native library as the empty string.
    realStreamName = maybe "" (\(StreamName streamName') -> streamName') streamName
    -- Ask the native library for the list; a failure is reported as 'Left'.
    acquire =
      withConstText realStreamName $ \streamNameC ->
        checkError (asapo_consumer_get_stream_list consumer streamNameC (convertStreamFilter streamFilter))
    -- Only a successfully acquired list has to be freed.
    release = either (const (pure ())) asapo_free_stream_infos_handle
    -- Convert every entry of the native list into a 'StreamInfo'.
    collectInfos streamInfosHandle = do
      numberOfStreams <- asapo_stream_infos_get_size streamInfosHandle
      repeatGetterWithSizeLimit
        (asapo_stream_infos_get_item streamInfosHandle >=> retrieveStreamInfoFromC)
        numberOfStreams

-- | Anti-boolean-blindness for delete or not delete metadata when deleting a stream
data DeleteFlag
  = DeleteMeta
  | DontDeleteMeta
  deriving (Eq)

-- | Anti-boolean-blindness for "error on not existing data"
data ErrorOnNotExistFlag
  = ErrorOnNotExist
  | NoErrorOnNotExist
  deriving (Eq)

-- | Delete a given stream
--
-- On success the integer returned by the native call is passed through
-- unchanged (presumably a boolean-like status -- TODO confirm against the
-- C API).
deleteStream :: Consumer -> StreamName -> DeleteFlag -> ErrorOnNotExistFlag -> IO (Either Error Int)
deleteStream (Consumer consumer) (StreamName streamName) deleteFlag errorOnNotExistFlag =
  withConstText streamName \streamNameC ->
    fmap fromIntegral
      <$> checkError
        (asapo_consumer_delete_stream consumer streamNameC deleteMetaC errorOnNotExistC)
  where
    -- The native call takes both flags as 1 (set) or 0 (not set).
    deleteMetaC = if deleteFlag == DeleteMeta then 1 else 0
    errorOnNotExistC = if errorOnNotExistFlag == ErrorOnNotExist then 1 else 0

-- | Set a stream persistent
--
-- On success the integer returned by the native call is passed through
-- unchanged (presumably a boolean-like status -- TODO confirm against the
-- C API).
setStreamPersistent :: Consumer -> StreamName -> IO (Either Error Int)
setStreamPersistent (Consumer consumer) (StreamName streamName) =
  withConstText streamName \streamNameC ->
    fmap fromIntegral
      <$> checkError (asapo_consumer_set_stream_persistent consumer streamNameC)

-- | Get the current size (number of messages) of the stream
getCurrentSize :: Consumer -> StreamName -> IO (Either Error Int)
getCurrentSize (Consumer consumer) (StreamName streamName) =
  withConstText streamName \streamNameC ->
    fmap fromIntegral
      <$> checkError (asapo_consumer_get_current_size consumer streamNameC)

-- | Anti-boolean-blindness for "include incomplete data sets in list"
data IncludeIncompleteFlag
  = IncludeIncomplete
  | ExcludeIncomplete
  deriving (Eq)

-- | Get number of datasets in stream
getCurrentDatasetCount :: Consumer -> StreamName -> IncludeIncompleteFlag -> IO (Either Error Int)
getCurrentDatasetCount (Consumer consumer) (StreamName streamName) includeIncomplete =
  withConstText streamName \streamNameC ->
    fmap fromIntegral
      <$> checkError
        (asapo_consumer_get_current_dataset_count consumer streamNameC includeIncompleteC)
  where
    -- The native call takes the flag as 1 (include) or 0 (exclude).
    includeIncompleteC = if includeIncomplete == IncludeIncomplete then 1 else 0

-- | Get beamtime metadata (which can be not set, in which case @Nothing@ is returned)
getBeamtimeMeta :: Consumer -> IO (Either Error (Maybe Text))
getBeamtimeMeta (Consumer consumer) =
  -- A native error short-circuits as 'Left'; only a successfully returned
  -- string handle is converted to text.
  checkError (asapo_consumer_get_beamtime_meta consumer)
    >>= traverse stringHandleToText

-- | Metadata handle, can be passed around as a pure value and be used to retrieve actual data for the metadata as a two-step process, using the @retrieveDataForMessageMeta@ function(s)
newtype MessageMetaHandle = MessageMetaHandle (ForeignPtr ())
  deriving (Show)

-- Run an action on the raw metadata handle. 'withForeignPtr' keeps the
-- underlying pointer alive for the duration of the action.
withMessageMetaHandle :: MessageMetaHandle -> (AsapoMessageMetaHandle -> IO a) -> IO a
withMessageMetaHandle (MessageMetaHandle foreignPtr) f =
  withForeignPtr foreignPtr $ \rawPtr -> f (AsapoMessageMetaHandle rawPtr)

-- Take ownership of a raw metadata handle: the resulting 'ForeignPtr' has
-- @p_asapo_free_handle@ registered as its finalizer.
newMessageMetaHandle :: AsapoMessageMetaHandle -> IO MessageMetaHandle
newMessageMetaHandle (AsapoMessageMetaHandle inputPtr) =
  MessageMetaHandle <$> newForeignPtr p_asapo_free_handle inputPtr

-- | Handle to the data of a message, backed by a 'ForeignPtr' so it can be
-- passed around as a pure value
newtype MessageDataHandle = MessageDataHandle (ForeignPtr ())
  deriving (Show)

-- Run an action on the raw data handle. 'withForeignPtr' keeps the
-- underlying pointer alive for the duration of the action.
withMessageDataHandle :: MessageDataHandle -> (AsapoMessageDataHandle -> IO a) -> IO a
withMessageDataHandle (MessageDataHandle foreignPtr) f =
  withForeignPtr foreignPtr $ \rawPtr -> f (AsapoMessageDataHandle rawPtr)

-- Take ownership of a raw data handle: the resulting 'ForeignPtr' has
-- @p_asapo_free_handle@ registered as its finalizer.
newMessageDataHandle :: AsapoMessageDataHandle -> IO MessageDataHandle
newMessageDataHandle (AsapoMessageDataHandle inputPtr) =
  MessageDataHandle <$> newForeignPtr p_asapo_free_handle inputPtr

-- Alias for 'newMessageMetaHandle'.
wrapMessageMetaHandle :: AsapoMessageMetaHandle -> IO MessageMetaHandle
wrapMessageMetaHandle = newMessageMetaHandle

-- | Metadata for a single message
data MessageMeta = MessageMeta
  { messageMetaName :: Text,
    messageMetaTimestamp :: UTCTime,
    messageMetaSize :: Word64,
    messageMetaId :: MessageId,
    messageMetaSource :: Text,
    messageMetaMetadata :: Text,
    messageMetaBufId :: Word64,
    messageMetaDatasetSubstream :: Word64,
    -- | The handle these values were read from; needed to retrieve the
    -- message data later on
    messageMetaHandle :: MessageMetaHandle
  }
  deriving (Show)

-- | Retrieve actual data for the handle
--
-- Convenience wrapper around 'retrieveDataFromMeta' that takes the handle
-- out of an already resolved 'MessageMeta'.
retrieveDataForMessageMeta :: Consumer -> MessageMeta -> IO (Either Error BS.ByteString)
retrieveDataForMessageMeta consumer meta =
  retrieveDataFromMeta consumer (messageMetaHandle meta)

-- | Get the actual metadata hiding behind a handle (shouldn't be necessary when using the convenience interfaces)
resolveMetadata :: MessageMetaHandle -> IO MessageMeta
resolveMetadata metaHandle = withMessageMetaHandle metaHandle \meta -> do
  -- The timestamp getter writes through a pointer instead of returning a
  -- value; 'withPtr' is given 0 as the initial value (presumably it returns
  -- the value stored at the pointer after the action -- TODO confirm).
  (timestamp, _) <- withPtr 0 (asapo_message_meta_get_timestamp meta)
  name <- asapo_message_meta_get_name meta >>= peekConstCStringText
  size <- asapo_message_meta_get_size meta
  messageId <- MessageId <$> asapo_message_meta_get_id meta
  source <- asapo_message_meta_get_source meta >>= peekConstCStringText
  metadata <- asapo_message_meta_get_metadata meta >>= peekConstCStringText
  bufId <- asapo_message_meta_get_buf_id meta
  datasetSubstream <- asapo_message_meta_get_dataset_substream meta
  pure
    MessageMeta
      { messageMetaName = name,
        messageMetaTimestamp = timespecToUTC timestamp,
        messageMetaSize = size,
        messageMetaId = messageId,
        messageMetaSource = source,
        messageMetaMetadata = metadata,
        messageMetaBufId = bufId,
        messageMetaDatasetSubstream = datasetSubstream,
        messageMetaHandle = metaHandle
      }

-- | Metadata for a dataset
data Dataset = Dataset
  { -- | ID of the dataset as reported by the native library
    datasetId :: Word64,
    -- | Expected size of the dataset as reported by the native library
    datasetExpectedSize :: Word64,
    -- | Handles of the messages contained in the dataset; resolve them with
    -- 'resolveMetadata'
    datasetItems :: [MessageMetaHandle]
  }

-- Copy the contents of a native dataset handle into a 'Dataset': every item
-- is wrapped into a 'MessageMetaHandle', then ID and expected size are read.
retrieveDatasetFromC :: AsapoDataSetHandle -> IO Dataset
retrieveDatasetFromC handle = do
  numberOfItems <- asapo_dataset_get_size handle
  items <-
    repeatGetterWithSizeLimit
      (asapo_dataset_get_item handle >=> wrapMessageMetaHandle)
      numberOfItems
  identifier <- asapo_dataset_get_id handle
  expectedSize <- asapo_dataset_get_expected_size handle
  pure
    Dataset
      { datasetId = identifier,
        datasetExpectedSize = expectedSize,
        datasetItems = items
      }

-- | Get the next dataset for a stream
getNextDataset ::
  Consumer ->
  GroupId ->
  -- | Wait until dataset has these number of messages (0 for maximum size)
  Word64 ->
  StreamName ->
  IO (Either Error Dataset)
getNextDataset (Consumer consumer) (GroupId groupId) minSize (StreamName streamName) =
  withConstText streamName \streamNameC ->
    -- A native error short-circuits as 'Left'; only a successfully returned
    -- dataset handle is converted.
    checkError (asapo_consumer_get_next_dataset consumer groupId minSize streamNameC)
      >>= traverse retrieveDatasetFromC

-- | Get the last dataset in the stream
getLastDataset ::
  Consumer ->
  -- | Wait until dataset has these number of messages (0 for maximum size)
  Word64 ->
  StreamName ->
  IO (Either Error Dataset)
getLastDataset (Consumer consumer) minSize (StreamName streamName) =
  withConstText streamName \streamNameC ->
    -- A native error short-circuits as 'Left'; only a successfully returned
    -- dataset handle is converted.
    checkError (asapo_consumer_get_last_dataset consumer minSize streamNameC)
      >>= traverse retrieveDatasetFromC

-- | Get the last dataset in the given group.
--
-- The stream name is marshalled to a C string for the duration of the call.
-- The raw call goes through 'checkError', so a failure surfaces as 'Left';
-- on 'Right' the raw dataset handle is converted with @retrieveDatasetFromC@.
getLastDatasetInGroup ::
  Consumer ->
  GroupId ->
  -- | Wait until the dataset has this number of messages (0 for maximum size)
  Word64 ->
  StreamName ->
  IO (Either Error Dataset)
getLastDatasetInGroup (Consumer consumer) (GroupId groupId) minSize (StreamName streamName) =
  withConstText streamName \streamNameC ->
    checkError (asapo_consumer_get_last_dataset_ingroup consumer groupId minSize streamNameC)
      >>= traverse retrieveDatasetFromC

-- | Copy the payload behind a message data handle into a strict 'BS.ByteString'.
--
-- NOTE(review): 'BS.packCString' copies up to the first NUL byte, so a binary
-- payload containing @\\0@ would be truncated here. Copying with an explicit
-- length would need the payload size, which this function does not receive;
-- confirm whether callers can hit this.
retrieveDataFromHandle :: MessageDataHandle -> IO BS.ByteString
retrieveDataFromHandle dataHandle =
  withMessageDataHandle dataHandle \dataHandlePtr -> do
    messageCString <- asapo_message_data_get_as_chars dataHandlePtr
    -- packCString wants a mutable 'CString', so strip the const wrapper.
    BS.packCString (unConstPtr messageCString)

-- | Retrieve data for the given metadata handle.
--
-- Calls @asapo_consumer_retrieve_data@ with a temporary out-parameter for the
-- data handle. The continuation passed to @withSuccess@ reads that handle,
-- wraps it and copies the payload out; the boolean result of the C call
-- itself is not inspected here.
retrieveDataFromMeta :: Consumer -> MessageMetaHandle -> IO (Either Error BS.ByteString)
retrieveDataFromMeta (Consumer consumer) metaHandle =
  withMessageMetaHandle metaHandle \metaHandlePtr ->
    alloca \dataHandlePtrPtr ->
      withSuccess (asapo_consumer_retrieve_data consumer metaHandlePtr dataHandlePtrPtr) \_result -> do
        dataHandlePtr <- peek dataHandlePtrPtr
        dataHandle <- newMessageDataHandle dataHandlePtr
        Right <$> retrieveDataFromHandle dataHandle

-- | Shared plumbing for all "get one message" calls.
--
-- Allocates out-parameters for a metadata handle and a data handle, marshals
-- the stream name, and runs the raw getter @g@ via @withSuccess@. The
-- continuation reads both out-parameters, wraps them into
-- 'MessageMetaHandle' / @MessageDataHandle@ and hands them to @f@, whose
-- result becomes the overall result.
withMessageHandles ::
  StreamName ->
  (Ptr AsapoMessageMetaHandle -> Ptr AsapoMessageDataHandle -> ConstCString -> Ptr AsapoErrorHandle -> IO CInt) ->
  (MessageMetaHandle -> MessageDataHandle -> IO (Either Error a)) ->
  IO (Either Error a)
withMessageHandles (StreamName streamName) g f =
  alloca \metaHandlePtrPtr ->
    alloca \dataHandlePtrPtr ->
      withConstText streamName \streamNameC ->
        withSuccess (g metaHandlePtrPtr dataHandlePtrPtr streamNameC) \_result -> do
          metaHandlePtr <- peek metaHandlePtrPtr
          dataHandlePtr <- peek dataHandlePtrPtr
          metaHandle <- newMessageMetaHandle metaHandlePtr
          dataHandle <- newMessageDataHandle dataHandlePtr
          f metaHandle dataHandle

-- | Specialisation of 'withMessageHandles' that fetches the message with the
-- given ID via @asapo_consumer_get_by_id@. The handle callback is taken as
-- the remaining (partially applied) argument.
withMessageHandlesById :: Consumer -> StreamName -> MessageId -> (MessageMetaHandle -> MessageDataHandle -> IO (Either Error a)) -> IO (Either Error a)
withMessageHandlesById (Consumer consumer) streamName (MessageId messageId) =
  withMessageHandles streamName (asapo_consumer_get_by_id consumer messageId)

-- | Handle callback that extracts both the payload and the resolved metadata.
-- Never produces 'Left'; the 'Either' only exists to fit the callback shape
-- expected by 'withMessageHandles'.
retrieveMessageMetaAndData :: MessageMetaHandle -> MessageDataHandle -> IO (Either a (MessageMeta, BS.ByteString))
retrieveMessageMetaAndData metaHandle dataHandle = do
  -- The payload is read before the metadata, as in the original ordering.
  data' <- retrieveDataFromHandle dataHandle
  meta <- resolveMetadata metaHandle
  pure (Right (meta, data'))

-- | Handle callback that resolves only the metadata; the data handle argument
-- is ignored. Never produces 'Left'.
retrieveMessageMeta :: MessageMetaHandle -> p -> IO (Either a MessageMeta)
retrieveMessageMeta metaHandle _dataHandle =
  Right <$> resolveMetadata metaHandle

-- | Handle callback that extracts only the payload; the metadata handle
-- argument is ignored. Never produces 'Left'.
retrieveMessageData :: p -> MessageDataHandle -> IO (Either a BS.ByteString)
retrieveMessageData _metaHandle dataHandle =
  Right <$> retrieveDataFromHandle dataHandle

-- | Given a message ID, retrieve both metadata and data
getMessageMetaAndDataById :: Consumer -> StreamName -> MessageId -> IO (Either Error (MessageMeta, BS.ByteString))
getMessageMetaAndDataById consumer streamName messageId =
  withMessageHandlesById consumer streamName messageId retrieveMessageMetaAndData

-- | Given a message ID, retrieve only the metadata (you can get the data later with 'retrieveDataForMessageMeta')
getMessageMetaById :: Consumer -> StreamName -> MessageId -> IO (Either Error MessageMeta)
getMessageMetaById consumer streamName messageId =
  withMessageHandlesById consumer streamName messageId retrieveMessageMeta

-- | Given a message ID, retrieve only the data
getMessageDataById :: Consumer -> StreamName -> MessageId -> IO (Either Error BS.ByteString)
getMessageDataById consumer streamName messageId =
  withMessageHandlesById consumer streamName messageId retrieveMessageData

-- | Retrieve the last message in the stream, with data and metadata
getLastMessageMetaAndData :: Consumer -> StreamName -> IO (Either Error (MessageMeta, BS.ByteString))
getLastMessageMetaAndData (Consumer consumer) streamName =
  withMessageHandles
    streamName
    (asapo_consumer_get_last consumer)
    retrieveMessageMetaAndData

-- | Retrieve the last message in the stream, only metadata (you can get the data later with 'retrieveDataForMessageMeta')
getLastMessageMeta :: Consumer -> StreamName -> IO (Either Error MessageMeta)
getLastMessageMeta (Consumer consumer) streamName =
  withMessageHandles
    streamName
    (asapo_consumer_get_last consumer)
    retrieveMessageMeta

-- | Retrieve the last message in the stream, only data
getLastMessageData :: Consumer -> StreamName -> IO (Either Error BS.ByteString)
getLastMessageData (Consumer consumer) streamName =
  withMessageHandles
    streamName
    (asapo_consumer_get_last consumer)
    retrieveMessageData

-- | Retrieve the last message in a given stream and group, with data and metadata
getLastInGroupMessageMetaAndData :: Consumer -> StreamName -> GroupId -> IO (Either Error (MessageMeta, BS.ByteString))
getLastInGroupMessageMetaAndData (Consumer consumer) streamName (GroupId groupId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_last_ingroup consumer groupId)
    retrieveMessageMetaAndData

-- | Retrieve the last message in a given stream and group, only metadata (you can get the data later with 'retrieveDataForMessageMeta')
getLastInGroupMessageMeta :: Consumer -> StreamName -> GroupId -> IO (Either Error MessageMeta)
getLastInGroupMessageMeta (Consumer consumer) streamName (GroupId groupId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_last_ingroup consumer groupId)
    retrieveMessageMeta

-- | Retrieve the last message in a given stream and group, only data
getLastInGroupMessageData :: Consumer -> StreamName -> GroupId -> IO (Either Error BS.ByteString)
getLastInGroupMessageData (Consumer consumer) streamName (GroupId groupId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_last_ingroup consumer groupId)
    retrieveMessageData

-- | Retrieve the next message in the stream and group, with data and metadata
getNextMessageMetaAndData :: Consumer -> StreamName -> GroupId -> IO (Either Error (MessageMeta, BS.ByteString))
getNextMessageMetaAndData (Consumer consumer) streamName (GroupId groupId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_next consumer groupId)
    retrieveMessageMetaAndData

-- | Retrieve the next message in the stream and group, only metadata (you can get the data later with 'retrieveDataForMessageMeta')
getNextMessageMeta :: Consumer -> StreamName -> GroupId -> IO (Either Error MessageMeta)
getNextMessageMeta (Consumer consumer) streamName (GroupId groupId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_next consumer groupId)
    retrieveMessageMeta

-- | Retrieve the next message in the stream and group, only data
getNextMessageData :: Consumer -> StreamName -> GroupId -> IO (Either Error BS.ByteString)
getNextMessageData (Consumer consumer) streamName (GroupId groupId) =
  withMessageHandles
    streamName
    (asapo_consumer_get_next consumer groupId)
    retrieveMessageData

-- | Query messages, return handles without data.
--
-- The list handle returned by @asapo_consumer_query_messages@ is released via
-- 'bracket' once the individual item handles have been wrapped.
queryMessagesHandles ::
  Consumer ->
  -- | Actual query string, see the docs for syntax
  Text ->
  StreamName ->
  IO (Either Error [MessageMetaHandle])
queryMessagesHandles (Consumer consumer) query (StreamName streamName) =
  withConstText streamName \streamNameC ->
    withConstText query \queryC ->
      let runQuery = checkError (asapo_consumer_query_messages consumer queryC streamNameC)
          -- Only a successfully obtained list handle has to be freed.
          release (Left _) = pure ()
          release (Right metasHandle) = asapo_free_message_metas_handle metasHandle
       in bracket runQuery release \case
            Left e -> pure (Left e)
            Right metasHandle -> do
              numberOfMetas <- asapo_message_metas_get_size metasHandle
              -- Fetch each item of the list and wrap its raw handle.
              Right
                <$> repeatGetterWithSizeLimit
                  (asapo_message_metas_get_item metasHandle >=> wrapMessageMetaHandle)
                  numberOfMetas

-- | Query messages, return the resolved metadata of every match (no data).
--
-- Each item handle is wrapped and resolved with 'resolveMetadata' while the
-- list handle returned by @asapo_consumer_query_messages@ is still alive;
-- 'bracket' releases the list handle afterwards.
queryMessages ::
  Consumer ->
  -- | Actual query string, see the docs for syntax
  Text ->
  StreamName ->
  IO (Either Error [MessageMeta])
queryMessages (Consumer consumer) query (StreamName streamName) =
  withConstText streamName \streamNameC ->
    withConstText query \queryC ->
      let runQuery = checkError (asapo_consumer_query_messages consumer queryC streamNameC)
          -- Only a successfully obtained list handle has to be freed.
          release (Left _) = pure ()
          release (Right metasHandle) = asapo_free_message_metas_handle metasHandle
       in bracket runQuery release \case
            Left e -> pure (Left e)
            Right metasHandle -> do
              numberOfMetas <- asapo_message_metas_get_size metasHandle
              -- Fetch each item, wrap its raw handle, then resolve it.
              Right
                <$> repeatGetterWithSizeLimit
                  (asapo_message_metas_get_item metasHandle >=> wrapMessageMetaHandle >=> resolveMetadata)
                  numberOfMetas

-- | Configure resending of negatively acknowledged messages
-- (thin wrapper around @asapo_consumer_set_resend_nacs@).
--
-- The delay is converted with @nominalDiffToMillis@ before being passed on.
resendNacs ::
  Consumer ->
  -- | resend yes/no
  Bool ->
  -- | delay
  NominalDiffTime ->
  -- | attempts
  Word64 ->
  IO ()
resendNacs (Consumer consumer) resend delay =
  -- The trailing "attempts" argument is forwarded by partial application.
  asapo_consumer_set_resend_nacs consumer resendFlag (nominalDiffToMillis delay)
  where
    resendFlag = if resend then 1 else 0