Copyright | (C) 2014-2018 Merijn Verstraaten |
---|---|
License | BSD-style (see the file LICENSE) |
Maintainer | Merijn Verstraaten <merijn@inconsistent.nl> |
Stability | experimental |
Portability | haha |
Safe Haskell | Safe |
Language | Haskell2010 |
Functions in this module are *NOT* intended to be used by regular users of
the library. Rather, they are intended for implementing parallel processing
libraries on top of broadcast-chan
, such as broadcast-chan-conduit
.
This module, while not for end users, is considered part of the public API, so users can rely on PVP bounds to avoid breakage due to changes to this module.
Synopsis
- data Action
- data BracketOnError m r = Bracket {}
- data Handler m a
- = Simple Action
- | Handle (a -> SomeException -> m Action)
- mapHandler :: (m Action -> n Action) -> Handler m a -> Handler n a
- runParallel :: forall a b m n r. (MonadIO m, MonadIO n) => Either (b -> n r) (r -> b -> n r) -> Handler IO a -> Int -> (a -> IO b) -> ((a -> m ()) -> (a -> m (Maybe b)) -> n r) -> n (BracketOnError n r)
- runParallel_ :: (MonadIO m, MonadIO n) => Handler IO a -> Int -> (a -> IO ()) -> ((a -> m ()) -> n r) -> n (BracketOnError n r)
Documentation
Action to take when an exception occurs while processing an element.
data BracketOnError m r Source #
Allocation, cleanup, and work actions for parallel processing. These
should be passed to an appropriate bracketOnError
function.
Exception handler for parallel processing.
mapHandler :: (m Action -> n Action) -> Handler m a -> Handler n a Source #
Convenience function for changing the monad the exception handler runs in.
:: (MonadIO m, MonadIO n) | |
=> Either (b -> n r) (r -> b -> n r) | Output yielder |
-> Handler IO a | Parallel processing exception handler |
-> Int | Number of threads to use |
-> (a -> IO b) | Function to run in parallel |
-> ((a -> m ()) -> (a -> m (Maybe b)) -> n r) | "Stream" processing function |
-> n (BracketOnError n r) |
Sets up parallel processing.
The workhorses of this function are the output yielder and "stream" processing functions.
The output yielder is responsible for handling the produced b
values,
which if can either yield downstream (Left
) when used with something like
conduit
or pipes
, or fold into a single results (Right
) when used to
run IO in parallel.
The stream processing function gets two arguments:
a -> m ()
- Should be used to buffer a number of elements equal to the number of threads.
a -> m b
- Which should be used to process the remainder of the
element stream via, for example,
mapM
.
See BroadcastChan or broadcast-chan-conduit
for examples.
The returned BracketOnError
has a allocate
action that takes care of
setting up forkIO
threads and exception handlers. The
cleanup
action ensures all threads are terminate in case of an exception.
Finally, action
performs the actual parallel processing of elements.
:: (MonadIO m, MonadIO n) | |
=> Handler IO a | Parallel processing exception handler |
-> Int | Number of threads to use |
-> (a -> IO ()) | Function to run in parallel |
-> ((a -> m ()) -> n r) | "Stream" processing function |
-> n (BracketOnError n r) |
Sets up parallel processing for functions where we ignore the result.
The stream processing argument is the workhorse of this function. It gets a
(rate-limited) function a -> m ()
that queues a
values for processing.
This function should be applied to all a
elements that should be
processed. This would be either a partially applied forM_
for parallel processing, or something like conduit's mapM_
to
construct a "sink" for a
values. See BroadcastChan or
broadcast-chan-conduit
for examples.
The returned BracketOnError
has a allocate
action that takes care of
setting up forkIO
threads and exception handlers. Th
cleanup
action ensures all threads are terminate in case of an exception.
Finally, action
performs the actual parallel processing of elements.