module Network.Kafka.Consumer where
import Control.Applicative
import Control.Lens
import System.IO
import Prelude
import Network.Kafka
import Network.Kafka.Protocol
-- | The 'ReplicaId' wrapping @-1@. 'fetchRequest' places it in the
-- replica-id slot of every fetch request it builds.
--
-- NOTE(review): the Kafka protocol guide appears to reserve @-1@ for
-- normal (non-broker) consumers — confirm against the protocol docs.
ordinaryConsumerId :: ReplicaId
ordinaryConsumerId = ReplicaId (negate 1)
-- | Build a 'FetchRequest' for the given offset, partition and topic.
--
-- The wait time, wait size and buffer size are read from the current
-- state ('stateWaitTime', 'stateWaitSize', 'stateBufferSize'). The request
-- always uses 'ordinaryConsumerId' and names exactly one topic with
-- exactly one partition entry.
fetchRequest :: Kafka m => Offset -> Partition -> TopicName -> m FetchRequest
fetchRequest o p topic =
  assemble <$> use stateWaitTime <*> use stateWaitSize <*> use stateBufferSize
  where
    -- Pure assembly of the request tuple once the three settings are known.
    assemble waitTime waitSize bufferSize =
      FetchReq
        ( ordinaryConsumerId
        , waitTime
        , waitSize
        , [(topic, [(p, o, bufferSize)])]
        )
-- | Send an already-built 'FetchRequest' over the given handle.
--
-- The request is wrapped in 'FetchRR' and passed to 'makeRequest'
-- together with the handle.
fetch' :: Kafka m => Handle -> FetchRequest -> m FetchResponse
fetch' h = makeRequest h . FetchRR
-- | Fetch from the given topic and partition at the given offset.
--
-- The broker is obtained from 'getTopicPartitionLeader'. Inside
-- 'withBrokerHandle' the request is built with 'fetchRequest' and then
-- sent with @fetch'@ on the supplied handle.
fetch :: Kafka m => Offset -> Partition -> TopicName -> m FetchResponse
fetch o p topic =
  getTopicPartitionLeader topic p >>= \leader ->
    withBrokerHandle leader sendOn
  where
    -- Build the request only once a handle is available, then send it.
    sendOn brokerHandle = do
      request <- fetchRequest o p topic
      fetch' brokerHandle request
-- | Flatten a 'FetchResponse' into a list of 'TopicAndMessage' values.
--
-- For every entry under 'fetchResponseFields', the entry's first component
-- is paired with each message reachable through its second component:
-- each element's fourth field, then 'messageSetMembers', then each
-- member's 'setMessage'. Entries that yield no messages contribute nothing.
fetchMessages :: FetchResponse -> [TopicAndMessage]
fetchMessages fr =
  [ TopicAndMessage (entry ^. _1) message
  | entry <- fr ^.. fetchResponseFields . folded
  , message <- entry ^.. _2 . folded . _4 . messageSetMembers . folded . setMessage
  ]