Copyright | (c) Matthew Peddie 2014 |
---|---|
License | BSD3 (see the file zeromq4-pipes/LICENSE) |
Maintainer | matt.peddie@planet.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
This module provides functions to help you attach ZMQ sockets to Pipes processing pipelines.
If you want to hook ZMQ sockets into a unidirectional pipeline involving Producers, Consumers and Pipes, see examples/proxy.hs for a short usage example of setupProducer and setupConsumer.
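As a rough illustration of the unidirectional case, here is a minimal sketch of a forwarding proxy. It is not a copy of examples/proxy.hs. It assumes that this module is importable as Pipes.ZMQ4 and that the zeromq4-haskell bindings are available as System.ZMQ4; the endpoint addresses and the choice of Pull and Push sockets are made up for the example.

```haskell
module Main (main) where

import qualified System.ZMQ4 as Z
-- Assumption: the module documented here is exposed as Pipes.ZMQ4.
-- It re-exports Pipes and Pipes.Safe, so runEffect, (>->) and runSafeT
-- come along with it.
import Pipes.ZMQ4

main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $
        -- Receive multipart messages on a Pull socket (hypothetical endpoint).
        setupProducer ctx Z.Pull (\s -> Z.bind s "tcp://*:5555")
        -- The sink only accepts NonEmpty lists, so convert in between.
    >-> toNonEmpty
        -- Forward each message out of a Push socket (hypothetical endpoint).
    >-> setupConsumer ctx Z.Push (\s -> Z.connect s "tcp://localhost:5556")
```

The pipeline runs until it is interrupted or an exception is thrown; runSafeT is what triggers the socket cleanup registered by the setup functions.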
If you want to hook ZMQ sockets into a bidirectional or non-Pull-based pipeline involving Clients, Servers and Proxys, see examples/server.hs and examples/client.hs for a short usage example of setupServer and setupClient.
This module relies on the functions provided by Pipes.Safe to deal with exceptions and termination. If you need to avoid this layer of safety, please don't hesitate to contact the author for support.
- setupProducer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Producer [ByteString] m ()
- setupConsumer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Consumer' (NonEmpty ByteString) m ()
- setupBi :: (MonadSafe m, Base m ~ IO, MonadSafe m1, Base m1 ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> m1 (Producer [ByteString] m (), Consumer (NonEmpty ByteString) m ())
- setupClient :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Client [ByteString] (NonEmpty ByteString) m ()
- setupServer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> NonEmpty ByteString -> Server (NonEmpty ByteString) [ByteString] m ()
- setup :: (MonadSafe m, Base m ~ IO, SocketType sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> (Socket sockty -> m v) -> m v
- receiveLoop :: (Receiver sck, MonadIO m) => Socket sck -> Producer [ByteString] m ()
- sendLoop :: (Sender sck, MonadIO m) => Socket sck -> Consumer' (NonEmpty ByteString) m ()
- toNonEmpty :: Monad m => Pipe [ByteString] (NonEmpty ByteString) m ()
- module Pipes
- module Pipes.Safe
Create data sources and sinks
Each of these functions takes a setup function, which is run before any messaging activities happen, so anything you need to do to configure the socket (e.g. subscribe, set options, connect or bind) should be done within it.
Note that according to the ZeroMQ manual pages, the correct order of operations is to perform all socket configuration before running connect or bind.
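The ordering advice can be illustrated with a setup function for a subscriber socket. This is a minimal sketch: it assumes the module is importable as Pipes.ZMQ4, and the topic prefix, the endpoint and the name configureSub are hypothetical.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import qualified Pipes.Prelude as P
import qualified System.ZMQ4 as Z
-- Assumption: the module documented here is exposed as Pipes.ZMQ4.
import Pipes.ZMQ4

-- Hypothetical setup function: configure the socket first, connect last.
configureSub :: Z.Socket Z.Sub -> IO ()
configureSub sock = do
  Z.subscribe sock "weather."            -- configuration (made-up topic prefix)
  Z.connect sock "tcp://localhost:5556"  -- connect only after configuring

main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $
    -- Print every multipart message that matches the subscription.
    setupProducer ctx Z.Sub configureSub >-> P.mapM_ (liftIO . print)
```

Keeping the whole configuration in one named function makes it easy to check that connect or bind is the final step.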
setupProducer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Receiver sockty) | |
=> Context | ZMQ context |
-> sockty | ZMQ socket type |
-> (Socket sockty -> IO ()) | Setup function |
-> Producer [ByteString] m () | Message source |
setupConsumer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty) | |
=> Context | ZMQ context |
-> sockty | ZMQ socket type |
-> (Socket sockty -> IO ()) | Setup function |
-> Consumer' (NonEmpty ByteString) m () | Message sink |
Create a Consumer of message data from the given ZeroMQ parameters. All data successfully awaited from upstream will be sent out over the socket.
The resulting Consumer will only accept NonEmpty lists of ByteString message parts. See toNonEmpty if this is a sticking point for you.
setupBi :: (MonadSafe m, Base m ~ IO, MonadSafe m1, Base m1 ~ IO, SocketType sockty, Sender sockty, Receiver sockty) | |
=> Context | ZMQ context |
-> sockty | ZMQ socket type |
-> (Socket sockty -> IO ()) | Setup function |
-> m1 (Producer [ByteString] m (), Consumer (NonEmpty ByteString) m ()) | Message (source, sink) pair |
Create both a Producer and a Consumer of message data, both corresponding to the same socket, from the given ZeroMQ parameters.
This is like setupProducer and setupConsumer combined; the socket type must be both a Sender and a Receiver (for example, a Dealer). Messages received over the socket are yielded by the Producer; messages awaited by the Consumer are sent over the socket.
See also the descriptions of setupProducer and setupConsumer.
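A minimal sketch of how the (source, sink) pair might be used: it echoes every message received on a Dealer socket back out of the same socket. The import Pipes.ZMQ4 and the endpoint are assumptions, and the sketch presumes that the socket stays open for the duration of the enclosing runSafeT block, which the signature suggests but this page does not state.

```haskell
module Main (main) where

import qualified System.ZMQ4 as Z
-- Assumption: the module documented here is exposed as Pipes.ZMQ4.
import Pipes.ZMQ4

main :: IO ()
main = Z.withContext $ \ctx -> runSafeT $ do
  -- One socket, two pipeline ends (hypothetical endpoint).
  (source, sink) <- setupBi ctx Z.Dealer (\s -> Z.connect s "tcp://localhost:5557")
  -- Echo: whatever arrives on the socket is sent straight back out.
  runEffect $ source >-> toNonEmpty >-> sink
```

In a real program the two ends would more likely feed separate pipelines; wiring them together here just keeps the example short.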
setupClient :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) | |
=> Context | ZMQ context |
-> sockty | ZMQ socket type |
-> (Socket sockty -> IO ()) | Setup function |
-> Client [ByteString] (NonEmpty ByteString) m () |
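The following sketch is based on the type signature alone, since this page gives no prose description of setupClient. Reading the type, the Client issues requests of type [ByteString] upstream and expects NonEmpty ByteString responses, which would fit a pattern where messages received on the socket are handed upstream and the responses are sent back over the socket. That reading is an assumption, as are the import Pipes.ZMQ4, the Rep socket type, the endpoint and the handler name acknowledge.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Data.ByteString (ByteString)
import Data.List.NonEmpty (NonEmpty (..))
import Pipes.Core (Server, respond, (+>>))
import qualified System.ZMQ4 as Z
-- Assumption: the module documented here is exposed as Pipes.ZMQ4.
import Pipes.ZMQ4

-- Hypothetical upstream handler: answer each request with an "ack" frame
-- followed by the frames of the request itself.
acknowledge :: Monad m => [ByteString] -> Server [ByteString] (NonEmpty ByteString) m r
acknowledge parts = respond ("ack" :| parts) >>= acknowledge

main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $
    -- (+>>) plugs the handler in upstream of the socket-backed Client.
    acknowledge +>> setupClient ctx Z.Rep (\s -> Z.bind s "tcp://*:5558")
```

See examples/client.hs for the author's own usage, which takes precedence over this sketch.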
setupServer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) | |
=> Context | ZMQ context |
-> sockty | ZMQ socket type |
-> (Socket sockty -> IO ()) | Setup function |
-> NonEmpty ByteString | Server request input |
-> Server (NonEmpty ByteString) [ByteString] m () |
Low-level safe setup
It's recommended to use setupProducer, setupConsumer and friends instead of these functions unless you know what you're doing and need something else.
setup :: (MonadSafe m, Base m ~ IO, SocketType sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> (Socket sockty -> m v) -> m v
This is the low-level function for safely bracketing ZMQ socket creation, so that any exceptions or Pipe termination will not result in abandoned sockets. For example, setupProducer is defined as
setupProducer ctx ty opts = setup ctx ty opts receiveLoop
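In the same spirit, setup can bracket a custom loop. The sketch below builds a Producer that stops after a fixed number of messages by composing receiveLoop with take from Pipes.Prelude. The helper name firstMessages, the endpoint and the import Pipes.ZMQ4 are assumptions made for the example.

```haskell
module Main (main) where

import Data.ByteString (ByteString)
import qualified Pipes.Prelude as P
import qualified System.ZMQ4 as Z
-- Assumption: the module documented here is exposed as Pipes.ZMQ4.
import Pipes.ZMQ4

-- Hypothetical helper: yield at most n messages from a Pull socket.
firstMessages :: Z.Context -> Int -> Producer [ByteString] (SafeT IO) ()
firstMessages ctx n =
  setup ctx Z.Pull (\s -> Z.bind s "tcp://*:5559") $ \sock ->
    -- The socket is only used inside this continuation.
    receiveLoop sock >-> P.take n

main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $ firstMessages ctx 3 >-> P.print
```

Because the loop terminates on its own here, this is a case where the bracketing done by setup matters: the socket should not be abandoned when the Producer finishes.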
receiveLoop :: (Receiver sck, MonadIO m) => Socket sck -> Producer [ByteString] m ()
This is a low-level function for simply passing all messages received on a socket downstream.
receiveLoop sock = forever $ liftIO (Z.receiveMulti sock) >>= yield
sendLoop :: (Sender sck, MonadIO m) => Socket sck -> Consumer' (NonEmpty ByteString) m ()
This is a low-level function for simply sending all messages available from upstream out on the socket.
sendLoop sock = forever $ await >>= liftIO . Z.sendMulti sock
Helpers
toNonEmpty :: Monad m => Pipe [ByteString] (NonEmpty ByteString) m ()
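This page does not say how toNonEmpty treats empty lists. The sketch below shows one plausible way to write a Pipe with the same type, under the assumption that empty messages are silently dropped; it is a stand-in named dropEmpty, not the library's definition. It needs only the pipes and bytestring packages, so it can be tried without a socket.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Control.Monad (forever)
import Data.ByteString (ByteString)
import Data.List.NonEmpty (NonEmpty (..), nonEmpty)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical stand-in with the same type as toNonEmpty.
-- Assumption: empty lists are discarded rather than reported.
dropEmpty :: Monad m => Pipe [ByteString] (NonEmpty ByteString) m ()
dropEmpty = forever $ await >>= maybe (return ()) yield . nonEmpty

main :: IO ()
main =
  -- Feed three messages through the pipe; the empty one is filtered out.
  print $ P.toList (each [["a", "b"], [], ["c"]] >-> dropEmpty)
```

If dropping empty messages is not acceptable in your application, a variant could substitute a placeholder frame or raise an error instead.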
Re-exported modules
module Pipes
module Pipes.Safe