Safe Haskell | None |
---|---|
Language | Haskell2010 |
Synopsis
- type KafkaAddress = (Host, Port)
- data KafkaState = KafkaState {
- _stateName :: KafkaString
- _stateRequiredAcks :: RequiredAcks
- _stateRequestTimeout :: Timeout
- _stateWaitSize :: MinBytes
- _stateBufferSize :: MaxBytes
- _stateWaitTime :: MaxWaitTime
- _stateCorrelationId :: CorrelationId
- _stateBrokers :: Map Leader Broker
- _stateConnections :: Map KafkaAddress (Pool Handle)
- _stateTopicMetadata :: Map TopicName TopicMetadata
- _stateAddresses :: NonEmpty KafkaAddress
- stateWaitTime :: Lens' KafkaState MaxWaitTime
- stateWaitSize :: Lens' KafkaState MinBytes
- stateTopicMetadata :: Lens' KafkaState (Map TopicName TopicMetadata)
- stateRequiredAcks :: Lens' KafkaState RequiredAcks
- stateRequestTimeout :: Lens' KafkaState Timeout
- stateName :: Lens' KafkaState KafkaString
- stateCorrelationId :: Lens' KafkaState CorrelationId
- stateConnections :: Lens' KafkaState (Map KafkaAddress (Pool Handle))
- stateBufferSize :: Lens' KafkaState MaxBytes
- stateBrokers :: Lens' KafkaState (Map Leader Broker)
- stateAddresses :: Lens' KafkaState (NonEmpty KafkaAddress)
- type Kafka m = (MonadState KafkaState m, MonadError KafkaClientError m, MonadIO m, MonadBaseControl IO m)
- type KafkaClientId = KafkaString
- data KafkaClientError
- data KafkaTime
- data PartitionAndLeader = PartitionAndLeader {}
- palTopic :: Lens' PartitionAndLeader TopicName
- palPartition :: Lens' PartitionAndLeader Partition
- palLeader :: Lens' PartitionAndLeader Leader
- data TopicAndPartition = TopicAndPartition {}
- data TopicAndMessage = TopicAndMessage {}
- tamTopic :: Lens' TopicAndMessage TopicName
- tamMessage :: Lens' TopicAndMessage Message
- tamPayload :: TopicAndMessage -> ByteString
- defaultCorrelationId :: CorrelationId
- defaultRequiredAcks :: RequiredAcks
- defaultRequestTimeout :: Timeout
- defaultMinBytes :: MinBytes
- defaultMaxBytes :: MaxBytes
- defaultMaxWaitTime :: MaxWaitTime
- mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState
- addKafkaAddress :: KafkaAddress -> KafkaState -> KafkaState
- runKafka :: KafkaState -> StateT KafkaState (ExceptT KafkaClientError IO) a -> IO (Either KafkaClientError a)
- tryKafka :: Kafka m => m a -> m a
- makeRequest :: Kafka m => Handle -> ReqResp (m a) -> m a
- metadata :: Kafka m => MetadataRequest -> m MetadataResponse
- metadata' :: Kafka m => Handle -> MetadataRequest -> m MetadataResponse
- createTopic :: Kafka m => CreateTopicsRequest -> m CreateTopicsResponse
- createTopic' :: Kafka m => Handle -> CreateTopicsRequest -> m CreateTopicsResponse
- createTopicsRequest :: TopicName -> Partition -> ReplicationFactor -> [(Partition, Replicas)] -> [(KafkaString, Metadata)] -> CreateTopicsRequest
- getTopicPartitionLeader :: Kafka m => TopicName -> Partition -> m Broker
- expect :: Kafka m => KafkaClientError -> (a -> Maybe b) -> a -> m b
- brokerPartitionInfo :: Kafka m => TopicName -> m (Set PartitionAndLeader)
- findMetadataOrElse :: Kafka m => [TopicName] -> Getting (Maybe a) KafkaState (Maybe a) -> KafkaClientError -> m a
- protocolTime :: KafkaTime -> Time
- updateMetadatas :: Kafka m => [TopicName] -> m ()
- updateMetadata :: Kafka m => TopicName -> m ()
- updateAllMetadata :: Kafka m => m ()
- withBrokerHandle :: Kafka m => Broker -> (Handle -> m a) -> m a
- withAddressHandle :: Kafka m => KafkaAddress -> (Handle -> m a) -> m a
- broker2address :: Broker -> KafkaAddress
- withAnyHandle :: Kafka m => (Handle -> m a) -> m a
- data PartitionOffsetRequestInfo = PartitionOffsetRequestInfo {}
- getLastOffset :: Kafka m => KafkaTime -> Partition -> TopicName -> m Offset
- getLastOffset' :: Kafka m => Handle -> KafkaTime -> Partition -> TopicName -> m Offset
- offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> OffsetRequest
Documentation
type KafkaAddress = (Host, Port) Source #
data KafkaState Source #
Constructor: KafkaState, with the record fields listed in the synopsis above.
type Kafka m = (MonadState KafkaState m, MonadError KafkaClientError m, MonadIO m, MonadBaseControl IO m) Source #
The core Kafka monad.
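A minimal sketch of an action written against this constraint rather than a concrete monad stack, using updateMetadata and brokerPartitionInfo from further down this page (the function name partitionsOf is illustrative, not part of the library):

```haskell
{-# LANGUAGE ConstraintKinds, FlexibleContexts #-}
import Data.Set (Set)
import Network.Kafka
import Network.Kafka.Protocol (TopicName)

-- Refresh the cached metadata for one topic, then report its
-- partition/leader assignments. Works in any monad satisfying Kafka m.
partitionsOf :: Kafka m => TopicName -> m (Set PartitionAndLeader)
partitionsOf topic = do
  updateMetadata topic
  brokerPartitionInfo topic
```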
type KafkaClientId = KafkaString Source #
data KafkaClientError Source #
Errors given from the Kafka monad.
KafkaNoOffset | A response did not contain an offset. |
KafkaDeserializationError String | A value could not be deserialized correctly. |
KafkaInvalidBroker Leader | Could not find a cached broker for the found leader. |
KafkaFailedToFetchMetadata | |
KafkaIOException IOException | An IOException caught and wrapped by tryKafka. |
data KafkaTime Source #

An abstract form of Kafka's time. Used for querying offsets.
LatestTime | The latest time on the broker. |
EarliestTime | The earliest time on the broker. |
OtherTime Time | A specific time. |
Instances
Eq KafkaTime Source #
Generic KafkaTime Source #
type Rep KafkaTime Source #

Defined in Network.Kafka:

type Rep KafkaTime = D1 (MetaData "KafkaTime" "Network.Kafka" "milena-0.5.3.0-56TArgubqLBEBU4ve7tukD" False) (C1 (MetaCons "LatestTime" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "EarliestTime" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "OtherTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Time))))
data PartitionAndLeader Source #
data TopicAndPartition Source #
data TopicAndMessage Source #
A topic with a serializable message.
Instances
Eq TopicAndMessage Source #
  Defined in Network.Kafka
  (==) :: TopicAndMessage -> TopicAndMessage -> Bool #
  (/=) :: TopicAndMessage -> TopicAndMessage -> Bool #

Show TopicAndMessage Source #
  Defined in Network.Kafka
  showsPrec :: Int -> TopicAndMessage -> ShowS #
  show :: TopicAndMessage -> String #
  showList :: [TopicAndMessage] -> ShowS #

Generic TopicAndMessage Source #
  Defined in Network.Kafka
  type Rep TopicAndMessage :: Type -> Type #
  from :: TopicAndMessage -> Rep TopicAndMessage x #
  to :: Rep TopicAndMessage x -> TopicAndMessage #

type Rep TopicAndMessage Source #
  Defined in Network.Kafka
  type Rep TopicAndMessage = D1 (MetaData "TopicAndMessage" "Network.Kafka" "milena-0.5.3.0-56TArgubqLBEBU4ve7tukD" False) (C1 (MetaCons "TopicAndMessage" PrefixI True) (S1 (MetaSel (Just "_tamTopic") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 TopicName) :*: S1 (MetaSel (Just "_tamMessage") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Message)))
tamPayload :: TopicAndMessage -> ByteString Source #
Get the bytes from the Kafka message, ignoring the topic.
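A small sketch of pairing a payload with a topic and reading the bytes back out. It assumes makeMessage from Network.Kafka.Producer and the IsString instance on TopicName; both are assumptions about this package version, not guarantees made on this page.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B
import Network.Kafka
import Network.Kafka.Producer (makeMessage) -- assumed home of the Message helper

-- Pair a payload with a topic, then pull the message bytes back out.
-- Topic name and payload are illustrative.
helloBytes :: ByteString
helloBytes = tamPayload (TopicAndMessage "example-topic" (makeMessage (B.pack "hello")))
```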
Configuration
defaultCorrelationId :: CorrelationId Source #
Default: 0
defaultRequiredAcks :: RequiredAcks Source #
Default: 1
defaultRequestTimeout :: Timeout Source #
Default: 10000
defaultMinBytes :: MinBytes Source #
Default: 0
defaultMaxBytes :: MaxBytes Source #
Default: 1024 * 1024
defaultMaxWaitTime :: MaxWaitTime Source #
Default: 0
mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState Source #
Create a consumer using default values.
addKafkaAddress :: KafkaAddress -> KafkaState -> KafkaState Source #
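A sketch tying the configuration pieces together: start from the defaults installed by mkKafkaState, register a second bootstrap address with addKafkaAddress, and override two defaults through the lenses above. The broker addresses are placeholders, and the bare literals assume Num/IsString instances on the protocol newtypes; wrap them in their constructors if that assumption does not hold.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Lens ((&), (.~))
import Network.Kafka

-- A client state pointed at two brokers, overriding the default wait time
-- and minimum fetch size. All concrete values are illustrative.
exampleState :: KafkaState
exampleState =
  mkKafkaState "example-client" ("localhost", 9092)
    & addKafkaAddress ("localhost", 9093)
    & stateWaitTime .~ 100
    & stateWaitSize .~ 1
```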
runKafka :: KafkaState -> StateT KafkaState (ExceptT KafkaClientError IO) a -> IO (Either KafkaClientError a) Source #
Run the underlying Kafka monad.
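A minimal end-to-end sketch of runKafka, assuming a broker at localhost:9092 and a topic named example-topic (both placeholders) plus the usual IsString/Num instances on the protocol newtypes:

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Network.Kafka

-- Build a state with mkKafkaState, run one Kafka action against it, and
-- inspect the result. Failures arrive as Left KafkaClientError values.
main :: IO ()
main = do
  result <- runKafka (mkKafkaState "example-client" ("localhost", 9092))
                     (getLastOffset LatestTime 0 "example-topic")
  either (putStrLn . ("Kafka error: " ++) . show) print result
```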
tryKafka :: Kafka m => m a -> m a Source #
Catch IOExceptions and wrap them in KafkaIOExceptions.
makeRequest :: Kafka m => Handle -> ReqResp (m a) -> m a Source #
Make a request, incrementing the _stateCorrelationId.
metadata :: Kafka m => MetadataRequest -> m MetadataResponse Source #
Send a metadata request to any broker.
metadata' :: Kafka m => Handle -> MetadataRequest -> m MetadataResponse Source #
Send a metadata request.
createTopic :: Kafka m => CreateTopicsRequest -> m CreateTopicsResponse Source #
createTopic' :: Kafka m => Handle -> CreateTopicsRequest -> m CreateTopicsResponse Source #
createTopicsRequest :: TopicName -> Partition -> ReplicationFactor -> [(Partition, Replicas)] -> [(KafkaString, Metadata)] -> CreateTopicsRequest Source #
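A hedged sketch of the topic-creation helpers: build a request for a three-partition topic with replication factor one, no explicit replica assignments and no extra config entries, then send it to any broker. The topic name is a placeholder, the numeric literals assume Num instances on Partition and ReplicationFactor, and the CreateTopicsResponse import location is an assumption.

```haskell
{-# LANGUAGE ConstraintKinds, FlexibleContexts, OverloadedStrings #-}
import Network.Kafka
import Network.Kafka.Protocol (CreateTopicsResponse) -- assumed import location

-- Request creation of "example-topic" with 3 partitions and replication
-- factor 1, leaving replica assignments and extra configs empty.
createExampleTopic :: Kafka m => m CreateTopicsResponse
createExampleTopic = createTopic (createTopicsRequest "example-topic" 3 1 [] [])
```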
brokerPartitionInfo :: Kafka m => TopicName -> m (Set PartitionAndLeader) Source #
Find a leader and partition for the topic.
findMetadataOrElse :: Kafka m => [TopicName] -> Getting (Maybe a) KafkaState (Maybe a) -> KafkaClientError -> m a Source #
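A sketch of how this lookup helper composes with the lenses above: read a topic's cached metadata via stateTopicMetadata and at from lens, failing with the supplied error if nothing is found. That the helper refreshes metadata for the listed topics before giving up is an assumption about its behaviour, not something stated on this page.

```haskell
{-# LANGUAGE ConstraintKinds, FlexibleContexts #-}
import Control.Lens (at)
import Network.Kafka
import Network.Kafka.Protocol (TopicMetadata, TopicName)

-- Look up the cached metadata for one topic, failing with
-- KafkaFailedToFetchMetadata if it cannot be found.
cachedTopicMetadata :: Kafka m => TopicName -> m TopicMetadata
cachedTopicMetadata topic =
  findMetadataOrElse [topic] (stateTopicMetadata . at topic) KafkaFailedToFetchMetadata
```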
protocolTime :: KafkaTime -> Time Source #
Convert an abstract time to a serializable protocol value.
updateMetadatas :: Kafka m => [TopicName] -> m () Source #
updateMetadata :: Kafka m => TopicName -> m () Source #
updateAllMetadata :: Kafka m => m () Source #
withBrokerHandle :: Kafka m => Broker -> (Handle -> m a) -> m a Source #
Execute a Kafka action with a Handle for the given Broker, updating the connections cache if needed.

When the action throws an IOException, it is caught and returned as a KafkaIOException in the Kafka monad.

Note that when the given action throws an exception, any state changes will be discarded. This includes both IOExceptions and exceptions thrown by throwError from Except.
withAddressHandle :: Kafka m => KafkaAddress -> (Handle -> m a) -> m a Source #
Execute a Kafka action with a Handle for the given KafkaAddress, updating the connections cache if needed.

When the action throws an IOException, it is caught and returned as a KafkaIOException in the Kafka monad.

Note that when the given action throws an exception, any state changes will be discarded. This includes both IOExceptions and exceptions thrown by throwError from Except.
broker2address :: Broker -> KafkaAddress Source #
withAnyHandle :: Kafka m => (Handle -> m a) -> m a Source #
Like withAddressHandle, but round-robins the addresses in the KafkaState.

When the action throws an IOException, it is caught and returned as a KafkaIOException in the Kafka monad.

Note that when the given action throws an exception, any state changes will be discarded. This includes both IOExceptions and exceptions thrown by throwError from Except.
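The primed variants such as metadata' take an explicit Handle, so they compose directly with the with*Handle wrappers. The sketch below mirrors what metadata above already does and is shown only to illustrate that composition; the protocol type imports are assumed to come from Network.Kafka.Protocol.

```haskell
{-# LANGUAGE ConstraintKinds, FlexibleContexts #-}
import Network.Kafka
import Network.Kafka.Protocol (MetadataRequest, MetadataResponse)

-- Send a metadata request over whichever connection withAnyHandle picks
-- from the configured addresses.
metadataFromAnywhere :: Kafka m => MetadataRequest -> m MetadataResponse
metadataFromAnywhere req = withAnyHandle (\h -> metadata' h req)
```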
Offsets
data PartitionOffsetRequestInfo Source #
Fields to construct an offset request, per topic and partition.
Constructor: PartitionOffsetRequestInfo.
getLastOffset :: Kafka m => KafkaTime -> Partition -> TopicName -> m Offset Source #
Get the first found offset.
getLastOffset' :: Kafka m => Handle -> KafkaTime -> Partition -> TopicName -> m Offset Source #
Get the first found offset.
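A sketch combining the offset queries with the handle helpers above: resolve the partition leader with getTopicPartitionLeader (listed in the synopsis) and ask it for the earliest available offset. The helper name earliestOffsetFor is illustrative.

```haskell
{-# LANGUAGE ConstraintKinds, FlexibleContexts #-}
import Network.Kafka
import Network.Kafka.Protocol (Offset, Partition, TopicName)

-- Resolve the partition leader, open (or reuse) a connection to it, and
-- ask for the earliest offset it holds for this topic partition.
earliestOffsetFor :: Kafka m => TopicName -> Partition -> m Offset
earliestOffsetFor topic partition = do
  broker <- getTopicPartitionLeader topic partition
  withBrokerHandle broker (\h -> getLastOffset' h EarliestTime partition topic)
```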
offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> OffsetRequest Source #
Create an offset request.