{-# LANGUAGE FlexibleContexts #-}

module Network.AMQP.Streamly
  ( -- * How to use this library
    -- $use
    SendInstructions (..),
    produce,
    consume,
  )
where

import Control.Concurrent.MVar
import Control.Monad.IO.Class
  ( MonadIO,
    liftIO,
  )
import Data.Text (Text)
import Network.AMQP
import Streamly.Data.Stream
import qualified Streamly.Data.Stream as S
import Streamly.Data.Stream.Prelude
import qualified Streamly.Data.Stream.Prelude as S

-- | Everything needed to publish one message.
--
-- The fields are handed, in order, to @Network.AMQP.publishMsg'@ right
-- after the channel argument; see that function for their meaning.
data SendInstructions = SendInstructions
  { -- | Exchange argument given to @publishMsg'@
    exchange :: Text,
    -- | Routing key argument given to @publishMsg'@
    routingKey :: Text,
    -- | Mandatory flag given to @publishMsg'@
    mandatory :: Bool,
    -- | The message to publish
    message :: Message
  }
  deriving (Show)

-- | The name of a queue, as expected by 'consume'.
--
-- This is a plain alias for 'Text'; it adds readability to signatures but
-- no type safety.
type Queue = Text

-- | Publish the produced messages.
--
-- Every 'SendInstructions' flowing through the input stream is published on
-- the given channel with @Network.AMQP.publishMsg'@, and a @()@ is emitted
-- downstream for each of them. The @Maybe Int@ returned by @publishMsg'@ is
-- discarded.
produce ::
  (MonadIO m) =>
  Channel ->
  Stream m SendInstructions ->
  Stream m ()
produce channel = S.mapM (liftIO . publish)
  where
    -- Publish a single message, dropping the result of @publishMsg'@.
    publish :: SendInstructions -> IO ()
    publish (SendInstructions ex key isMandatory msg) = do
      _ <- publishMsg' channel ex key isMandatory msg
      pure ()

-- | Stream messages from a queue.
--
-- When the stream is first evaluated, a consumer is registered on the
-- channel with @Network.AMQP.consumeMsgs@. Each delivery is handed over
-- through an 'MVar', and the stream yields one @(Message, Envelope)@ pair
-- per delivery, repeating indefinitely.
--
-- The subscription itself is always made in 'Ack' mode, whatever the @ack@
-- argument is. When the caller passes 'NoAck', every delivery is
-- acknowledged with 'ackEnv' as it is taken out of the 'MVar'; when the
-- caller passes 'Ack', deliveries are yielded without being acknowledged
-- here.
--
-- NOTE(review): the consumer tag returned by @consumeMsgs@ is not kept, so
-- nothing in this function cancels the consumer once the stream is dropped.
--
-- See @Network.AMQP.consumeMsgs@ for options
consume ::
  (MonadIO m) =>
  Channel ->
  Queue ->
  Ack ->
  Stream m (Message, Envelope)
consume channel queue ack = S.concatEffect $ liftIO $ do
  slot <- newEmptyMVar
  -- The callback blocks on 'putMVar' until the previous delivery was taken.
  _consumerTag <- consumeMsgs channel queue Ack (putMVar slot)
  pure $ S.repeatM $ liftIO $ next slot
  where
    -- Wait for the next delivery, acknowledging it first in 'NoAck' mode.
    next :: MVar (Message, Envelope) -> IO (Message, Envelope)
    next slot = case ack of
      NoAck -> do
        delivery@(_, envelope) <- takeMVar slot
        ackEnv envelope
        pure delivery
      Ack -> takeMVar slot

-- $use
--
-- This section contains basic step-by-step usage of the library.
--
-- You can either build a producer, which will publish all the messages of
-- a stream:
--
-- > Streamly.drain $ produce channel sendInstructionsStream
--
-- Or a consumer, which yields each @Message@ of a queue together with
-- its @Envelope@:
--
-- > Streamly.drain $ consume channel aQueue NoAck