{-# LANGUAGE FlexibleContexts #-}
module Network.AMQP.Streamly
(
SendInstructions (..),
produce,
consume,
)
where
import Control.Concurrent.MVar
import Control.Monad.IO.Class
( MonadIO,
liftIO,
)
import Data.Text (Text)
import Network.AMQP
import Streamly.Data.Stream
import qualified Streamly.Data.Stream as S
import Streamly.Data.Stream.Prelude
import qualified Streamly.Data.Stream.Prelude as S
-- | Everything 'produce' needs in order to publish one message.
--
-- The four fields are forwarded, in declaration order, as the arguments that
-- follow the channel in a call to @publishMsg'@.
data SendInstructions = SendInstructions
  { -- | Exchange argument of @publishMsg'@.
    exchange :: Text,
    -- | Routing-key argument of @publishMsg'@.
    routingKey :: Text,
    -- | Boolean flag argument of @publishMsg'@.
    mandatory :: Bool,
    -- | The message to publish.
    message :: Message
  }
  deriving (Show)
-- | Queue identifier; 'consume' passes it on unchanged to 'consumeMsgs'.
type Queue = Text
-- | Publish every element of a stream on the given channel.
--
-- Each 'SendInstructions' is handed to @publishMsg'@. The @Maybe Int@ that
-- call returns is dropped, so the resulting stream yields exactly one @()@
-- per input element.
produce ::
  (MonadIO m) =>
  Channel ->
  Stream m SendInstructions ->
  Stream m ()
produce channel = S.mapM (liftIO . publishOne)
  where
    -- Publish a single message and discard the value @publishMsg'@ returns.
    publishOne :: SendInstructions -> IO ()
    publishOne instructions =
      ()
        <$ publishMsg'
          channel
          (exchange instructions)
          (routingKey instructions)
          (mandatory instructions)
          (message instructions)
-- | Turn the deliveries of a queue into a stream.
--
-- As the stream's initial effect, an empty 'MVar' is created and a consumer
-- is registered with 'consumeMsgs' whose callback puts every delivery into
-- that 'MVar'. The stream then repeatedly takes from it, so deliveries are
-- handed over one at a time. The @Text@ returned by 'consumeMsgs' is
-- discarded.
--
-- The subscription is always opened with the 'Ack' constructor, whatever the
-- @ack@ argument is. The argument only decides what happens after a delivery
-- has been taken:
--
-- * 'NoAck': the delivery's envelope is passed to 'ackEnv' before the
--   delivery is yielded.
-- * anything else: the delivery is yielded without being acknowledged here.
consume ::
  (MonadIO m) =>
  Channel ->
  Queue ->
  Ack ->
  Stream m (Message, Envelope)
consume channel queue ack = S.concatEffect (liftIO subscribe)
  where
    -- Register the consumer and build the stream that drains the hand-off slot.
    subscribe :: (MonadIO n) => IO (Stream n (Message, Envelope))
    subscribe = do
      slot <- newEmptyMVar
      -- Deliberately 'Ack' here: acknowledging for 'NoAck' is done in 'next'.
      _ <- consumeMsgs channel queue Ack (putMVar slot)
      pure (S.repeatM (liftIO (next slot)))

    -- Block until the callback has stored a delivery, then hand it out.
    next :: MVar (Message, Envelope) -> IO (Message, Envelope)
    next slot
      | ack == NoAck = do
          delivery <- takeMVar slot
          ackEnv (snd delivery)
          pure delivery
      | otherwise = takeMVar slot