-- | A blocking wrapper around the Kafka producer.
--
-- 'produceRecord' returns only after the outcome of the record is known:
-- either the producer rejected it, or a delivery report for it was received.
-- The producer configuration helpers and the types needed to build records
-- are re-exported so that this module can be used on its own.
module Kafka.Producer.Sync
(
SyncKafkaProducer
, newSyncProducer
, closeSyncProducer
, produceRecord
, ProducerRecord(..)
, TopicName(..)
, ProducePartition(..)
, KafkaError(..)
, ProducerProperties(..)
, KP.brokersList
, KP.logLevel
, KP.compression
, KP.topicCompression
, KP.sendTimeout
, KP.extraProps
, KP.suppressDisconnectLogs
, KP.extraTopicProps
, KP.debugOptions
, BrokerAddress(..)
, KafkaCompressionCodec(..)
, KafkaDebug(..)
, KafkaLogLevel(..)
, Timeout(..)
)
where
import Prelude
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newMVar, takeMVar, newEmptyMVar, putMVar)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Foldable (find)
import Data.Functor ((<&>))
import Data.Maybe (isJust)
import Data.Sequence (Seq(..), (<|), (|>))
import qualified Kafka.Producer as KP (deliveryCallback, flushProducer, newProducer)
import qualified Kafka.Producer as KP (closeProducer, produceMessage, setCallback)
import Kafka.Producer.ProducerProperties (ProducerProperties(..))
import qualified Kafka.Producer.ProducerProperties as KP (brokersList, logLevel, compression, topicCompression)
import qualified Kafka.Producer.ProducerProperties as KP (sendTimeout, extraProps, suppressDisconnectLogs)
import qualified Kafka.Producer.ProducerProperties as KP (extraTopicProps, debugOptions)
import Kafka.Producer.Types (KafkaProducer, ProducerRecord(..))
import Kafka.Producer.Types (DeliveryReport(..), ProducePartition(..))
import Kafka.Types (KafkaLogLevel(..), KafkaError(..), TopicName(..), Timeout(..))
import Kafka.Types (KafkaDebug(..), BrokerAddress(..), KafkaCompressionCodec(..))
-- | Produce a single record and block until its outcome is known.
--
-- Yields 'Left' with the 'KafkaError' when the producer rejected the record
-- or its delivery failed, and @Right ()@ when delivery succeeded. Once the
-- outcome is available, a queued request (if any) is dispatched before the
-- result is returned.
produceRecord :: MonadIO m => SyncKafkaProducer -> ProducerRecord -> m (Either KafkaError ())
produceRecord syncProducer record = liftIO $ do
  outcome <- sendProducerRecord record syncProducer
  sendPending syncProducer
  pure outcome
-- | Handle for a producer whose sends block until the outcome of the record
-- is known.
data SyncKafkaProducer = SyncKafkaProducer
  { requests :: MVar Requests
    -- ^ Bookkeeping of queued and in-flight requests; the same variable is
    -- handed to the delivery-report callback.
  , producer :: KafkaProducer
    -- ^ The underlying producer that records are handed to.
  }
-- | Slot a blocked caller waits on; it is filled with the outcome of the
-- caller's record.
type ResultVar = MVar (Either KafkaError ())
-- | Requests tracked by a 'SyncKafkaProducer', each paired with the
-- 'ResultVar' of the caller waiting for it.
data Requests = Requests
  { pending :: Seq (ResultVar, ProducerRecord)
    -- ^ Records held back because an effectively equal record was already in
    -- 'sent' when they were submitted.
  , sent :: Seq (ResultVar, ProducerRecord)
    -- ^ Records that have been passed to the producer; an entry is removed
    -- when a delivery report for it is handled.
  }
-- | Renders only the records of each queue; the result variables are left
-- out of the output.
instance Show Requests where
  show reqs =
    concat
      [ "Requests { pending = "
      , show (fmap snd (pending reqs))
      , ", sent = "
      , show (fmap snd (sent reqs))
      , " }"
      ]
-- | Create a 'SyncKafkaProducer' from the given properties.
--
-- A delivery callback that resolves waiting requests is appended to @props@
-- before the underlying producer is created. Yields 'Left' when creating the
-- underlying producer fails.
newSyncProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError SyncKafkaProducer)
newSyncProducer props = liftIO $ do
  tracker <- newMVar Requests { pending = mempty, sent = mempty }
  let reportHandler = KP.deliveryCallback (handleDeliveryReport tracker)
      propsWithCallback = props <> KP.setCallback reportHandler
  created <- KP.newProducer propsWithCallback
  pure (SyncKafkaProducer tracker <$> created)
-- | Close the underlying Kafka producer.
closeSyncProducer :: MonadIO m => SyncKafkaProducer -> m ()
closeSyncProducer syncProducer = KP.closeProducer (producer syncProducer)
-- | Hand the oldest queued request to the underlying producer.
--
-- When the producer rejects a record, the waiting caller receives the error
-- and the record is dropped rather than tracked in 'sent'. This matches
-- 'sendProducerRecord', which does not track rejected records either: an
-- entry in 'sent' is only ever removed by a delivery report, so a rejected
-- record left there would keep every later effectively-equal record queued
-- forever. After a rejection, dispatch moves on to the next queued request
-- so the remaining callers are not stranded.
sendPending :: SyncKafkaProducer -> IO ()
sendPending SyncKafkaProducer{..} = takeMVar requests >>= dispatch
  where
    -- 'requests' is held (empty) while 'dispatch' runs; every path through
    -- it puts the state back exactly once.
    dispatch reqs = case pending reqs of
      Empty ->
        putMVar requests reqs
      (resultVar, record) :<| rest ->
        KP.produceMessage producer record >>= \case
          Just err -> do
            -- The waiter's variable is still empty, so this cannot block.
            putMVar resultVar (Left err)
            dispatch reqs { pending = rest }
          Nothing -> do
            putMVar requests reqs { pending = rest, sent = sent reqs |> (resultVar, record) }
            -- Flush only after releasing the state so that delivery reports
            -- can be processed while flushing.
            KP.flushProducer producer
-- | Submit a record and wait for its outcome.
--
-- If an effectively equal record is already in 'sent', the record is queued
-- in 'pending' and the call blocks until its result variable is filled.
-- Otherwise the record is handed to the producer straight away: a rejection
-- is returned as 'Left' immediately, while an accepted record is tracked in
-- 'sent', the producer is flushed, and the call blocks on the result variable.
sendProducerRecord :: ProducerRecord -> SyncKafkaProducer -> IO (Either KafkaError ())
sendProducerRecord record SyncKafkaProducer{..} = do
  reqs <- takeMVar requests
  if hasEffectivelyEqual record (sent reqs)
    then do
      resultVar <- newEmptyMVar
      putMVar requests reqs { pending = pending reqs |> (resultVar, record) }
      takeMVar resultVar
    else
      KP.produceMessage producer record >>= maybe (track reqs) (reject reqs)
  where
    -- The producer refused the record: restore the state untouched.
    reject reqs err = Left err <$ putMVar requests reqs
    -- The producer accepted the record: remember it, then wait for its report.
    track reqs = do
      resultVar <- newEmptyMVar
      putMVar requests reqs { sent = sent reqs |> (resultVar, record) }
      KP.flushProducer producer
      takeMVar resultVar
-- | Whether any entry of the sequence carries a record that is
-- 'effectivelyEqual' to the given one. The first tuple component is ignored.
hasEffectivelyEqual :: ProducerRecord -> Seq (a, ProducerRecord) -> Bool
hasEffectivelyEqual record = any (effectivelyEqual record . snd)
-- | Build the delivery callback that resolves waiting requests.
--
-- For a success or failure report, the matching entry is removed from 'sent'
-- and its result variable is filled with @Right ()@ or @Left err@
-- respectively. That work runs on a forked thread, so the callback itself
-- returns without waiting for the shared state.
--
-- A report for a record that is not in 'sent', or a 'NoMessageError', is
-- treated as an illegal state and raised with 'error'. In the former case the
-- shared state is put back first, so the failure of the forked thread does
-- not leave the variable empty and block every later producer call.
handleDeliveryReport :: MVar Requests -> (DeliveryReport -> IO ())
handleDeliveryReport mvarRequests = \case
  DeliverySuccess record _offset -> resolve record (Right ())
  DeliveryFailure record err -> resolve record (Left err)
  NoMessageError err ->
    error $ "Illegal state ocurred, NoMessageError received: " <> show err
  where
    -- Remove the record from 'sent' and publish its outcome to the waiter.
    resolve record outcome = void . forkIO $ do
      reqs <- takeMVar mvarRequests
      case getAndRemove record (sent reqs) of
        Just (resultVar, rest) -> do
          putMVar mvarRequests reqs { sent = rest }
          putMVar resultVar outcome
        Nothing -> do
          -- Release the state before failing; otherwise it stays taken forever.
          putMVar mvarRequests reqs
          error
            $ "Illegal state ocurred, record was not in sent: "
            <> show reqs
-- | Find the rightmost entry whose record is 'effectivelyEqual' to the given
-- one. On a match, return its result variable together with the sequence
-- without that entry (the order of the remaining entries is kept);
-- 'Nothing' when no entry matches.
getAndRemove ::
  ProducerRecord
  -> Seq (ResultVar, ProducerRecord)
  -> Maybe (ResultVar, Seq (ResultVar, ProducerRecord))
getAndRemove record = go Empty
  where
    -- @kept@ accumulates the already inspected, non-matching suffix.
    go _ Empty = Nothing
    go kept (front :|> (resultVar, candidate))
      | candidate `effectivelyEqual` record = Just (resultVar, front <> kept)
      | otherwise = go ((resultVar, candidate) <| kept) front
-- | Two records are effectively equal when their topic, key and value all
-- coincide. No other field of the record is compared.
effectivelyEqual :: ProducerRecord -> ProducerRecord -> Bool
effectivelyEqual this other = identity this == identity other
  where
    identity r = (prTopic r, prKey r, prValue r)