module Network.Kafka.Producer where
import Data.Bits ((.&.))
import Data.ByteString.Char8 (ByteString)
import qualified Data.Digest.Murmur32 as Murmur32
import Control.Applicative
import Control.Lens
import Control.Monad.Trans (liftIO)
import Data.Monoid ((<>))
import Data.Set (Set)
import qualified Data.Set as Set
import System.IO
import qualified Data.Map as M
import System.Random (getStdRandom, randomR)
import Prelude
import Network.Kafka
import Network.Kafka.Protocol
produce :: Kafka m => Handle -> ProduceRequest -> m ProduceResponse
produce handle request = makeRequest handle $ ProduceRR request
produceRequest :: RequiredAcks -> Timeout -> [(TopicAndPartition, MessageSet)] -> ProduceRequest
produceRequest ra ti ts =
ProduceReq (ra, ti, M.toList . M.unionsWith (<>) $ fmap f ts)
where f (TopicAndPartition t p, i) = M.singleton t [(p, i)]
produceMessages :: Kafka m => [TopicAndMessage] -> m [ProduceResponse]
produceMessages = prod (groupMessagesToSet NoCompression)
produceCompressedMessages :: Kafka m => CompressionCodec -> [TopicAndMessage] -> m [ProduceResponse]
produceCompressedMessages c = prod (groupMessagesToSet c)
prod :: Kafka m => ([TopicAndMessage] -> MessageSet) -> [TopicAndMessage] -> m [ProduceResponse]
prod g tams = do
m <- fmap (fmap g) <$> partitionAndCollate tams
mapM (uncurry send) $ fmap M.toList <$> M.toList m
groupMessagesToSet :: CompressionCodec -> [TopicAndMessage] -> MessageSet
groupMessagesToSet c xs = MessageSet c $ msm <$> xs
where msm = MessageSetMember (Offset (-1)) . _tamMessage
partitionAndCollate :: Kafka m => [TopicAndMessage] -> m (M.Map Leader (M.Map TopicAndPartition [TopicAndMessage]))
partitionAndCollate ks = recurse ks M.empty
where recurse [] accum = return accum
recurse (x:xs) accum = do
topicPartitionsList <- brokerPartitionInfo $ _tamTopic x
let maybeKey = x ^. tamMessage . messageKey . keyBytes
pal <- case maybeKey of
Nothing -> getRandPartition topicPartitionsList
Just key -> return $ getPartitionByKey (_kafkaByteString key) topicPartitionsList
let leader = maybe (Leader Nothing) _palLeader pal
tp = TopicAndPartition <$> pal ^? folded . palTopic <*> pal ^? folded . palPartition
b = M.singleton leader $ maybe M.empty (`M.singleton` [x]) tp
accum' = M.unionWith (M.unionWith (<>)) accum b
recurse xs accum'
getPartitionByKey :: ByteString -> Set PartitionAndLeader -> Maybe PartitionAndLeader
getPartitionByKey key ps = Set.toAscList ps ^? ix i
where murmur = Murmur32.asWord32 . Murmur32.hash32WithSeed 0x9747b28c
toPositive = (.&. 0x7fffffff)
numPartitions = length ps
x = fromIntegral $ toPositive $ murmur key
i = x `mod` numPartitions
send :: Kafka m => Leader -> [(TopicAndPartition, MessageSet)] -> m ProduceResponse
send l ts = do
let s = stateBrokers . at l
topicNames = map (_tapTopic . fst) ts
broker <- findMetadataOrElse topicNames s (KafkaInvalidBroker l)
requiredAcks <- use stateRequiredAcks
requestTimeout <- use stateRequestTimeout
withBrokerHandle broker $ \handle -> produce handle $ produceRequest requiredAcks requestTimeout ts
getRandPartition :: Kafka m => Set PartitionAndLeader -> m (Maybe PartitionAndLeader)
getRandPartition ps =
liftIO $ (ps' ^?) . element <$> getStdRandom (randomR (0, length ps' - 1))
where ps' = ps ^.. folded . filtered (has $ palLeader . leaderId . _Just)
defaultMessageCrc :: Crc
defaultMessageCrc = 1
defaultMessageMagicByte :: MagicByte
defaultMessageMagicByte = 0
defaultMessageKey :: Key
defaultMessageKey = Key Nothing
defaultMessageAttributes :: Attributes
defaultMessageAttributes = Attributes NoCompression
makeMessage :: ByteString -> Message
makeMessage m = Message (defaultMessageCrc, defaultMessageMagicByte, defaultMessageAttributes, defaultMessageKey, Value (Just (KBytes m)))
makeKeyedMessage :: ByteString -> ByteString -> Message
makeKeyedMessage k m = Message (defaultMessageCrc, defaultMessageMagicByte, defaultMessageAttributes, Key (Just (KBytes k)), Value (Just (KBytes m)))