Safe Haskell | None |
---|---|
Language | Haskell2010 |
Synopsis
- newChan :: UnagiPrim a => IO (InChan a, OutChan a)
- data InChan a
- data OutChan a
- class (Prim a, Eq a) => UnagiPrim a where
- atomicUnicorn :: Maybe a
- tryReadChan :: UnagiPrim a => OutChan a -> IO (Element a)
- readChan :: UnagiPrim a => IO () -> OutChan a -> IO a
- newtype Element a = Element {}
- isActive :: OutChan a -> IO Bool
- writeChan :: UnagiPrim a => InChan a -> a -> IO ()
- writeList2Chan :: UnagiPrim a => InChan a -> [a] -> IO ()
- dupChan :: InChan a -> IO (OutChan a)
- newtype Stream a = Stream {
- tryReadNext :: IO (Next a)
- data Next a
- streamChan :: UnagiPrim a => Int -> OutChan a -> IO [Stream a]
Documentation
General-purpose concurrent FIFO queue without blocking reads, and with optimized variants for single-threaded producers and/or consumers. This variant, and even more so the SP/SC variants, offer the lowest latency of all of the implementations in this library.
Creating channels
newChan :: UnagiPrim a => IO (InChan a, OutChan a) Source #
Create a new channel, returning its write and read ends.
The write end of a channel created with newChan
.
The read end of a channel created with newChan
.
class (Prim a, Eq a) => UnagiPrim a where Source #
Our class of types supporting primitive array operations. Instance method definitions are architecture-dependent.
Nothing
atomicUnicorn :: Maybe a Source #
When the read and write operations of the underlying Prim
instances
on aligned memory are atomic, this may be set to Just x
where x
is
some rare (i.e. unlikely to occur frequently in your data) magic value;
this might help speed up some UnagiPrim
operations.
Where those Prim
instance operations are not atomic, this *must* be
set to Nothing
.
Instances
Channel operations
Reading
tryReadChan :: UnagiPrim a => OutChan a -> IO (Element a) Source #
Returns immediately with an
future, which returns one
unique element when it becomes available via Element
atryRead
.
Note re. exceptions: When an async exception is raised during a tryReadChan
the message that the read would have returned is likely to be lost, just as
it would be when raised directly after this function returns.
readChan :: UnagiPrim a => IO () -> OutChan a -> IO a Source #
readChan io c
returns the next element from c
, calling tryReadChan
and looping on the Element
returned, and calling io
at each iteration
when the element is not yet available. It throws BlockedIndefinitelyOnMVar
when isActive
determines that a value will never be returned.
When used like readChan
or yield
readChan (
this is
the semantic equivalent to the blocking threadDelay
10)readChan
in the other
implementations.
An IO
action that returns a particular enqueued element when and if it
becomes available.
Each Element
corresponds to a particular enqueued element, i.e. a returned
Element
always offers the only means to access one particular enqueued
item. The value returned by tryRead
moves monotonically from Nothing
to Just a
when and if an element becomes available, and is idempotent at
that point.
Utilities
isActive :: OutChan a -> IO Bool Source #
An action that returns False
sometime after the chan no longer has any
writers.
After False
is returned, any tryRead
which returns Nothing
can
be considered to be dead. Likewise for tryReadNext
. Note that in the
blocking implementations a BlockedIndefinitelyOnMVar
exception is raised,
so this function is unnecessary.
Writing
writeList2Chan :: UnagiPrim a => InChan a -> [a] -> IO () Source #
Write an entire list of items to a chan type. Writes here from multiple threads may be interleaved, and infinite lists are supported.
Broadcasting
dupChan :: InChan a -> IO (OutChan a) Source #
Duplicate a chan: the returned OutChan
begins empty, but data written to
the argument InChan
from then on will be available from both the original
OutChan
and the one returned here, creating a kind of broadcast channel.
See also streamChan
for a faster alternative that might be appropriate.
Streaming
An infinite stream of elements. tryReadNext
can be called any number of
times from multiple threads, and returns a value which moves monotonically
from Pending
to Next
if and when a head element becomes available.
isActive
can be used to determine if the stream has expired.
Stream | |
|
Next a (Stream a) | The next head element along with the tail |
Pending | The next element is not yet in the queue; you can retry |
streamChan :: UnagiPrim a => Int -> OutChan a -> IO [Stream a] Source #
Produce the specified number of interleaved "streams" from a chan.
Nextuming a Stream
is much faster than calling tryReadChan
, and
might be useful when an MPSC queue is needed, or when multiple consumers
should be load-balanced in a round-robin fashion.
Usage example:
do mapM_ (writeChan
i) [1..9] [str1, str2, str2] <-streamChan
3 o forkIO $ printStream str1 -- prints: 1,4,7 forkIO $ printStream str2 -- prints: 2,5,8 forkIO $ printStream str3 -- prints: 3,6,9 where printStream str = do h <-tryReadNext
str case h ofNext
a str' -> print a >> printStream str' -- We know that all values were already written, so a Pending tells -- us we can exit; in other cases we might callyield
and then -- retry that same:
tryReadNext
strPending
-> return ()
Be aware: if one stream consumer falls behind another (e.g. because it is
slower) the number of elements in the queue which can't be GC'd will grow.
You may want to do some coordination of Stream
consumers to prevent
this.