{-# LANGUAGE FlexibleContexts #-}

module Network.AMQP.Streamly
  (
    -- * How to use this library
    -- $use
    SendInstructions(..)
  , produce
  , consume
  )
where

import           Control.Concurrent.MVar
import           Control.Monad.IO.Class         ( MonadIO
                                                , liftIO
                                                )

import           Data.Text                      ( Text )
import           Network.AMQP
import           Streamly
import qualified Streamly.Internal.Prelude     as S
import qualified Streamly.Prelude              as S

-- | Everything needed to publish a single message.
--
-- 'produce' hands the fields, in declaration order, to
-- @Network.AMQP.publishMsg'@ (right after the channel); see that function
-- for the exact meaning of each option.
data SendInstructions = SendInstructions
  { exchange   :: Text     -- ^ First argument after the channel (the exchange)
  , routingKey :: Text     -- ^ Second argument after the channel (the routing key)
  , mandatory  :: Bool     -- ^ Third argument after the channel (the mandatory flag)
  , message    :: Message  -- ^ The message to publish
  }
  deriving (Show)

-- | The name of a queue, as passed by 'consume' to
-- @Network.AMQP.consumeMsgs@.
--
-- This is a plain alias for 'Text'; it is not part of the module's export
-- list, so callers simply supply a 'Text' value.
type Queue = Text

-- | Publish every 'SendInstructions' flowing through the stream.
--
-- Each element is sent on the given channel with
-- @Network.AMQP.publishMsg'@; the value returned by the publish call is
-- discarded, so the resulting stream yields one @()@ per published message.
produce
  :: (IsStream t, MonadAsync m) => Channel -> t m SendInstructions -> t m ()
produce channel = S.mapM (liftIO . publish)
 where
  -- Send one message and drop whatever @publishMsg'@ returns.
  publish :: SendInstructions -> IO ()
  publish instr =
    () <$ publishMsg' channel
                      (exchange instr)
                      (routingKey instr)
                      (mandatory instr)
                      (message instr)

-- | Stream the messages delivered on a queue.
--
-- A consumer is registered on the channel when the stream starts; its
-- callback hands each delivery over through a single-slot 'MVar', from which
-- the stream repeatedly takes the next @(Message, Envelope)@ pair.
--
-- The subscription itself is always registered in 'Ack' mode. The @ack@
-- argument only selects who acknowledges a delivery:
--
-- * 'NoAck': each message is acknowledged with 'ackEnv' right after it has
--   been taken, before it is yielded downstream.
-- * any other value: the message is yielded without being acknowledged here
--   (presumably so the caller can acknowledge it through the returned
--   'Envelope').
--
-- See @Network.AMQP.consumeMsgs@ for options
consume
  :: (IsStream t, MonadAsync m)
  => Channel
  -> Queue
  -> Ack
  -> t m (Message, Envelope)
consume channel queue ack = S.concatM . liftIO $ do
  inbox        <- newEmptyMVar
  -- The consumer tag is not needed afterwards, so it is dropped.
  _consumerTag <- consumeMsgs channel queue Ack (putMVar inbox)
  pure . S.repeatM . liftIO $ next inbox
 where
  -- Wait for the next delivery, acknowledging it first when the caller
  -- asked for 'NoAck'.
  next :: MVar (Message, Envelope) -> IO (Message, Envelope)
  next inbox
    | ack == NoAck = do
      delivery <- takeMVar inbox
      ackEnv (snd delivery)
      pure delivery
    | otherwise = takeMVar inbox


-- $use
--
-- This section contains basic step-by-step usage of the library.
--
-- You can either build a producer, which will publish all the messages of
-- a stream:
--
-- > Streamly.drain $ produce channel sendInstructionsStream
--
-- Or a consumer, which streams the @Message@s and @Envelope@s delivered on
-- a queue:
--
-- > Streamly.drain $ consume channel aQueue NoAck