Safe Haskell | None |
---|---|
Language | Haskell98 |
Conduit bindings for AMQP (see amqp package) https://hackage.haskell.org/package/amqp
Create an AMQP connection and a channel, declare a queue and an exchange, and run the given action.
Example:
Connect to a server, declare a queue and an exchange, and set up a callback for messages coming in on the queue. Then publish a single message to our new exchange.
{-# LANGUAGE OverloadedStrings #-}

import           Control.Concurrent           (threadDelay)
import           Control.Monad.IO.Class       (MonadIO, liftIO)
import           Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString.Lazy.Char8   as BL
import           Data.Conduit
import           Network.AMQP
import           Network.AMQP.Conduit
import           Test.Hspec

-- | Publish one message to @myExchange@, then consume it from @myQueue@
-- and check that the received body matches what was sent.
main :: IO ()
main = hspec $ do
    describe "produce and consume test" $ do
        it "send a message and recieve the message" $ do
            -- Publish inside 'withChannel' so the connection and channel are
            -- released once the message has been sent.
            runResourceT $ withChannel config $ \conn -> do
                sendMsg str $$ amqpSendSink conn "myExchange" "myKey"
            -- The consumer runs on a manually managed connection, which is
            -- closed explicitly at the end of the test.
            amqp <- createConnectionChannel config
            amqp' <- createConsumer amqp "myQueue" Ack $ \(msg,env) -> do
                amqpReceiveSource (msg,env) $$ printMsg
            -- NOTE: RabbitMQ 1.7 doesn't implement this command.
            -- amqp'' <- pauseConsumer amqp'
            -- amqp''' <- resumeConsumer amqp''
            -- Give the consumer callback time to receive the message.
            threadDelay $ 15 * 1000000
            amqpDone <- deleteConsumer amqp'
            destoryConnection amqpDone

-- | Body of the single test message.
str :: String
str = "This is a test message"

-- | Connection settings: broker URI, queue, exchange and routing key.
config :: AmqpConf
config = AmqpConf "amqp://guest:guest@localhost:5672/" queue exchange "myKey"
  where
    exchange = newExchange {exchangeName = "myExchange", exchangeType = "direct"}
    queue = newQueue {queueName = "myQueue"}

-- | Source that yields exactly one persistent message with the given body.
sendMsg :: (Monad m, MonadIO m) => String -> Source m Message
sendMsg msg =
    yield (newMsg {msgBody = BL.pack msg, msgDeliveryMode = Just Persistent})

-- | Sink that acknowledges one delivery, checks its body against 'str'
-- and prints it.
printMsg :: (Monad m, MonadIO m) => Sink (Message, Envelope) m ()
printMsg = do
    m <- await
    case m of
        -- Upstream is exhausted. Awaiting again would only return 'Nothing'
        -- again, so stop here instead of looping forever.
        Nothing -> return ()
        Just (msg,env) -> do
            liftIO $ ackEnv env
            liftIO $ (BL.unpack $ msgBody msg) `shouldBe` str
            liftIO $ putStrLn $ "received message: " ++ (BL.unpack $ msgBody msg)
- amqpReceiveSource :: MonadIO m => (Message, Envelope) -> Source m (Message, Envelope)
- amqpSendSink :: MonadIO m => AmqpConn -> Exchange -> ExchangeKey -> Sink Message m ()
- data AmqpConf = AmqpConf {}
- data AmqpConn = AmqpConn {
- amqpConn :: Connection
- amqpChan :: (Channel, Maybe ConsumerTag)
- type ExchangeKey = Text
- type Exchange = Text
- type QueueName = Text
- type AmqpURI = String
- withChannel :: (MonadIO m, MonadBaseControl IO m) => AmqpConf -> (AmqpConn -> m a) -> m a
- createConnectionChannel :: AmqpConf -> IO AmqpConn
- destoryConnection :: AmqpConn -> IO ()
- createQueue :: AmqpConf -> AmqpConn -> IO (QueueName, Int, Int)
- createExchange :: AmqpConf -> AmqpConn -> IO ()
- bindQueueExchange :: AmqpConf -> AmqpConn -> IO ()
- createConsumer :: AmqpConn -> QueueName -> Ack -> ((Message, Envelope) -> IO ()) -> IO AmqpConn
- deleteConsumer :: AmqpConn -> IO AmqpConn
- pauseConsumer :: AmqpConn -> IO AmqpConn
- resumeConsumer :: AmqpConn -> IO AmqpConn
Conduit
amqpReceiveSource :: MonadIO m => (Message, Envelope) -> Source m (Message, Envelope) Source
Source for consuming pushed data.
amqpSendSink :: MonadIO m => AmqpConn -> Exchange -> ExchangeKey -> Sink Message m () Source
Sink for sending data.
Data type
AMQP connection configuration: queue name, exchange name, exchange key name, and AMQP URI.
AmqpConf | |
|
Amqp Connection and Channel
AmqpConn | |
|
type ExchangeKey = Text Source
Connection and Channel
:: (MonadIO m, MonadBaseControl IO m) | |
=> AmqpConf | Connection config to the AMQP server. |
-> (AmqpConn -> m a) | Action to be executed that uses the connection. |
-> m a |
Create an AMQP connection and a channel and run the given action. The connection and channel are properly released after the action finishes using them. Note that you should not use the given connection or channel outside the action, since they may already have been released.
createConnectionChannel Source
Create a connection and a channel. Note that it's your responsibility to properly close the connection and the channels when they are no longer needed. Use withChannel for automatic resource control.
destoryConnection :: AmqpConn -> IO () Source
Close a connection
Exchange and Queue utils
createExchange :: AmqpConf -> AmqpConn -> IO () Source
bindQueueExchange :: AmqpConf -> AmqpConn -> IO () Source
Consumer utils
createConsumer :: AmqpConn -> QueueName -> Ack -> ((Message, Envelope) -> IO ()) -> IO AmqpConn Source
deleteConsumer :: AmqpConn -> IO AmqpConn Source
pauseConsumer :: AmqpConn -> IO AmqpConn Source
resumeConsumer :: AmqpConn -> IO AmqpConn Source