{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RecordWildCards #-}
module Network.UI.Kafka (
TopicConnection(..)
, Sensor
, LoopAction
, ExitAction
, ConsumerCallback
, consumerLoop
, ProducerCallback
, producerLoop
) where
import Control.Arrow ((***))
import Control.Concurrent (MVar, newEmptyMVar, isEmptyMVar, threadDelay, tryPutMVar)
import Control.Monad (void, when)
import Control.Monad.Except (liftIO)
import Data.Aeson.Types (FromJSON, ToJSON)
import Data.Binary (decode, encode)
import Data.ByteString.Char8 (pack, unpack)
import Data.ByteString.Lazy (fromStrict, toStrict)
import Data.String (IsString(fromString))
import GHC.Generics (Generic)
import Network.UI.Kafka.Types (Event)
import Network.Kafka (KafkaClientError, KafkaTime(..), TopicAndMessage(..), getLastOffset, mkKafkaState, runKafka, withAddressHandle)
import Network.Kafka.Consumer (fetch', fetchMessages, fetchRequest)
import Network.Kafka.Producer (makeKeyedMessage, produceMessages)
import Network.Kafka.Protocol (FetchResponse(..), KafkaBytes(..), Key(..), Message(..), Value(..))
data TopicConnection =
TopicConnection
{
client :: String
, address :: (String, Int)
, topic :: String
}
deriving (Eq, Generic, Read, Show)
instance FromJSON TopicConnection
instance ToJSON TopicConnection
type Sensor = String
type LoopAction = IO (Either KafkaClientError ())
type ExitAction = IO ()
type ConsumerCallback = Sensor
-> Event
-> IO ()
consumerLoop :: TopicConnection
-> ConsumerCallback
-> IO (ExitAction, LoopAction)
consumerLoop TopicConnection{..} consumer =
do
exitFlag <- newEmptyMVar :: IO (MVar ())
let
topic' = fromString topic
address' = fromString *** fromIntegral $ address
fromMessage :: Message -> (Sensor, Event)
fromMessage message =
let
(_, _, _, Key (Just (KBytes k)), Value (Just (KBytes v))) = _messageFields message
in
(unpack k, decode $ fromStrict v)
loop offset =
do
result <- withAddressHandle address' $ \handle -> fetch' handle =<< fetchRequest offset 0 topic'
let
(_, [(_, _, offset', _)]) : _ = _fetchResponseFields result
messages = fromMessage . _tamMessage <$> fetchMessages result
liftIO
$ do
mapM_ (uncurry consumer) messages
threadDelay 100
running <- liftIO $ isEmptyMVar exitFlag
when running
$ loop offset'
return
(
void $ tryPutMVar exitFlag ()
, fmap void $ runKafka (mkKafkaState (fromString client) address')
$ do
offset <- getLastOffset LatestTime 0 topic'
loop offset
)
type ProducerCallback = IO [Event]
producerLoop :: TopicConnection
-> Sensor
-> ProducerCallback
-> IO (ExitAction, LoopAction)
producerLoop TopicConnection{..} sensor producer =
do
exitFlag <- newEmptyMVar :: IO (MVar ())
let
loop =
do
events <- liftIO producer
void
$ produceMessages
$ map
(TopicAndMessage (fromString topic) . makeKeyedMessage (pack sensor) . toStrict . encode)
events
running <- liftIO $ isEmptyMVar exitFlag
when running
loop
return
(
void $ tryPutMVar exitFlag ()
, void <$> runKafka (mkKafkaState (fromString client) (fromString *** fromIntegral $ address)) loop
)