milena-0.5.2.2: A Kafka client for Haskell.

Safe HaskellNone
LanguageHaskell2010

Network.Kafka

Contents

Synopsis

Documentation

data KafkaState Source #

Constructors

KafkaState 

Fields

Instances

Show KafkaState Source # 
Generic KafkaState Source # 

Associated Types

type Rep KafkaState :: * -> * #

type Rep KafkaState Source # 
type Rep KafkaState = D1 * (MetaData "KafkaState" "Network.Kafka" "milena-0.5.2.2-CM3OIeYiw3H7AjX82WBPv6" False) (C1 * (MetaCons "KafkaState" PrefixI True) ((:*:) * ((:*:) * ((:*:) * (S1 * (MetaSel (Just Symbol "_stateName") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * KafkaString)) (S1 * (MetaSel (Just Symbol "_stateRequiredAcks") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * RequiredAcks))) ((:*:) * (S1 * (MetaSel (Just Symbol "_stateRequestTimeout") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * Timeout)) ((:*:) * (S1 * (MetaSel (Just Symbol "_stateWaitSize") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * MinBytes)) (S1 * (MetaSel (Just Symbol "_stateBufferSize") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * MaxBytes))))) ((:*:) * ((:*:) * (S1 * (MetaSel (Just Symbol "_stateWaitTime") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * MaxWaitTime)) ((:*:) * (S1 * (MetaSel (Just Symbol "_stateCorrelationId") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * CorrelationId)) (S1 * (MetaSel (Just Symbol "_stateBrokers") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * (Map Leader Broker))))) ((:*:) * (S1 * (MetaSel (Just Symbol "_stateConnections") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * (Map KafkaAddress (Pool Handle)))) ((:*:) * (S1 * (MetaSel (Just Symbol "_stateTopicMetadata") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * (Map TopicName TopicMetadata))) (S1 * (MetaSel (Just Symbol "_stateAddresses") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * (NonEmpty KafkaAddress))))))))

data KafkaClientError Source #

Errors given from the Kafka monad.

Constructors

KafkaNoOffset

A response did not contain an offset.

KafkaDeserializationError String

A value could not be deserialized correctly.

KafkaInvalidBroker Leader

Could not find a cached broker for the found leader.

KafkaFailedToFetchMetadata 
KafkaIOException IOException 

Instances

Eq KafkaClientError Source # 
Show KafkaClientError Source # 
Generic KafkaClientError Source # 
Exception KafkaClientError Source # 
type Rep KafkaClientError Source # 
type Rep KafkaClientError = D1 * (MetaData "KafkaClientError" "Network.Kafka" "milena-0.5.2.2-CM3OIeYiw3H7AjX82WBPv6" False) ((:+:) * ((:+:) * (C1 * (MetaCons "KafkaNoOffset" PrefixI False) (U1 *)) (C1 * (MetaCons "KafkaDeserializationError" PrefixI False) (S1 * (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * String)))) ((:+:) * (C1 * (MetaCons "KafkaInvalidBroker" PrefixI False) (S1 * (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * Leader))) ((:+:) * (C1 * (MetaCons "KafkaFailedToFetchMetadata" PrefixI False) (U1 *)) (C1 * (MetaCons "KafkaIOException" PrefixI False) (S1 * (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * IOException))))))

data KafkaTime Source #

An abstract form of Kafka's time. Used for querying offsets.

Constructors

LatestTime

The latest time on the broker.

EarliestTime

The earliest time on the broker.

OtherTime Time

A specific time.

Instances

Eq KafkaTime Source # 
Generic KafkaTime Source # 

Associated Types

type Rep KafkaTime :: * -> * #

type Rep KafkaTime Source # 
type Rep KafkaTime = D1 * (MetaData "KafkaTime" "Network.Kafka" "milena-0.5.2.2-CM3OIeYiw3H7AjX82WBPv6" False) ((:+:) * (C1 * (MetaCons "LatestTime" PrefixI False) (U1 *)) ((:+:) * (C1 * (MetaCons "EarliestTime" PrefixI False) (U1 *)) (C1 * (MetaCons "OtherTime" PrefixI False) (S1 * (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * Time)))))

data PartitionAndLeader Source #

Instances

Eq PartitionAndLeader Source # 
Ord PartitionAndLeader Source # 
Show PartitionAndLeader Source # 
Generic PartitionAndLeader Source # 
type Rep PartitionAndLeader Source # 
type Rep PartitionAndLeader = D1 * (MetaData "PartitionAndLeader" "Network.Kafka" "milena-0.5.2.2-CM3OIeYiw3H7AjX82WBPv6" False) (C1 * (MetaCons "PartitionAndLeader" PrefixI True) ((:*:) * (S1 * (MetaSel (Just Symbol "_palTopic") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * TopicName)) ((:*:) * (S1 * (MetaSel (Just Symbol "_palPartition") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * Partition)) (S1 * (MetaSel (Just Symbol "_palLeader") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 * Leader)))))

data TopicAndPartition Source #

tamPayload :: TopicAndMessage -> ByteString Source #

Get the bytes from the Kafka message, ignoring the topic.

Configuration

defaultMaxBytes :: MaxBytes Source #

Default: 1024 * 1024

mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState Source #

Create a consumer using default values.

tryKafka :: Kafka m => m a -> m a Source #

Catch IOExceptions and wrap them in KafkaIOExceptions.

makeRequest :: Kafka m => Handle -> ReqResp (m a) -> m a Source #

Make a request, incrementing the _stateCorrelationId.

metadata :: Kafka m => MetadataRequest -> m MetadataResponse Source #

Send a metadata request to any broker.

metadata' :: Kafka m => Handle -> MetadataRequest -> m MetadataResponse Source #

Send a metadata request.

expect :: Kafka m => KafkaClientError -> (a -> Maybe b) -> a -> m b Source #

brokerPartitionInfo :: Kafka m => TopicName -> m (Set PartitionAndLeader) Source #

Find a leader and partition for the topic.

protocolTime :: KafkaTime -> Time Source #

Convert an abstract time to a serializable protocol value.

withBrokerHandle :: Kafka m => Broker -> (Handle -> m a) -> m a Source #

Execute a Kafka action with a Handle for the given Broker, updating the connections cache if needed.

When the action throws an IOException, it is caught and returned as a KafkaIOException in the Kafka monad.

Note that when the given action throws an exception, any state changes will be discarded. This includes both IOExceptions and exceptions thrown by throwError from Except.

withAddressHandle :: Kafka m => KafkaAddress -> (Handle -> m a) -> m a Source #

Execute a Kafka action with a Handle for the given KafkaAddress, updating the connections cache if needed.

When the action throws an IOException, it is caught and returned as a KafkaIOException in the Kafka monad.

Note that when the given action throws an exception, any state changes will be discarded. This includes both IOExceptions and exceptions thrown by throwError from Except.

withAnyHandle :: Kafka m => (Handle -> m a) -> m a Source #

Like withAddressHandle, but round-robins the addresses in the KafkaState.

When the action throws an IOException, it is caught and returned as a KafkaIOException in the Kafka monad.

Note that when the given action throws an exception, any state changes will be discarded. This includes both IOExceptions and exceptions thrown by throwError from Except.

Offsets

data PartitionOffsetRequestInfo Source #

Fields to construct an offset request, per topic and partition.

Constructors

PartitionOffsetRequestInfo 

Fields

getLastOffset :: Kafka m => KafkaTime -> Partition -> TopicName -> m Offset Source #

Get the first found offset.

getLastOffset' :: Kafka m => Handle -> KafkaTime -> Partition -> TopicName -> m Offset Source #

Get the first found offset.