RabbitMQ-0.1.0.0: AMQP 0-9-1 client library for RabbitMQ servers

Safe Haskell: None
Language: Haskell2010

Network.AMQP

Description

An AMQP 0-9-1 client library for RabbitMQ servers.

A good introduction to RabbitMQ and AMQP 0-9-1 (in various languages): http://www.rabbitmq.com/getstarted.html, http://www.rabbitmq.com/tutorials/amqp-concepts.html

Example

Connect to a server, declare a queue and an exchange, and set up a callback for messages coming in on the queue. Then publish a single message to our new exchange.

{-# LANGUAGE OverloadedStrings #-}
import Network.AMQP
import qualified Data.ByteString.Lazy.Char8 as BL

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    -- declare a queue, exchange and binding
    declareQueue chan newQueue {queueName = "myQueue"}
    declareExchange chan newExchange {exchangeName = "myExchange", exchangeType = "direct"}
    bindQueue chan "myQueue" "myExchange" "myKey"

    -- subscribe to the queue
    consumeMsgs chan "myQueue" Ack myCallback

    -- publish a message to our new exchange
    publishMsg chan "myExchange" "myKey"
        newMsg {msgBody = (BL.pack "hello world"),
                msgDeliveryMode = Just Persistent}

    getLine -- wait for keypress
    closeConnection conn
    putStrLn "connection closed"


myCallback :: (Message,Envelope) -> IO ()
myCallback (msg, env) = do
    putStrLn $ "received message: " ++ (BL.unpack $ msgBody msg)
    -- acknowledge receiving the message
    ackEnv env

Exception handling notes

Some function calls can make the AMQP server throw an AMQP exception, which has the side-effect of closing the connection or channel. The AMQP exceptions are raised as Haskell exceptions (see AMQPException). So upon receiving an AMQPException you may have to reopen the channel or connection.
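The recovery step described above can be sketched as follows. This is a minimal illustration, assuming a local broker with the default guest account; the queue name "noSuchQueue" is hypothetical. It catches AMQPException with try and opens a replacement channel, because the original channel has been closed by the broker.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Exception (try)
import Network.AMQP

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    -- A passive declare only checks for existence. If "noSuchQueue" is
    -- missing, the broker raises an AMQP exception and closes the channel.
    result <- try (declareQueue chan newQueue {queueName = "noSuchQueue", queuePassive = True})
    case result of
        Left e -> do
            putStrLn ("AMQP exception: " ++ show (e :: AMQPException))
            -- chan is no longer usable, so open a replacement channel
            chan' <- openChannel conn
            _ <- declareQueue chan' newQueue {queueName = "noSuchQueue"}
            putStrLn "queue created on a fresh channel"
        Right (_, msgCount, _) ->
            putStrLn ("queue exists with " ++ show msgCount ++ " messages")

    closeConnection conn
```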

Debugging tips

If you need to debug a problem with e.g. channels being closed unexpectedly, here are some tips:

Connection

data ConnectionOpts Source #

Represents the parameters to connect to a broker or a cluster of brokers. See defaultConnectionOpts.

Constructors

ConnectionOpts 

Fields

defaultConnectionOpts :: ConnectionOpts Source #

Constructs default connection options with the following settings:

  • Broker: amqp://guest:guest@localhost:5672/%2F using the PLAIN SASL mechanism
  • max frame size: 131072
  • use the heartbeat delay suggested by the broker
  • no limit on the number of used channels

openConnection :: String -> Text -> Text -> Text -> IO Connection Source #

openConnection hostname virtualHost loginName loginPassword opens a connection to an AMQP server running on hostname. virtualHost is used as a namespace for AMQP resources (default is "/"), so different applications could use multiple virtual hosts on the same AMQP server.

You must call closeConnection before your program exits to ensure that all published messages are received by the server.

The loginName and loginPassword will be used to authenticate via the PLAIN SASL mechanism.

NOTE: If the login name, password or virtual host are invalid, this method will throw a ConnectionClosedException. The exception will not contain a reason why the connection was closed, so you'll have to find out yourself.

openConnection' :: String -> PortNumber -> Text -> Text -> Text -> IO Connection Source #

same as openConnection, but allows you to specify a non-default port number as the second parameter

openConnection'' :: ConnectionOpts -> IO Connection Source #

Opens a connection to a broker specified by the given ConnectionOpts parameter.

closeChannel :: Channel -> IO () Source #

closes a channel. It is typically not necessary to manually call this as closing a connection will implicitly close all channels.

closeConnection :: Connection -> IO () Source #

closes a connection

Make sure to call this function before your program exits to ensure that all published messages are received by the server.
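One way to make sure closeConnection always runs, even when the body throws, is to wrap the connection in bracket from Control.Exception. The sketch below assumes a local broker and an already declared exchange named "myExchange"; both are illustrative.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Exception (bracket)
import Network.AMQP
import qualified Data.ByteString.Lazy.Char8 as BL

main :: IO ()
main =
    -- bracket runs closeConnection whether the body returns or throws
    bracket (openConnection "127.0.0.1" "/" "guest" "guest") closeConnection $ \conn -> do
        chan <- openChannel conn
        _ <- publishMsg chan "myExchange" "myKey" newMsg {msgBody = BL.pack "hello world"}
        putStrLn "message published"
```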

addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO () Source #

addConnectionClosedHandler conn ifClosed handler adds a handler that will be called after the connection is closed (either by calling closeConnection or by an exception). If ifClosed is True and the connection is already closed, the handler will be called immediately. If ifClosed is False and the connection is already closed, the handler will never be called.
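A small sketch of a close handler, assuming a local broker. The handler fills an MVar so the main thread can wait until the handler has actually run; the MVar is only an illustration device and is not part of the library.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Network.AMQP

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    closed <- newEmptyMVar

    -- ifClosed = True: run the handler immediately if conn is already closed
    addConnectionClosedHandler conn True (putMVar closed ())

    closeConnection conn
    takeMVar closed -- blocks until the handler has been called
    putStrLn "close handler has run"
```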

addConnectionBlockedHandler :: Connection -> (Text -> IO ()) -> IO () -> IO () Source #

addConnectionBlockedHandler conn blockedHandler unblockedHandler adds handlers that will be called when a connection gets blocked/unblocked due to server resource constraints.

More information: https://www.rabbitmq.com/connection-blocked.html

getServerProperties :: Connection -> IO FieldTable Source #

get the server properties sent in connection.start

Channel

data Channel Source #

A connection to an AMQP server is made up of separate channels. It is recommended to use a separate channel for each thread in your application that talks to the AMQP server (but you don't have to, as channels are thread-safe).

openChannel :: Connection -> IO Channel Source #

opens a new channel on the connection

By default, if a channel is closed by an AMQP exception, this exception will be printed to stderr. You can prevent this behaviour by setting a custom exception handler (using addChannelExceptionHandler).

addReturnListener :: Channel -> ((Message, PublishError) -> IO ()) -> IO () Source #

registers a callback function that is called whenever a message is returned from the broker ('basic.return').

addChannelExceptionHandler :: Channel -> (SomeException -> IO ()) -> IO () Source #

registers a callback function that is called whenever a channel is closed by an exception.

qos :: Channel -> Word32 -> Word16 -> Bool -> IO () Source #

qos chan prefetchSize prefetchCount global limits the amount of data the server delivers before requiring acknowledgements. prefetchSize specifies the number of bytes and prefetchCount the number of messages. In both cases the value 0 means unlimited.

The meaning of the global flag is explained here: http://www.rabbitmq.com/consumer-prefetch.html

NOTE: RabbitMQ does not implement prefetchSize and will throw an exception if it doesn't equal 0.
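As a sketch of typical use, the following limits a consumer to ten unacknowledged messages. It assumes a local broker and a queue named "myQueue"; prefetchSize is left at 0 because of the note above, and global is False (see the linked page for its exact meaning).

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Network.AMQP
import qualified Data.ByteString.Lazy.Char8 as BL

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn
    _ <- declareQueue chan newQueue {queueName = "myQueue"}

    -- prefetchSize = 0 (unlimited bytes), prefetchCount = 10 messages
    qos chan 0 10 False

    -- with Ack, the server stops delivering once 10 messages are unacknowledged
    _ <- consumeMsgs chan "myQueue" Ack $ \(msg, env) -> do
        putStrLn ("received: " ++ BL.unpack (msgBody msg))
        ackEnv env

    _ <- getLine -- wait for keypress
    closeConnection conn
```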

Exchanges

data ExchangeOpts Source #

A record that contains the fields needed when creating a new exchange using declareExchange. The default values apply when you use newExchange.

Constructors

ExchangeOpts 

Fields

  • exchangeName :: Text

    (must be set); the name of the exchange

  • exchangeType :: Text

    (must be set); the type of the exchange ("fanout", "direct", "topic", "headers")

  • exchangePassive :: Bool

    (default False); If set, the server will not create the exchange. The client can use this to check whether an exchange exists without modifying the server state.

  • exchangeDurable :: Bool

    (default True); If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.

  • exchangeAutoDelete :: Bool

    (default False); If set, the exchange is deleted when all queues have finished using it.

  • exchangeInternal :: Bool

    (default False); If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.

  • exchangeArguments :: FieldTable

    (default empty); A set of arguments for the declaration. The syntax and semantics of these arguments depend on the server implementation.

newExchange :: ExchangeOpts Source #

an ExchangeOpts with defaults set; you must override at least the exchangeName and exchangeType fields.

declareExchange :: Channel -> ExchangeOpts -> IO () Source #

declares a new exchange on the AMQP server. Can be used like this: declareExchange channel newExchange {exchangeName = "myExchange", exchangeType = "fanout"}

bindExchange :: Channel -> Text -> Text -> Text -> IO () Source #

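bindExchange chan destinationName sourceName routingKey binds the destination exchange to the source exchange using the provided routing key. Messages published to the source exchange that match the routing key are forwarded to the destination exchange.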
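A minimal sketch of an exchange-to-exchange binding, assuming a local broker. The exchange names "sourceExchange" and "destExchange" are made up for illustration.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Network.AMQP

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    declareExchange chan newExchange {exchangeName = "sourceExchange", exchangeType = "direct"}
    declareExchange chan newExchange {exchangeName = "destExchange", exchangeType = "fanout"}

    -- argument order: destination first, then source
    bindExchange chan "destExchange" "sourceExchange" "myKey"

    closeConnection conn
```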

bindExchange' :: Channel -> Text -> Text -> Text -> FieldTable -> IO () Source #

an extended version of bindExchange that allows you to include arbitrary arguments. This is useful when working with the headers exchange type.

unbindExchange :: Channel -> Text -> Text -> Text -> IO () Source #

unbindExchange chan destinationName sourceName routingKey unbinds an exchange from an exchange. The routingKey must be identical to the one specified when binding the exchange.

unbindExchange' :: Channel -> Text -> Text -> Text -> FieldTable -> IO () Source #

an extended version of unbindExchange that allows you to include arguments. The arguments must be identical to the ones specified when binding the exchange.

deleteExchange :: Channel -> Text -> IO () Source #

deletes the exchange with the provided name

Queues

data QueueOpts Source #

A record that contains the fields needed when creating a new queue using declareQueue. The default values apply when you use newQueue.

Constructors

QueueOpts 

Fields

  • queueName :: Text

    (default ""); the name of the queue; if left empty, the server will generate a new name and return it from the declareQueue method

  • queuePassive :: Bool

    (default False); If set, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state.

  • queueDurable :: Bool

    (default True); If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue.

  • queueExclusive :: Bool

    (default False); Exclusive queues may only be consumed from by the current connection. Setting the exclusive flag always implies 'auto-delete'.

  • queueAutoDelete :: Bool

    (default False); If set, the queue is deleted when all consumers have finished using it. The last consumer can be cancelled either explicitly or because its channel is closed. If there was never a consumer on the queue, it won't be deleted.

  • queueHeaders :: FieldTable

    (default empty); Headers to use when creating this queue, such as x-message-ttl or x-dead-letter-exchange.

newQueue :: QueueOpts Source #

a QueueOpts with defaults set; you should override at least queueName.

declareQueue :: Channel -> QueueOpts -> IO (Text, Int, Int) Source #

creates a new queue on the AMQP server; can be used like this: declareQueue channel newQueue {queueName = "myQueue"}.

Returns a tuple (queue, messageCount, consumerCount). queue is the name of the new queue (if you don't specify a queue the server will autogenerate one). messageCount is the number of messages in the queue, which will be zero for newly-created queues. consumerCount is the number of active consumers for the queue.
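The returned tuple can be used like this. The sketch assumes a local broker and relies on the default empty queueName so that the server generates a name; the exclusive flag is only set to keep the example from leaving queues behind.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Network.AMQP
import qualified Data.Text as T

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    -- queueName is left at its default "", so the server picks a name
    (name, msgCount, consumerCount) <- declareQueue chan newQueue {queueExclusive = True}
    putStrLn ("queue " ++ T.unpack name ++ ": "
              ++ show msgCount ++ " messages, "
              ++ show consumerCount ++ " consumers")

    closeConnection conn
```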

bindQueue :: Channel -> Text -> Text -> Text -> IO () Source #

bindQueue chan queue exchange routingKey binds the queue to the exchange using the provided routing key. If exchange is the empty string, the default exchange will be used.

bindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO () Source #

an extended version of bindQueue that allows you to include arbitrary arguments. This is useful when working with the headers exchange type.

unbindQueue :: Channel -> Text -> Text -> Text -> IO () Source #

unbindQueue chan queue exchange routingKey unbinds a queue from an exchange. The routingKey must be identical to the one specified when binding the queue.

unbindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO () Source #

an extended version of unbindQueue that allows you to include arguments. The arguments must be identical to the ones specified when binding the queue.

purgeQueue :: Channel -> Text -> IO Word32 Source #

remove all messages from the queue; returns the number of messages that were in the queue

deleteQueue :: Channel -> Text -> IO Word32 Source #

deletes the queue; returns the number of messages that were in the queue before deletion

Messaging

data Message Source #

An AMQP message

Constructors

Message 

Fields

Instances
Eq Message Source # 
Instance details

Defined in Network.AMQP.Internal

Methods

(==) :: Message -> Message -> Bool #

(/=) :: Message -> Message -> Bool #

Ord Message Source # 
Instance details

Defined in Network.AMQP.Internal

Read Message Source # 
Instance details

Defined in Network.AMQP.Internal

Show Message Source # 
Instance details

Defined in Network.AMQP.Internal

newMsg :: Message Source #

a Message with defaults set; you should override at least msgBody

data Envelope Source #

contains meta-information of a delivered message (through getMsg or consumeMsgs)

data Ack Source #

specifies whether you have to acknowledge messages that you receive from consumeMsgs or getMsg. If you use Ack, you have to call ackMsg or ackEnv after you have processed a message, otherwise it might be delivered again in the future

Constructors

Ack 
NoAck 
Instances
Eq Ack Source # 
Instance details

Defined in Network.AMQP

Methods

(==) :: Ack -> Ack -> Bool #

(/=) :: Ack -> Ack -> Bool #

Ord Ack Source # 
Instance details

Defined in Network.AMQP

Methods

compare :: Ack -> Ack -> Ordering #

(<) :: Ack -> Ack -> Bool #

(<=) :: Ack -> Ack -> Bool #

(>) :: Ack -> Ack -> Bool #

(>=) :: Ack -> Ack -> Bool #

max :: Ack -> Ack -> Ack #

min :: Ack -> Ack -> Ack #

Read Ack Source # 
Instance details

Defined in Network.AMQP

Show Ack Source # 
Instance details

Defined in Network.AMQP

Methods

showsPrec :: Int -> Ack -> ShowS #

show :: Ack -> String #

showList :: [Ack] -> ShowS #

consumeMsgs :: Channel -> Text -> Ack -> ((Message, Envelope) -> IO ()) -> IO ConsumerTag Source #

consumeMsgs chan queue ack callback subscribes to the given queue and returns a consumerTag. For any incoming message, the callback will be run. If ack == Ack you will have to acknowledge all incoming messages (see ackMsg and ackEnv)

If you do any exception handling inside the callback, you should make sure not to catch ChanThreadKilledException, or re-throw it if you did catch it, since it is used internally by the library to close channels.

NOTE: The callback will be run on the channel's receiver thread (which is responsible for handling all incoming messages on this channel, including responses to requests from the client) so DO NOT perform any blocking request on chan inside the callback, as this would lead to a dead-lock. However, you CAN perform requests on other open channels inside the callback, though that would keep chan blocked until the requests are done, so it is not recommended. Unless you're using AMQP flow control, the following functions can safely be called on chan: ackMsg, ackEnv, rejectMsg, publishMsg. If you use flow-control or want to perform anything more complex, it's a good idea to wrap your requests inside forkIO.
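The forkIO advice can be sketched as below, assuming a local broker and a queue named "myQueue". The callback hands its work to a new thread, so the blocking declareQueue request on chan no longer runs on the channel's receiver thread.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent (forkIO)
import Control.Monad (void)
import Network.AMQP
import qualified Data.ByteString.Lazy.Char8 as BL

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn
    _ <- declareQueue chan newQueue {queueName = "myQueue"}
    _ <- consumeMsgs chan "myQueue" Ack (callback chan)
    _ <- getLine -- wait for keypress
    closeConnection conn

callback :: Channel -> (Message, Envelope) -> IO ()
callback chan (msg, env) = void $ forkIO $ do
    putStrLn ("received: " ++ BL.unpack (msgBody msg))
    -- A blocking request on chan; safe only because we are in a forked
    -- thread and not on the channel's receiver thread.
    (_, remaining, _) <- declareQueue chan newQueue {queueName = "myQueue", queuePassive = True}
    putStrLn (show remaining ++ " messages still queued")
    ackEnv env
```

Note that forking per message gives up ordering between callbacks; a worker thread fed through a queue may be preferable when order matters.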

consumeMsgs' :: Channel -> Text -> Ack -> ((Message, Envelope) -> IO ()) -> (ConsumerTag -> IO ()) -> FieldTable -> IO ConsumerTag Source #

an extended version of consumeMsgs that allows you to define a consumer cancellation callback and include arbitrary arguments.

cancelConsumer :: Channel -> ConsumerTag -> IO () Source #

stops a consumer that was started with consumeMsgs

publishMsg :: Channel -> Text -> Text -> Message -> IO (Maybe Int) Source #

publishMsg chan exchange routingKey msg publishes msg to the exchange with the provided name. The effect of routingKey depends on the type of the exchange.

Returns the sequence-number of the message (only if the channel is in publisher confirm mode; see confirmSelect).

NOTE: This method may temporarily block if the AMQP server requested us to stop sending content data (using the flow control mechanism). So don't rely on this method returning immediately.

publishMsg' :: Channel -> Text -> Text -> Bool -> Message -> IO (Maybe Int) Source #

Like publishMsg, but additionally allows you to specify whether the mandatory flag should be set.
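The mandatory flag is usually combined with addReturnListener, since a mandatory message that cannot be routed is returned by the broker. A sketch, assuming a local broker; the routing key "unboundKey" is hypothetical and deliberately has no queue bound to it.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Network.AMQP
import qualified Data.ByteString.Lazy.Char8 as BL

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn
    declareExchange chan newExchange {exchangeName = "myExchange", exchangeType = "direct"}

    -- called for every message the broker hands back ('basic.return')
    addReturnListener chan $ \(msg, _publishError) ->
        putStrLn ("returned by broker: " ++ BL.unpack (msgBody msg))

    -- mandatory = True: the broker returns the message if it cannot route it
    _ <- publishMsg' chan "myExchange" "unboundKey" True
            newMsg {msgBody = BL.pack "hello world"}

    _ <- getLine -- give the return listener time to fire
    closeConnection conn
```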

getMsg :: Channel -> Ack -> Text -> IO (Maybe (Message, Envelope)) Source #

getMsg chan ack queue gets a message from the specified queue. If ack==Ack, you have to call ackMsg or ackEnv for any message that you get, otherwise it might be delivered again in the future (by calling recoverMsgs)
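A sketch of fetching a single message by polling, assuming a local broker and a queue named "myQueue". The Maybe result is Nothing when the queue is empty.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Network.AMQP
import qualified Data.ByteString.Lazy.Char8 as BL

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn
    _ <- declareQueue chan newQueue {queueName = "myQueue"}

    next <- getMsg chan Ack "myQueue"
    case next of
        Nothing -> putStrLn "queue is empty"
        Just (msg, env) -> do
            putStrLn ("got: " ++ BL.unpack (msgBody msg))
            ackEnv env -- required because we asked for Ack

    closeConnection conn
```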

rejectMsg :: Channel -> LongLongInt -> Bool -> IO () Source #

rejectMsg chan deliveryTag requeue allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. If requeue==False, the message will be discarded. If it is True, the server will attempt to requeue the message.

NOTE: RabbitMQ 1.7 doesn't implement this command

rejectEnv :: Envelope -> Bool -> IO () Source #

rejectEnv env requeue rejects a message. This is a wrapper for rejectMsg in case you have the Envelope at hand.
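As an illustration, a consumer callback might discard messages it cannot handle and acknowledge the rest. The sketch assumes a local broker and a queue named "myQueue"; treating an empty body as unprocessable is an arbitrary rule chosen for the example.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Network.AMQP
import qualified Data.ByteString.Lazy.Char8 as BL

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn
    _ <- declareQueue chan newQueue {queueName = "myQueue"}
    _ <- consumeMsgs chan "myQueue" Ack callback
    _ <- getLine -- wait for keypress
    closeConnection conn

callback :: (Message, Envelope) -> IO ()
callback (msg, env)
    -- requeue = False: the broker discards the message
    | BL.null (msgBody msg) = rejectEnv env False
    | otherwise = do
        putStrLn ("processing: " ++ BL.unpack (msgBody msg))
        ackEnv env
```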

recoverMsgs :: Channel -> Bool -> IO () Source #

recoverMsgs chan requeue asks the broker to redeliver all messages that were received but not acknowledged on the specified channel. If requeue==False, the message will be redelivered to the original recipient. If requeue==True, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.

ackMsg :: Channel -> LongLongInt -> Bool -> IO () Source #

ackMsg chan deliveryTag multiple acknowledges one or more messages. A message MUST not be acknowledged more than once.

If multiple==True, the deliveryTag is treated as "up to and including", so that the client can acknowledge multiple messages with a single method call. If multiple==False, deliveryTag refers to a single message.

If multiple==True and deliveryTag==0, this tells the server to acknowledge all outstanding messages.

ackEnv :: Envelope -> IO () Source #

Acknowledges a single message. This is a wrapper for ackMsg in case you have the Envelope at hand.

Transactions

txSelect :: Channel -> IO () Source #

This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.

txCommit :: Channel -> IO () Source #

This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit.

txRollback :: Channel -> IO () Source #

This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback.
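Taken together, the three transaction functions can be used roughly as follows. The sketch assumes a local broker and an existing exchange "myExchange". If the failure is itself an AMQP exception that closed the channel, the rollback call will fail as well, so treat this as an outline, not a complete recovery strategy.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Exception (onException)
import Network.AMQP
import qualified Data.ByteString.Lazy.Char8 as BL

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    -- switch the channel to transactional mode
    txSelect chan

    let publish body =
            publishMsg chan "myExchange" "myKey" newMsg {msgBody = BL.pack body}

    -- publish both messages and commit; roll back if anything throws
    (mapM_ publish ["first", "second"] >> txCommit chan)
        `onException` txRollback chan

    closeConnection conn
```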

Confirms

confirmSelect :: Channel -> Bool -> IO () Source #

confirmSelect chan nowait puts the channel in publisher confirm mode. This mode is a RabbitMQ extension where a producer receives confirmations when messages are successfully processed by the broker. Publisher confirms are a relatively lightweight alternative to full transactional mode. For details about the delivery guarantees and performance implications of this mode, see https://www.rabbitmq.com/confirms.html. Note that on a single channel, publisher confirms and transactions are mutually exclusive (you cannot select both at the same time). When nowait==True the server will not send back a response to this method.

waitForConfirms :: Channel -> IO ConfirmationResult Source #

Calling this function will cause the invoking thread to block until all previously published messages have been acknowledged by the broker (positively or negatively). Returns a value of type ConfirmationResult, holding a tuple of two IntSets (acked, nacked), containing the delivery tags for the messages that have been confirmed by the broker.

waitForConfirmsUntil :: Channel -> Int -> IO ConfirmationResult Source #

Same as waitForConfirms, but with a timeout in microseconds. Note that, since this operation may timeout before the server has acked or nacked all pending messages, the returned ConfirmationResult should be pattern-matched for the constructors Complete (acked, nacked) and Partial (acked, nacked, pending)
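The pattern match mentioned above might look like this. The sketch assumes a local broker, an existing exchange "myExchange", and an arbitrary five-second timeout.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Network.AMQP
import qualified Data.ByteString.Lazy.Char8 as BL
import qualified Data.IntSet as IntSet

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn

    -- nowait = False: wait for the broker to confirm the mode switch
    confirmSelect chan False
    _ <- publishMsg chan "myExchange" "myKey" newMsg {msgBody = BL.pack "hello world"}

    -- timeout is given in microseconds (here: 5 seconds)
    result <- waitForConfirmsUntil chan 5000000
    case result of
        Complete (acked, nacked) ->
            putStrLn ("done: " ++ show (IntSet.size acked) ++ " acked, "
                      ++ show (IntSet.size nacked) ++ " nacked")
        Partial (acked, nacked, pending) ->
            putStrLn ("timed out: " ++ show (IntSet.size acked) ++ " acked, "
                      ++ show (IntSet.size nacked) ++ " nacked, "
                      ++ show (IntSet.size pending) ++ " still pending")

    closeConnection conn
```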

addConfirmationListener :: Channel -> ((Word64, Bool, AckType) -> IO ()) -> IO () Source #

Adds a handler which will be invoked each time the Channel receives a confirmation from the broker. The parameters passed to the handler are the deliveryTag for the message being confirmed, a flag indicating whether the confirmation refers to this message individually (False) or all messages up to this one (True) and an AckType whose value can be either BasicAck or BasicNack.

data AckType Source #

Constructors

BasicAck 
BasicNack 
Instances
Show AckType Source # 
Instance details

Defined in Network.AMQP.Internal

Flow Control

flow :: Channel -> Bool -> IO () Source #

flow chan active tells the AMQP server to pause or restart the flow of content data. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process.

If active==True the server will start sending content data, if active==False the server will stop sending content data.

A new channel is always active by default.

NOTE: RabbitMQ 1.7 doesn't implement this command.

SASL

data SASLMechanism Source #

A SASLMechanism is described by its name (saslName), its initial response (saslInitialResponse), and an optional function (saslChallengeFunc) that transforms a security challenge provided by the server into a response, which is then sent back to the server for verification.

Constructors

SASLMechanism 

Fields

plain :: Text -> Text -> SASLMechanism Source #

The PLAIN SASL mechanism. See RFC4616

rabbitCRdemo :: Text -> Text -> SASLMechanism Source #

The RABBIT-CR-DEMO SASL mechanism needs to be explicitly enabled on the RabbitMQ server and should only be used for demonstration purposes of the challenge-response cycle. See http://www.rabbitmq.com/authentication.html.

Exceptions

data ChanThreadKilledException Source #

Thrown in the channel thread when the connection gets closed. When handling exceptions in a subscription callback, make sure to re-throw this so the channel thread can be stopped.
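One possible way to follow this rule is a wrapper that logs ordinary exceptions but re-throws ChanThreadKilledException. The names guarded and process below are hypothetical helpers for this sketch, not part of the library; a local broker and a queue named "myQueue" are assumed.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Exception (Handler (..), SomeException, catches, throwIO)
import Network.AMQP
import System.IO (hPutStrLn, stderr)
import qualified Data.ByteString.Lazy.Char8 as BL

-- Wraps a callback: handlers are tried in order, so the library's own
-- exception is re-thrown before the catch-all handler can swallow it.
guarded :: ((Message, Envelope) -> IO ()) -> (Message, Envelope) -> IO ()
guarded handler delivery =
    handler delivery `catches`
        [ Handler (\e -> throwIO (e :: ChanThreadKilledException))
        , Handler (\e -> hPutStrLn stderr ("callback failed: " ++ show (e :: SomeException)))
        ]

process :: (Message, Envelope) -> IO ()
process (msg, env) = do
    putStrLn ("received: " ++ BL.unpack (msgBody msg))
    ackEnv env

main :: IO ()
main = do
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn
    _ <- declareQueue chan newQueue {queueName = "myQueue"}
    _ <- consumeMsgs chan "myQueue" Ack (guarded process)
    _ <- getLine -- wait for keypress
    closeConnection conn
```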

data CloseType Source #

describes whether a channel was closed by user-request (Normal) or by an AMQP exception (Abnormal)

Constructors

Normal 
Abnormal 

URI parsing

fromURI :: String -> ConnectionOpts Source #

Parses a standard AMQP URI of the form amqp://user:password@host:port/vhost and returns a ConnectionOpts for use with openConnection''.

Any of these fields may be empty and will be replaced with defaults from amqp://guest:guest@localhost:5672/
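A short sketch of connecting through a URI, assuming a broker on localhost with the default guest account:

```haskell
import Network.AMQP

main :: IO ()
main = do
    -- fromURI is pure; openConnection'' performs the actual connect
    conn <- openConnection'' (fromURI "amqp://guest:guest@localhost:5672/")
    _chan <- openChannel conn
    putStrLn "connected via URI"
    closeConnection conn
```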