{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings          #-}

-----------------------------------------------------------------------------
-- |
-- Module holding types shared by consumer and producer modules.
-----------------------------------------------------------------------------
module Kafka.Types
  ( BrokerId(..)
  , PartitionId(..)
  , Millis(..)
  , ClientId(..)
  , BatchSize(..)
  , TopicName(..)
  , BrokerAddress(..)
  , Timeout(..)
  , KafkaLogLevel(..)
  , KafkaError(..)
  , KafkaDebug(..)
  , KafkaCompressionCodec(..)
  , TopicType(..)
  , Headers, headersFromList, headersToList
  , topicType
  , kafkaDebugToText
  , kafkaCompressionCodecToText
  ) where

import Control.Exception      (Exception (..))
import Data.Int               (Int64)
import Data.String            (IsString)
import Data.Text              (Text, isPrefixOf)
import Data.Typeable          (Typeable)
import GHC.Generics           (Generic)
import Kafka.Internal.RdKafka (RdKafkaRespErrT, rdKafkaErr2name, rdKafkaErr2str)

import qualified Data.ByteString as BS

-- | Identifier of a Kafka broker.
newtype BrokerId = BrokerId
  { unBrokerId :: Int
  } deriving (Show, Eq, Ord, Read, Generic)

-- | Identifier of a partition within a topic.
newtype PartitionId = PartitionId
  { unPartitionId :: Int
  } deriving (Show, Eq, Read, Ord, Enum, Generic)

-- | A number of milliseconds, used to represent durations and timestamps.
newtype Millis = Millis
  { unMillis :: Int64
  } deriving (Show, Read, Eq, Ord, Num, Generic)

-- | Client ID used by Kafka to better track requests.
--
-- See the Kafka documentation on client IDs for details.
newtype ClientId = ClientId
  { unClientId :: Text
  } deriving (Show, Eq, IsString, Ord, Generic)

-- | Batch size used for polling.
newtype BatchSize = BatchSize
  { unBatchSize :: Int
  } deriving (Show, Read, Eq, Ord, Num, Generic)

-- | Whether the topic is created by a user or by the system.
data TopicType
  = User
    -- ^ Normal topics that are created by user.
  | System
    -- ^ Topics whose name starts with a double underscore "\__"
    -- (@__consumer_offsets@, @__confluent.support.metrics@, etc.);
    -- these are considered "system" topics.
  deriving (Show, Read, Eq, Ord, Generic)

-- | Topic name to consume/produce messages.
--
-- Wildcard (regex) topics are supported by the /librdkafka/ assignor:
-- any topic name in the topics list that is prefixed with @^@ will
-- be regex-matched to the full list of topics in the cluster and matching
-- topics will be added to the subscription list.
newtype TopicName = TopicName
  { unTopicName :: Text
    -- ^ a simple topic name or a regex if started with @^@
  } deriving (Show, Eq, Ord, IsString, Read, Generic)

-- | Deduce the type of a topic from its name: a name that starts with a
-- double underscore "\__" yields 'System', any other name yields 'User'.
topicType :: TopicName -> TopicType
topicType (TopicName name)
  | "__" `isPrefixOf` name = System
  | otherwise              = User
{-# INLINE topicType #-}

-- | Kafka broker address string (e.g. @broker1:9092@).
newtype BrokerAddress = BrokerAddress
  { unBrokerAddress :: Text
  } deriving (Show, Eq, IsString, Generic)

-- | Timeout in milliseconds.
newtype Timeout = Timeout
  { unTimeout :: Int
  } deriving (Show, Eq, Read, Generic)

-- | Log levels for /librdkafka/.
data KafkaLogLevel
  = KafkaLogEmerg
  | KafkaLogAlert
  | KafkaLogCrit
  | KafkaLogErr
  | KafkaLogWarning
  | KafkaLogNotice
  | KafkaLogInfo
  | KafkaLogDebug
  deriving (Show, Enum, Eq)

-- | All possible Kafka errors.
data KafkaError
  = KafkaError Text
  | KafkaInvalidReturnValue
  | KafkaBadSpecification Text
  | KafkaResponseError RdKafkaRespErrT
  | KafkaInvalidConfigurationValue Text
  | KafkaUnknownConfigurationKey Text
  | KafkaBadConfiguration
  deriving (Eq, Show, Typeable, Generic)

-- | A 'KafkaResponseError' is displayed as @[name] description@, built from
-- 'rdKafkaErr2name' and 'rdKafkaErr2str'; every other error is displayed
-- using its 'Show' instance.
instance Exception KafkaError where
  displayException kafkaErr = case kafkaErr of
    KafkaResponseError code ->
      concat ["[", rdKafkaErr2name code, "] ", rdKafkaErr2str code]
    other ->
      show other

-- | Available /librdkafka/ debug contexts.
data KafkaDebug
  = DebugGeneric
  | DebugBroker
  | DebugTopic
  | DebugMetadata
  | DebugQueue
  | DebugMsg
  | DebugProtocol
  | DebugCgrp
  | DebugSecurity
  | DebugFetch
  | DebugFeature
  | DebugAll
  deriving (Eq, Show, Typeable, Generic)

-- | Convert a 'KafkaDebug' into its /librdkafka/ string equivalent.
--
-- This is used internally by the library but may be useful to some developers.
kafkaDebugToText :: KafkaDebug -> Text
kafkaDebugToText DebugGeneric  = "generic"
kafkaDebugToText DebugBroker   = "broker"
kafkaDebugToText DebugTopic    = "topic"
kafkaDebugToText DebugMetadata = "metadata"
kafkaDebugToText DebugQueue    = "queue"
kafkaDebugToText DebugMsg      = "msg"
kafkaDebugToText DebugProtocol = "protocol"
kafkaDebugToText DebugCgrp     = "cgrp"
kafkaDebugToText DebugSecurity = "security"
kafkaDebugToText DebugFetch    = "fetch"
kafkaDebugToText DebugFeature  = "feature"
kafkaDebugToText DebugAll      = "all"

-- | Compression codec used by a topic.
--
-- See the Kafka documentation on compression codecs for details.
data KafkaCompressionCodec
  = NoCompression
  | Gzip
  | Snappy
  | Lz4
  | Zstd
  deriving (Eq, Show, Typeable, Generic)

-- | Convert a 'KafkaCompressionCodec' into its /librdkafka/ string equivalent.
--
-- This is used internally by the library but may be useful to some developers.
kafkaCompressionCodecToText :: KafkaCompressionCodec -> Text
kafkaCompressionCodecToText NoCompression = "none"
kafkaCompressionCodecToText Gzip          = "gzip"
kafkaCompressionCodecToText Snappy        = "snappy"
kafkaCompressionCodecToText Lz4           = "lz4"
kafkaCompressionCodecToText Zstd          = "zstd"

-- | Headers that might be passed along with a record.
--
-- A thin wrapper around a list of key/value pairs; the 'Semigroup' and
-- 'Monoid' instances are the ones of the underlying list.
newtype Headers = Headers
  { unHeaders :: [(BS.ByteString, BS.ByteString)]
  } deriving (Eq, Show, Semigroup, Monoid, Read, Typeable, Generic)

-- | Wrap a list of key/value pairs into 'Headers', keeping the list as is.
headersFromList :: [(BS.ByteString, BS.ByteString)] -> Headers
headersFromList pairs = Headers pairs

-- | Extract the underlying list of key/value pairs from 'Headers'.
headersToList :: Headers -> [(BS.ByteString, BS.ByteString)]
headersToList (Headers pairs) = pairs