Safe Haskell | None |
---|---|
Language | Haskell2010 |
This module provides a synchronous interface on top of the hw-kafka-client
It works by using MVars managed in two different queues. Each request is sent as soon as there are no other effectively equal Kafka records in-flight. This is done in order to make sure that there is no ambiguity as to which MVar to resolve.
Currently, this implements fair sending. For all requests, the oldest pending request should be sent first.
Synopsis
- data SyncKafkaProducer
- newSyncProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError SyncKafkaProducer)
- closeSyncProducer :: MonadIO m => SyncKafkaProducer -> m ()
- produceRecord :: MonadIO m => SyncKafkaProducer -> ProducerRecord -> m (Either KafkaError ())
- data ProducerRecord = ProducerRecord {}
- newtype TopicName = TopicName {
- unTopicName :: Text
- data ProducePartition
- data KafkaError
- data ProducerProperties = ProducerProperties {
- ppKafkaProps :: Map Text Text
- ppTopicProps :: Map Text Text
- ppLogLevel :: Maybe KafkaLogLevel
- ppCallbacks :: [KafkaConf -> IO ()]
- brokersList :: [BrokerAddress] -> ProducerProperties
- logLevel :: KafkaLogLevel -> ProducerProperties
- compression :: KafkaCompressionCodec -> ProducerProperties
- topicCompression :: KafkaCompressionCodec -> ProducerProperties
- sendTimeout :: Timeout -> ProducerProperties
- extraProps :: Map Text Text -> ProducerProperties
- suppressDisconnectLogs :: ProducerProperties
- extraTopicProps :: Map Text Text -> ProducerProperties
- debugOptions :: [KafkaDebug] -> ProducerProperties
- newtype BrokerAddress = BrokerAddress {}
- data KafkaCompressionCodec
- = NoCompression
- | Gzip
- | Snappy
- | Lz4
- data KafkaDebug
- data KafkaLogLevel
- newtype Timeout = Timeout {}
Sync producer
data SyncKafkaProducer Source #
A producer for sending messages to Kafka and waiting for the DeliveryReport
A single producer may be used for the entire application. The underlying
library, librdkafka
, deals very well with concurrent use - this
implementation supports that as well.
newSyncProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError SyncKafkaProducer) Source #
Create a new SyncKafkaProducer
Note: since this library wraps the regular hw-kafka-client, please be aware that you should not set the delivery report callback. As it is set internally.
closeSyncProducer :: MonadIO m => SyncKafkaProducer -> m () Source #
Close the SyncKafkaProducer
After invoking this function, the producer should not be used anymore
Ideally, you would use bracket
in order to make sure
that a producer is not re-used once closed.
produceRecord :: MonadIO m => SyncKafkaProducer -> ProducerRecord -> m (Either KafkaError ()) Source #
Synchronously produce a record using the specified producer
Re-exports
Record datatypes
data ProducerRecord #
ProducerRecord | |
|
Instances
data ProducePartition #
Instances
Errors
data KafkaError #
KafkaError Text | |
KafkaInvalidReturnValue | |
KafkaBadSpecification Text | |
KafkaResponseError RdKafkaRespErrT | |
KafkaInvalidConfigurationValue Text | |
KafkaUnknownConfigurationKey Text | |
KafkaBadConfiguration |
Instances
Producer configuration
data ProducerProperties #
ProducerProperties | |
|
Instances
Semigroup ProducerProperties | |
Defined in Kafka.Producer.ProducerProperties (<>) :: ProducerProperties -> ProducerProperties -> ProducerProperties # sconcat :: NonEmpty ProducerProperties -> ProducerProperties # stimes :: Integral b => b -> ProducerProperties -> ProducerProperties # | |
Monoid ProducerProperties | |
Defined in Kafka.Producer.ProducerProperties |
Configuration helpers
brokersList :: [BrokerAddress] -> ProducerProperties #
Set brokers for producer
Set log-level for producer
Set compression level for producer
Set topic compression for producer
Set send timeout for producer
extraProps :: Map Text Text -> ProducerProperties #
Set extra properties for producer
Suppress disconnect log lines
Configure extra topic properties
debugOptions :: [KafkaDebug] -> ProducerProperties #
Add KafkaDebug
options
Other datatypes
newtype BrokerAddress #
Instances
Eq BrokerAddress | |
Defined in Kafka.Types (==) :: BrokerAddress -> BrokerAddress -> Bool # (/=) :: BrokerAddress -> BrokerAddress -> Bool # | |
Show BrokerAddress | |
Defined in Kafka.Types showsPrec :: Int -> BrokerAddress -> ShowS # show :: BrokerAddress -> String # showList :: [BrokerAddress] -> ShowS # | |
Generic BrokerAddress | |
Defined in Kafka.Types type Rep BrokerAddress :: Type -> Type # from :: BrokerAddress -> Rep BrokerAddress x # to :: Rep BrokerAddress x -> BrokerAddress # | |
type Rep BrokerAddress | |
Defined in Kafka.Types type Rep BrokerAddress = D1 (MetaData "BrokerAddress" "Kafka.Types" "hw-kfk-clnt-3.0.0-9c515fc4" True) (C1 (MetaCons "BrokerAddress" PrefixI True) (S1 (MetaSel (Just "unBrokerAddress") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text))) |
data KafkaCompressionCodec #
Instances
Eq KafkaCompressionCodec | |
Defined in Kafka.Types (==) :: KafkaCompressionCodec -> KafkaCompressionCodec -> Bool # (/=) :: KafkaCompressionCodec -> KafkaCompressionCodec -> Bool # | |
Show KafkaCompressionCodec | |
Defined in Kafka.Types showsPrec :: Int -> KafkaCompressionCodec -> ShowS # show :: KafkaCompressionCodec -> String # showList :: [KafkaCompressionCodec] -> ShowS # | |
Generic KafkaCompressionCodec | |
Defined in Kafka.Types type Rep KafkaCompressionCodec :: Type -> Type # | |
type Rep KafkaCompressionCodec | |
Defined in Kafka.Types type Rep KafkaCompressionCodec = D1 (MetaData "KafkaCompressionCodec" "Kafka.Types" "hw-kfk-clnt-3.0.0-9c515fc4" False) ((C1 (MetaCons "NoCompression" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Gzip" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Snappy" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Lz4" PrefixI False) (U1 :: Type -> Type))) |
data KafkaDebug #
DebugGeneric | |
DebugBroker | |
DebugTopic | |
DebugMetadata | |
DebugQueue | |
DebugMsg | |
DebugProtocol | |
DebugCgrp | |
DebugSecurity | |
DebugFetch | |
DebugFeature | |
DebugAll |
Instances
data KafkaLogLevel #
KafkaLogEmerg | |
KafkaLogAlert | |
KafkaLogCrit | |
KafkaLogErr | |
KafkaLogWarning | |
KafkaLogNotice | |
KafkaLogInfo | |
KafkaLogDebug |
Instances
Enum KafkaLogLevel | |
Defined in Kafka.Types succ :: KafkaLogLevel -> KafkaLogLevel # pred :: KafkaLogLevel -> KafkaLogLevel # toEnum :: Int -> KafkaLogLevel # fromEnum :: KafkaLogLevel -> Int # enumFrom :: KafkaLogLevel -> [KafkaLogLevel] # enumFromThen :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromTo :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromThenTo :: KafkaLogLevel -> KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # | |
Eq KafkaLogLevel | |
Defined in Kafka.Types (==) :: KafkaLogLevel -> KafkaLogLevel -> Bool # (/=) :: KafkaLogLevel -> KafkaLogLevel -> Bool # | |
Show KafkaLogLevel | |
Defined in Kafka.Types showsPrec :: Int -> KafkaLogLevel -> ShowS # show :: KafkaLogLevel -> String # showList :: [KafkaLogLevel] -> ShowS # |