Safe Haskell | None |
---|---|
Language | Haskell2010 |
Kafka.Producer.Sync
Contents
Description
This module provides a synchronous interface on top of the hw-kafka-client library.
It works by using MVars managed in two different queues. Each request is sent as soon as there are no other effectively equal Kafka records in-flight. This is done in order to make sure that there is no ambiguity as to which MVar to resolve.
Currently, this implements fair sending. For all requests, the oldest pending request should be sent first.
Synopsis
- data SyncKafkaProducer
- newSyncProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError SyncKafkaProducer)
- closeSyncProducer :: MonadIO m => SyncKafkaProducer -> m ()
- produceRecord :: MonadIO m => SyncKafkaProducer -> ProducerRecord -> m (Either KafkaError ())
- data ProducerRecord = ProducerRecord {}
- newtype TopicName = TopicName {
- unTopicName :: Text
- data ProducePartition
- data KafkaError
- data ProducerProperties = ProducerProperties {
- ppKafkaProps :: Map Text Text
- ppTopicProps :: Map Text Text
- ppLogLevel :: Maybe KafkaLogLevel
- ppCallbacks :: [KafkaConf -> IO ()]
- brokersList :: [BrokerAddress] -> ProducerProperties
- logLevel :: KafkaLogLevel -> ProducerProperties
- compression :: KafkaCompressionCodec -> ProducerProperties
- topicCompression :: KafkaCompressionCodec -> ProducerProperties
- sendTimeout :: Timeout -> ProducerProperties
- extraProps :: Map Text Text -> ProducerProperties
- suppressDisconnectLogs :: ProducerProperties
- extraTopicProps :: Map Text Text -> ProducerProperties
- debugOptions :: [KafkaDebug] -> ProducerProperties
- newtype BrokerAddress = BrokerAddress {}
- data KafkaCompressionCodec
- = NoCompression
- | Gzip
- | Snappy
- | Lz4
- data KafkaDebug
- data KafkaLogLevel
- newtype Timeout = Timeout {}
Sync producer
data SyncKafkaProducer Source #
A producer for sending messages to Kafka and waiting for the DeliveryReport.
A single producer may be used for the entire application. The underlying library, librdkafka, deals very well with concurrent use, and this implementation supports that as well.
newSyncProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError SyncKafkaProducer) Source #
Create a new SyncKafkaProducer
Note: since this library wraps the regular hw-kafka-client, please be aware that you should not set the delivery report callback, as it is set internally.
closeSyncProducer :: MonadIO m => SyncKafkaProducer -> m () Source #
Close the SyncKafkaProducer
After invoking this function, the producer should not be used anymore.
Ideally, you would use bracket in order to make sure that a producer is not re-used once closed.
produceRecord :: MonadIO m => SyncKafkaProducer -> ProducerRecord -> m (Either KafkaError ()) Source #
Synchronously produce a record using the specified producer
Re-exports
Record datatypes
data ProducerRecord #
Constructors
ProducerRecord | |
Fields
|
Instances
Constructors
TopicName | |
Fields
|
Instances
Eq TopicName | |
Ord TopicName | |
Read TopicName | |
Show TopicName | |
Generic TopicName | |
type Rep TopicName | |
Defined in Kafka.Types |
data ProducePartition #
Constructors
SpecifiedPartition !Int | |
UnassignedPartition |
Instances
Errors
data KafkaError #
Constructors
KafkaError Text | |
KafkaInvalidReturnValue | |
KafkaBadSpecification Text | |
KafkaResponseError RdKafkaRespErrT | |
KafkaInvalidConfigurationValue Text | |
KafkaUnknownConfigurationKey Text | |
KafkaBadConfiguration |
Instances
Producer configuration
data ProducerProperties #
Constructors
ProducerProperties | |
Fields
|
Instances
Semigroup ProducerProperties | |
Defined in Kafka.Producer.ProducerProperties Methods (<>) :: ProducerProperties -> ProducerProperties -> ProducerProperties # sconcat :: NonEmpty ProducerProperties -> ProducerProperties # stimes :: Integral b => b -> ProducerProperties -> ProducerProperties # | |
Monoid ProducerProperties | |
Defined in Kafka.Producer.ProducerProperties Methods mempty :: ProducerProperties # mappend :: ProducerProperties -> ProducerProperties -> ProducerProperties # mconcat :: [ProducerProperties] -> ProducerProperties # |
Configuration helpers
brokersList :: [BrokerAddress] -> ProducerProperties #
Set brokers for producer
Set log-level for producer
Set compression codec for producer
Set topic compression for producer
Set send timeout for producer
extraProps :: Map Text Text -> ProducerProperties #
Set extra properties for producer
Suppress disconnect log lines
Configure extra topic properties
debugOptions :: [KafkaDebug] -> ProducerProperties #
Add KafkaDebug options
Other datatypes
newtype BrokerAddress #
Constructors
BrokerAddress | |
Fields |
Instances
Eq BrokerAddress | |
Defined in Kafka.Types Methods (==) :: BrokerAddress -> BrokerAddress -> Bool # (/=) :: BrokerAddress -> BrokerAddress -> Bool # | |
Show BrokerAddress | |
Defined in Kafka.Types Methods showsPrec :: Int -> BrokerAddress -> ShowS # show :: BrokerAddress -> String # showList :: [BrokerAddress] -> ShowS # | |
Generic BrokerAddress | |
Defined in Kafka.Types Associated Types type Rep BrokerAddress :: Type -> Type # | |
type Rep BrokerAddress | |
Defined in Kafka.Types type Rep BrokerAddress = D1 (MetaData "BrokerAddress" "Kafka.Types" "hw-kfk-clnt-3.0.0-9c515fc4" True) (C1 (MetaCons "BrokerAddress" PrefixI True) (S1 (MetaSel (Just "unBrokerAddress") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text))) |
data KafkaCompressionCodec #
Constructors
NoCompression | |
Gzip | |
Snappy | |
Lz4 |
Instances
data KafkaDebug #
Constructors
DebugGeneric | |
DebugBroker | |
DebugTopic | |
DebugMetadata | |
DebugQueue | |
DebugMsg | |
DebugProtocol | |
DebugCgrp | |
DebugSecurity | |
DebugFetch | |
DebugFeature | |
DebugAll |
Instances
data KafkaLogLevel #
Constructors
KafkaLogEmerg | |
KafkaLogAlert | |
KafkaLogCrit | |
KafkaLogErr | |
KafkaLogWarning | |
KafkaLogNotice | |
KafkaLogInfo | |
KafkaLogDebug |
Instances
Enum KafkaLogLevel | |
Defined in Kafka.Types Methods succ :: KafkaLogLevel -> KafkaLogLevel # pred :: KafkaLogLevel -> KafkaLogLevel # toEnum :: Int -> KafkaLogLevel # fromEnum :: KafkaLogLevel -> Int # enumFrom :: KafkaLogLevel -> [KafkaLogLevel] # enumFromThen :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromTo :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromThenTo :: KafkaLogLevel -> KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # | |
Eq KafkaLogLevel | |
Defined in Kafka.Types Methods (==) :: KafkaLogLevel -> KafkaLogLevel -> Bool # (/=) :: KafkaLogLevel -> KafkaLogLevel -> Bool # | |
Show KafkaLogLevel | |
Defined in Kafka.Types Methods showsPrec :: Int -> KafkaLogLevel -> ShowS # show :: KafkaLogLevel -> String # showList :: [KafkaLogLevel] -> ShowS # |