Copyright | (C) 2014-2021 Merijn Verstraaten |
---|---|
License | BSD-style (see the file LICENSE) |
Maintainer | Merijn Verstraaten <merijn@inconsistent.nl> |
Stability | experimental |
Portability | haha |
Safe Haskell | Safe |
Language | Haskell2010 |
Functions in this module are *NOT* intended to be used by regular users of
the library. Rather, they are intended for implementing parallel processing
libraries on top of broadcast-chan
, such as broadcast-chan-conduit
.
This module, while not for end users, is considered part of the public API, so users can rely on PVP bounds to avoid breakage due to changes to this module.
Synopsis
- data Action
- data BracketOnError m r = Bracket {}
- data Handler m a
- = Simple Action
- | Handle (a -> SomeException -> m Action)
- data ThreadBracket = ThreadBracket {
- setupFork :: IO ()
- cleanupFork :: IO ()
- cleanupForkError :: IO ()
- mapHandler :: (m Action -> n Action) -> Handler m a -> Handler n a
- runParallel :: forall a b m n r. (MonadIO m, MonadIO n) => Either (b -> n r) (r -> b -> n r) -> Handler IO a -> Int -> (a -> IO b) -> ((a -> m ()) -> (a -> m (Maybe b)) -> n r) -> n (BracketOnError n r)
- runParallelWith :: forall a b m n r. (MonadIO m, MonadIO n) => ThreadBracket -> Either (b -> n r) (r -> b -> n r) -> Handler IO a -> Int -> (a -> IO b) -> ((a -> m ()) -> (a -> m (Maybe b)) -> n r) -> n (BracketOnError n r)
- runParallel_ :: (MonadIO m, MonadIO n) => Handler IO a -> Int -> (a -> IO ()) -> ((a -> m ()) -> n r) -> n (BracketOnError n r)
- runParallelWith_ :: (MonadIO m, MonadIO n) => ThreadBracket -> Handler IO a -> Int -> (a -> IO ()) -> ((a -> m ()) -> n r) -> n (BracketOnError n r)
Documentation
Action to take when an exception occurs while processing an element.
data BracketOnError m r Source #
Allocation, cleanup, and work actions for parallel processing. These
should be passed to an appropriate bracketOnError
function.
Exception handler for parallel processing.
data ThreadBracket Source #
Datatype for specifying additional setup/cleanup around forking threads.
Used by runParallelWith
and runParallelWith_
to fix resource management
in broadcast-chan-conduit
.
If the allocation action can fail/abort with an exception it MUST take
care not to leak resources in these cases. In other words, IFF setupFork
succeeds then this library will ensure the corresponding cleanup runs.
Since: 0.2.1
ThreadBracket | |
|
mapHandler :: (m Action -> n Action) -> Handler m a -> Handler n a Source #
Convenience function for changing the monad the exception handler runs in.
:: forall a b m n r. (MonadIO m, MonadIO n) | |
=> Either (b -> n r) (r -> b -> n r) | Output yielder |
-> Handler IO a | Parallel processing exception handler |
-> Int | Number of threads to use |
-> (a -> IO b) | Function to run in parallel |
-> ((a -> m ()) -> (a -> m (Maybe b)) -> n r) | "Stream" processing function |
-> n (BracketOnError n r) |
Sets up parallel processing.
The workhorses of this function are the output yielder and "stream" processing functions.
The output yielder is responsible for handling the produced b
values,
which if can either yield downstream (Left
) when used with something like
conduit
or pipes
, or fold into a single results (Right
) when used to
run IO in parallel.
The stream processing function gets two arguments:
a -> m ()
- Should be used to buffer a number of elements equal to the number of threads.
a -> m b
- Which should be used to process the remainder of the
element stream via, for example,
mapM
.
See BroadcastChan or broadcast-chan-conduit
for examples.
The returned BracketOnError
has a allocate
action that takes care of
setting up forkIO
threads and exception handlers. The
cleanup
action ensures all threads are terminate in case of an exception.
Finally, action
performs the actual parallel processing of elements.
:: forall a b m n r. (MonadIO m, MonadIO n) | |
=> ThreadBracket | Bracketing action used to manage resources across thread spawns |
-> Either (b -> n r) (r -> b -> n r) | Output yielder |
-> Handler IO a | Parallel processing exception handler |
-> Int | Number of threads to use |
-> (a -> IO b) | Function to run in parallel |
-> ((a -> m ()) -> (a -> m (Maybe b)) -> n r) | "Stream" processing function |
-> n (BracketOnError n r) |
Like runParallel
, but accepts a setup and cleanup action that will be
run before spawning a new thread and upon thread exit respectively.
The main use case is to properly manage the resource reference counts of
ResourceT
.
If the setup throws an IO
exception or otherwise aborts, it MUST
ensure any allocated resource are freed. If it completes without an
exception, the cleanup is guaranteed to run (assuming proper use of
bracketing with the returned BracketOnError
).
Since: 0.2.1
:: (MonadIO m, MonadIO n) | |
=> Handler IO a | Parallel processing exception handler |
-> Int | Number of threads to use |
-> (a -> IO ()) | Function to run in parallel |
-> ((a -> m ()) -> n r) | "Stream" processing function |
-> n (BracketOnError n r) |
Sets up parallel processing for functions where we ignore the result.
The stream processing argument is the workhorse of this function. It gets a
(rate-limited) function a -> m ()
that queues a
values for processing.
This function should be applied to all a
elements that should be
processed. This would be either a partially applied forM_
for parallel processing, or something like conduit's mapM_
to
construct a "sink" for a
values. See BroadcastChan or
broadcast-chan-conduit
for examples.
The returned BracketOnError
has a allocate
action that takes care of
setting up forkIO
threads and exception handlers. Th
cleanup
action ensures all threads are terminate in case of an exception.
Finally, action
performs the actual parallel processing of elements.
:: (MonadIO m, MonadIO n) | |
=> ThreadBracket | Bracketing action used to manage resources across thread spawns |
-> Handler IO a | Parallel processing exception handler |
-> Int | Number of threads to use |
-> (a -> IO ()) | Function to run in parallel |
-> ((a -> m ()) -> n r) | "Stream" processing function |
-> n (BracketOnError n r) |
Like runParallel_
, but accepts a setup and cleanup action that will be
run before spawning a new thread and upon thread exit respectively.
The main use case is to properly manage the resource reference counts of
ResourceT
.
If the setup throws an IO
exception or otherwise aborts, it MUST
ensure any allocated resource are freed. If it completes without an
exception, the cleanup is guaranteed to run (assuming proper use of
bracketing with the returned BracketOnError
).
Since: 0.2.1