pipes-zeromq4-0.1.0.0: Pipes integration for ZeroMQ messaging

Copyright: (c) Matthew Peddie 2014
License: BSD3 (see the file zeromq4-pipes/LICENSE)
Maintainer: matt.peddie@planet.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Pipes.ZMQ4

Description

This module provides functions to help you attach ZMQ sockets to Pipes processing pipelines.

If you want to hook ZMQ sockets into a unidirectional pipeline involving Producers, Consumers and Pipes, see examples/proxy.hs for a short usage example for setupProducer and setupConsumer.

If you want to hook ZMQ sockets into a bidirectional or non-Pull-based pipeline involving Clients, Servers and Proxys, see examples/server.hs and examples/client.hs for a short usage example for setupServer and setupClient.

This module relies on the functions provided by Pipes.Safe to deal with exceptions and termination. If you need to avoid this layer of safety, please don't hesitate to contact the author for support.

Create data sources and sinks

Each of these functions takes a setup function, which is run before any messaging activity happens, so anything you need to do to configure the socket (e.g. subscribe, set options, connect or bind) should be done within it.

Note that according to the ZeroMQ manual pages, the correct order of operations is to perform all socket configuration before running connect or bind.
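As an illustration of that ordering, here is a minimal sketch of a setup function for a Sub socket. The name subscriberSetup, the topic and the endpoint are made up for this example; subscribe and connect come from System.ZMQ4 in the zeromq4-haskell package.

```haskell
import Data.ByteString.Char8 (pack)
import qualified System.ZMQ4 as Z

-- Hypothetical setup function: the name, topic and endpoint are
-- placeholders chosen for this sketch.
subscriberSetup :: Z.Socket Z.Sub -> IO ()
subscriberSetup sock = do
  -- Socket configuration comes first ...
  Z.subscribe sock (pack "weather")
  -- ... and connect (or bind) comes last.
  Z.connect sock "tcp://127.0.0.1:5556"
```

A function of this shape can be passed as the third argument of any of the setup functions documented here, provided the socket type matches.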

setupProducer
  :: (MonadSafe m, Base m ~ IO, SocketType sockty, Receiver sockty)
  => Context                      -- ZMQ context
  -> sockty                       -- ZMQ socket type
  -> (Socket sockty -> IO ())     -- Setup function
  -> Producer [ByteString] m ()   -- Message source

Create a Producer of message data from the given ZeroMQ parameters. All messages received on the socket will be sent downstream with yield.
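A minimal sketch of a pipeline built on setupProducer follows. It assumes a Pull socket bound to a caller-supplied endpoint; printMessages is a hypothetical name, and the message limit exists only so that the example terminates.

```haskell
import Pipes.ZMQ4
import qualified Pipes.Prelude as P
import qualified System.ZMQ4 as Z

-- Hypothetical helper: print the first n multipart messages that arrive
-- on a Pull socket bound to the given endpoint.
printMessages :: Z.Context -> String -> Int -> IO ()
printMessages ctx endpoint n =
  runSafeT . runEffect $
    setupProducer ctx Z.Pull (\sock -> Z.bind sock endpoint)
      >-> P.take n   -- stop after n messages
      >-> P.print    -- each message is a list of ByteString parts
```

Running it inside System.ZMQ4's withContext, for example `Z.withContext $ \ctx -> printMessages ctx "tcp://*:5557" 10`, supplies the Context argument.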

setupConsumer
  :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty)
  => Context                                -- ZMQ context
  -> sockty                                 -- ZMQ socket type
  -> (Socket sockty -> IO ())               -- Setup function
  -> Consumer' (NonEmpty ByteString) m ()   -- Message sink

Create a Consumer of message data from the given ZeroMQ parameters. All data successfully awaited from upstream will be sent out the socket.

The resulting Consumer will only accept NonEmpty lists of ByteString message parts. See toNonEmpty if this is a sticking point for you.
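The following sketch shows one way to feed ordinary lists into such a Consumer by placing toNonEmpty in front of it. The helper name sendAll and the choice of a Push socket are assumptions made for this example.

```haskell
import Data.ByteString.Char8 (pack)
import Pipes.ZMQ4
import qualified System.ZMQ4 as Z

-- Hypothetical helper: send each inner list as one multipart message on a
-- Push socket connected to the given endpoint. Empty inner lists are
-- dropped by toNonEmpty before they reach the socket.
sendAll :: Z.Context -> String -> [[String]] -> IO ()
sendAll ctx endpoint msgs =
  runSafeT . runEffect $
    each (map (map pack) msgs)
      >-> toNonEmpty
      >-> setupConsumer ctx Z.Push (\sock -> Z.connect sock endpoint)
```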

setupBi
  :: (MonadSafe m, Base m ~ IO, MonadSafe m1, Base m1 ~ IO, SocketType sockty, Sender sockty, Receiver sockty)
  => Context                      -- ZMQ context
  -> sockty                       -- ZMQ socket type
  -> (Socket sockty -> IO ())     -- Setup function
  -> m1 (Producer [ByteString] m (), Consumer (NonEmpty ByteString) m ())
                                  -- Message (source, sink) pair

Create both a Producer and a Consumer of message data, both corresponding to the same socket, from the given ZeroMQ parameters. This is like setupProducer and setupConsumer combined; the socket type must be both a Sender and a Receiver (for example, a Dealer). Messages received over the socket are yielded by the Producer; messages awaited by the Consumer are sent over the socket.

See also the descriptions of setupProducer and setupConsumer.
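As a rough sketch of how the pair might be used, the function below echoes the first n messages received on a Dealer socket back out of the same socket. The name echoFirst, the endpoint parameter and the message limit are assumptions made for this example, and both monads m and m1 are taken to be SafeT IO.

```haskell
import Pipes.ZMQ4
import qualified Pipes.Prelude as P
import qualified System.ZMQ4 as Z

-- Hypothetical helper: echo the first n messages arriving on a Dealer
-- socket back through the same socket.
echoFirst :: Int -> Z.Context -> String -> IO ()
echoFirst n ctx endpoint = runSafeT $ do
  (source, sink) <- setupBi ctx Z.Dealer (\sock -> Z.connect sock endpoint)
  runEffect $
    source
      >-> P.take n     -- the limit only makes the sketch terminate
      >-> toNonEmpty   -- the sink accepts NonEmpty lists only
      >-> sink
```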

setupClient
  :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty)
  => Context                      -- ZMQ context
  -> sockty                       -- ZMQ socket type
  -> (Socket sockty -> IO ())     -- Setup function
  -> Client [ByteString] (NonEmpty ByteString) m ()

Create a Client from the given ZeroMQ parameters. The Client passes each message it receives on the ZMQ socket upstream with request and sends the corresponding reply back out the socket.
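One way to use this, sketched under assumptions: attach the Client to a Rep socket and compose it with a pipes Server that computes replies. The names makeReply, handler and serveEcho are hypothetical; respond and (+>>) come from Pipes.Core, which is not re-exported by this module.

```haskell
import Data.ByteString (ByteString)
import Data.ByteString.Char8 (pack)
import Data.List.NonEmpty (NonEmpty (..))
import Pipes.Core (Server, respond, (+>>))
import Pipes.ZMQ4
import qualified System.ZMQ4 as Z

-- Pure reply logic (hypothetical): prefix the request parts with "echo".
makeReply :: [ByteString] -> NonEmpty ByteString
makeReply parts = pack "echo" :| parts

-- A pipes Server that answers every request it is handed, forever.
handler :: Monad m => [ByteString] -> Server [ByteString] (NonEmpty ByteString) m r
handler req = respond (makeReply req) >>= handler

-- Each message read from the Rep socket goes upstream to handler; the
-- reply handler produces is sent back out of the socket.
serveEcho :: Z.Context -> String -> IO ()
serveEcho ctx endpoint =
  runSafeT . runEffect $
    handler +>> setupClient ctx Z.Rep (\sock -> Z.bind sock endpoint)
```

Note the inversion of roles: the process acting as a ZeroMQ server (Rep) is a Client in pipes terms, because it requests replies from upstream.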

setupServer
  :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty)
  => Context                      -- ZMQ context
  -> sockty                       -- ZMQ socket type
  -> (Socket sockty -> IO ())     -- Setup function
  -> NonEmpty ByteString          -- Server request input
  -> Server (NonEmpty ByteString) [ByteString] m ()

Create a Server from the given ZeroMQ parameters. The Server sends each request it receives out on the ZMQ socket and passes each message it receives over the ZMQ socket back downstream with respond.
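A sketch of the matching usage, assuming a Req socket: a pipes Client issues requests, and the Server built by setupServer forwards them over ZeroMQ. The names pingMessage, askOnce and runRequest are hypothetical; request and (+>>) come from Pipes.Core.

```haskell
import Data.ByteString (ByteString)
import Data.ByteString.Char8 (pack)
import Data.List.NonEmpty (NonEmpty (..))
import Pipes.Core (Client, request, (+>>))
import Pipes.ZMQ4
import qualified System.ZMQ4 as Z

-- Hypothetical single-part request.
pingMessage :: NonEmpty ByteString
pingMessage = pack "ping" :| []

-- A pipes Client that makes one request and prints the reply parts.
askOnce :: MonadIO m => Client (NonEmpty ByteString) [ByteString] m ()
askOnce = do
  reply <- request pingMessage
  liftIO (print reply)

-- The request travels out over the Req socket; whatever comes back on the
-- socket is handed to askOnce as the reply.
runRequest :: Z.Context -> String -> IO ()
runRequest ctx endpoint =
  runSafeT . runEffect $
    setupServer ctx Z.Req (\sock -> Z.connect sock endpoint) +>> askOnce
```

The effect finishes as soon as askOnce returns, at which point the socket is released by the surrounding runSafeT.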

Low-level safe setup

It's recommended to use setupProducer, setupConsumer and friends instead of these functions unless you know what you're doing and need something else.

setup :: (MonadSafe m, Base m ~ IO, SocketType sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> (Socket sockty -> m v) -> m v

This is the low-level function for safely bracketing ZMQ socket creation, so that any exceptions or Pipe termination will not result in abandoned sockets. For example, setupProducer is defined as

setupProducer ctx ty opts = setup ctx ty opts receiveLoop
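In the same spirit, setup can bracket a custom loop. The sketch below is an assumption-laden example rather than part of the library: firstN is a hypothetical name, and it yields only the first n messages from a Pull socket, after which the socket is released.

```haskell
import Data.ByteString (ByteString)
import Pipes.ZMQ4
import qualified Pipes.Prelude as P
import qualified System.ZMQ4 as Z

-- Hypothetical producer: bind a Pull socket, yield the first n messages,
-- then finish so that the bracketing in setup can release the socket.
firstN :: Int -> Z.Context -> String -> Producer [ByteString] (SafeT IO) ()
firstN n ctx endpoint =
  setup ctx Z.Pull (\sock -> Z.bind sock endpoint) $ \sock ->
    receiveLoop sock >-> P.take n
```

The continuation passed to setup runs in the Producer monad itself, which works because Pipes.Safe provides a MonadSafe instance for Proxy over a MonadSafe base.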

receiveLoop :: (Receiver sck, MonadIO m) => Socket sck -> Producer [ByteString] m ()

This is a low-level function for simply passing all messages received on a socket downstream.

receiveLoop sock = forever $ liftIO (Z.receiveMulti sock) >>= yield

sendLoop :: (Sender sck, MonadIO m) => Socket sck -> Consumer' (NonEmpty ByteString) m ()

This is a low-level function for simply sending all messages available from upstream out on the socket.

sendLoop sock = forever $ await >>= liftIO . Z.sendMulti sock

Helpers

toNonEmpty :: Monad m => Pipe [ByteString] (NonEmpty ByteString) m ()

This is simply an adapter between lists of message parts and the NonEmpty lists demanded by ZMQ Consumers. If an empty list arrives in the input, it will be ignored.
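The behaviour described above can be reproduced with a small stand-alone pipe. The following is a sketch of an equivalent adapter, not the library's actual definition; toNonEmptySketch and demo are hypothetical names, and the element type is generalised so the example can run on plain Ints without any sockets.

```haskell
import Data.List.NonEmpty (NonEmpty (..), nonEmpty)
import Pipes
import qualified Pipes.Prelude as P

-- Forward every non-empty list as a NonEmpty; silently skip empty lists.
toNonEmptySketch :: Monad m => Pipe [a] (NonEmpty a) m r
toNonEmptySketch = for cat (maybe (return ()) yield . nonEmpty)

-- The empty list in the middle is dropped.
demo :: [NonEmpty Int]
demo = P.toList (each [[1, 2], [], [3]] >-> toNonEmptySketch)
-- demo == [1 :| [2], 3 :| []]
```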

Re-exported modules

module Pipes

module Pipes.Safe