Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell98 |
Parallelism combinators with explicit thread-pool creation and passing.
The most basic example of usage is:
import Control.Concurrent.ParallelIO.Local
main :: IO ()
main = withPool 2 $ \pool -> parallel_ pool [putStrLn "Echo", putStrLn " in parallel"]
Make sure that you compile with -threaded and supply +RTS -N2 -RTS to the generated Haskell executable, or you won't get any parallelism.
If you plan to allow your worker items to block, then you should read the documentation for extraWorkerWhileBlocked.
The Control.Concurrent.ParallelIO.Global module is implemented on top of this one by maintaining a shared global thread pool with one thread per capability.
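A slightly fuller sketch, assuming the parallel-io package is installed and the program is built with -threaded. It sizes the pool with getNumCapabilities from base's Control.Concurrent, which mirrors the one-thread-per-capability policy of the Global module, and uses parallel so the results come back in input order. The square helper is purely illustrative.

```haskell
import Control.Concurrent (getNumCapabilities)
import Control.Concurrent.ParallelIO.Local (parallel, withPool)

-- Illustrative workload: each action squares its input, forcing the result.
square :: Int -> IO Int
square n = return $! n * n

main :: IO ()
main = do
  caps <- getNumCapabilities -- controlled by +RTS -N
  squares <- withPool caps $ \pool ->
    parallel pool (map square [1 .. 10])
  print squares -- results follow the input order
```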
Synopsis
- parallel_ :: Pool -> [IO a] -> IO ()
- parallelE_ :: Pool -> [IO a] -> IO [Maybe SomeException]
- parallel :: Pool -> [IO a] -> IO [a]
- parallelE :: Pool -> [IO a] -> IO [Either SomeException a]
- parallelInterleaved :: Pool -> [IO a] -> IO [a]
- parallelInterleavedE :: Pool -> [IO a] -> IO [Either SomeException a]
- parallelFirst :: Pool -> [IO (Maybe a)] -> IO (Maybe a)
- parallelFirstE :: Pool -> [IO (Maybe a)] -> IO (Maybe (Either SomeException a))
- data Pool
- withPool :: Int -> (Pool -> IO a) -> IO a
- startPool :: Int -> IO Pool
- stopPool :: Pool -> IO ()
- extraWorkerWhileBlocked :: Pool -> IO a -> IO a
- spawnPoolWorkerFor :: Pool -> IO ()
- killPoolWorkerFor :: Pool -> IO ()
Executing actions
parallel_ :: Pool -> [IO a] -> IO ()
Run the list of computations in parallel.
Has the following properties:
1. Never creates more or fewer unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallel_. This should minimize contention and hence pre-emption, while also preventing starvation.
2. On return all actions have been performed.
3. The function returns in a timely manner as soon as all actions have been performed.
4. The above properties are true even if parallel_ is used by an action which is itself being executed by one of the parallel combinators.
5. If any of the IO actions throws an exception, this does not prevent any of the other actions from being performed.
6. If any of the IO actions throws an exception, the exception thrown by the first failing action in the input list will be thrown by parallel_. Importantly, at the time the exception is thrown there is no guarantee that the other parallel actions have completed.
The motivation for this choice is that waiting for all the threads to either return or throw before throwing the first exception will almost always cause GHC to show the "Blocked indefinitely in MVar operation" exception rather than the exception you care about.
The reason for this behaviour can be seen by considering this machine state:
- The main thread has used the parallel combinators to spawn two threads, thread 1 and thread 2. It is blocked on both of them, waiting for each to return either a result or an exception via an MVar.
- Thread 1 and thread 2 share another (empty) MVar, the "wait handle". Thread 2 is waiting on the handle, while thread 1 will eventually put into the handle.
Consider what happens when thread 1 is buggy and throws an exception before putting into the handle. Now thread 2 is blocked indefinitely, and so the main thread is also blocked indefinitely waiting for the result of thread 2. GHC has no choice but to throw the uninformative exception. However, what we really wanted to see was the original exception thrown in thread 1!
By having the main thread abandon its wait for the results of the spawned threads as soon as the first exception comes in, we give this exception a chance to actually be displayed.
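The machine state above can be reproduced with base alone. The sketch below is not the library's implementation, and spawnAll and collectFailFast are hypothetical helpers: each action runs in its own forkIO thread and reports its outcome to a private MVar, and the waiting thread inspects the outcomes in input order and rethrows the first Left without waiting for the remaining threads. Thread 2 stays blocked on the wait handle, yet the program reports the ErrorCall raised by thread 1 rather than a "blocked indefinitely" error.

```haskell
import Control.Concurrent (MVar, forkIO, newEmptyMVar, putMVar, readMVar, takeMVar)
import Control.Exception (ErrorCall (..), SomeException, throwIO, try)
import Control.Monad (forM)

-- Hypothetical helper: run every action in its own thread and record its
-- outcome (exception or result) in a private MVar.
spawnAll :: [IO a] -> IO [MVar (Either SomeException a)]
spawnAll acts = forM acts $ \act -> do
  resultVar <- newEmptyMVar
  _ <- forkIO (try act >>= putMVar resultVar)
  return resultVar

-- Hypothetical helper: take outcomes in input order and rethrow the first
-- failure immediately, without waiting for the later threads.
collectFailFast :: [MVar (Either SomeException a)] -> IO [a]
collectFailFast = mapM (\v -> takeMVar v >>= either throwIO return)

main :: IO ()
main = do
  waitHandle <- newEmptyMVar :: IO (MVar ())
  let thread1 = throwIO (ErrorCall "bug in thread 1") >> putMVar waitHandle ()
      thread2 = readMVar waitHandle -- never filled, so this blocks forever
  vars <- spawnAll [thread1, thread2]
  outcome <- try (collectFailFast vars)
  case outcome of
    Left e -> putStrLn ("caught: " ++ show (e :: SomeException))
    Right _ -> putStrLn "all actions succeeded"
```

A version that waited for every MVar before looking for failures would instead block on thread 2's result forever.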
parallelE_ :: Pool -> [IO a] -> IO [Maybe SomeException]
As parallel_, but instead of throwing exceptions that are thrown by subcomputations, they are returned in a data structure.
As a result, property 6 of parallel_ is not preserved, and therefore if your IO actions can depend on each other and may throw exceptions, your program may die with "blocked indefinitely" exceptions rather than informative messages.
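A usage sketch for parallelE_, assuming the parallel-io package; the actions list is illustrative. Because failures come back as values, the caller can count and report them instead of catching a single rethrown exception. The sketch relies only on Nothing marking a successful action and Just e a failed one, not on the order of the returned list.

```haskell
import Control.Concurrent.ParallelIO.Local (parallelE_, withPool)
import Control.Exception (ErrorCall (..), throwIO)
import Data.Maybe (catMaybes)

-- Illustrative batch: the middle action fails, the others succeed.
actions :: [IO ()]
actions =
  [ putStrLn "first"
  , throwIO (ErrorCall "second failed")
  , putStrLn "third"
  ]

main :: IO ()
main = do
  outcomes <- withPool 2 $ \pool -> parallelE_ pool actions
  -- Nothing marks a success, Just e a captured exception.
  let failures = catMaybes outcomes
  putStrLn (show (length failures) ++ " action(s) failed")
  mapM_ print failures
```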
parallel :: Pool -> [IO a] -> IO [a]
Run the list of computations in parallel, returning the results in the same order as the corresponding actions.
Has the following properties:
1. Never creates more or fewer unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallel. This should minimize contention and hence pre-emption, while also preventing starvation.
2. On return all actions have been performed.
3. The function returns in a timely manner as soon as all actions have been performed.
4. The above properties are true even if parallel is used by an action which is itself being executed by one of the parallel combinators.
5. If any of the IO actions throws an exception, this does not prevent any of the other actions from being performed.
6. If any of the IO actions throws an exception, the exception thrown by the first failing action in the input list will be thrown by parallel. Importantly, at the time the exception is thrown there is no guarantee that the other parallel actions have completed.
The motivation for this choice is that waiting for all the threads to either return or throw before throwing the first exception will almost always cause GHC to show the "Blocked indefinitely in MVar operation" exception rather than the exception you care about.
The reason for this behaviour can be seen by considering this machine state:
- The main thread has used the parallel combinators to spawn two threads, thread 1 and thread 2. It is blocked on both of them, waiting for each to return either a result or an exception via an MVar.
- Thread 1 and thread 2 share another (empty) MVar, the "wait handle". Thread 2 is waiting on the handle, while thread 1 will eventually put into the handle.
Consider what happens when thread 1 is buggy and throws an exception before putting into the handle. Now thread 2 is blocked indefinitely, and so the main thread is also blocked indefinitely waiting for the result of thread 2. GHC has no choice but to throw the uninformative exception. However, what we really wanted to see was the original exception thrown in thread 1!
By having the main thread abandon its wait for the results of the spawned threads as soon as the first exception comes in, we give this exception a chance to actually be displayed.
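A sketch of both halves of the contract of parallel, assuming the parallel-io package; checkNonNegative is a hypothetical validation step. The first call returns its results in input order. The second batch contains two failing actions, and per property 6 the exception re-raised should be the one for -2, the first failing action in the input list.

```haskell
import Control.Concurrent.ParallelIO.Local (parallel, withPool)
import Control.Exception (ErrorCall (..), throwIO, try)

-- Hypothetical validation step: doubles non-negative inputs, rejects the rest.
checkNonNegative :: Int -> IO Int
checkNonNegative n
  | n < 0 = throwIO (ErrorCall ("negative input: " ++ show n))
  | otherwise = return (n * 2)

main :: IO ()
main = withPool 2 $ \pool -> do
  ok <- parallel pool (map checkNonNegative [1, 2, 3])
  print ok -- [2,4,6]: results follow the input order
  r <- try (parallel pool (map checkNonNegative [1, -2, 3, -4]))
  case r of
    Left (ErrorCall msg) -> putStrLn ("first failure: " ++ msg)
    Right xs -> print xs
```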
parallelE :: Pool -> [IO a] -> IO [Either SomeException a]
As parallel, but instead of throwing exceptions that are thrown by subcomputations, they are returned in a data structure.
As a result, property 6 of parallel is not preserved, and therefore if your IO actions can depend on each other and may throw exceptions, your program may die with "blocked indefinitely" exceptions rather than informative messages.
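The same kind of batch with parallelE, assuming the parallel-io package; parseDigits is a hypothetical parser. Every action contributes either a Left holding its exception or a Right holding its result, so partitionEithers from Data.Either splits successes from failures in one pass.

```haskell
import Control.Concurrent.ParallelIO.Local (parallelE, withPool)
import Control.Exception (ErrorCall (..), throwIO)
import Data.Char (isDigit)
import Data.Either (partitionEithers)

-- Hypothetical parser: accepts non-empty strings of decimal digits only.
parseDigits :: String -> IO Int
parseDigits s
  | not (null s) && all isDigit s = return (read s)
  | otherwise = throwIO (ErrorCall ("not a number: " ++ show s))

main :: IO ()
main = do
  results <- withPool 2 $ \pool ->
    parallelE pool (map parseDigits ["10", "x", "32"])
  -- Left holds an action's exception, Right holds its result.
  let (errs, vals) = partitionEithers results
  print vals
  mapM_ (\e -> putStrLn ("failed: " ++ show e)) errs
```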
parallelInterleaved :: Pool -> [IO a] -> IO [a]
Run the list of computations in parallel, returning the results in the approximate order of completion.
Has the following properties:
1. Never creates more or fewer unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallelInterleaved. This should minimize contention and hence pre-emption, while also preventing starvation.
2. On return all actions have been performed.
3. The results of the actions appear in the list in an undefined order, which is likely to be very similar to the order of completion.
4. The above properties are true even if parallelInterleaved is used by an action which is itself being executed by one of the parallel combinators.
5. If any of the IO actions throws an exception, this does not prevent any of the other actions from being performed.
6. If any of the IO actions throws an exception, the exception thrown by the first failing action in the input list will be thrown by parallelInterleaved. Importantly, at the time the exception is thrown there is no guarantee that the other parallel actions have completed.
The motivation for this choice is that waiting for all the threads to either return or throw before throwing the first exception will almost always cause GHC to show the "Blocked indefinitely in MVar operation" exception rather than the exception you care about.
The reason for this behaviour can be seen by considering this machine state:
- The main thread has used the parallel combinators to spawn two threads, thread 1 and thread 2. It is blocked on both of them, waiting for each to return either a result or an exception via an MVar.
- Thread 1 and thread 2 share another (empty) MVar, the "wait handle". Thread 2 is waiting on the handle, while thread 1 will eventually put into the handle.
Consider what happens when thread 1 is buggy and throws an exception before putting into the handle. Now thread 2 is blocked indefinitely, and so the main thread is also blocked indefinitely waiting for the result of thread 2. GHC has no choice but to throw the uninformative exception. However, what we really wanted to see was the original exception thrown in thread 1!
By having the main thread abandon its wait for the results of the spawned threads as soon as the first exception comes in, we give this exception a chance to actually be displayed.
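A sketch that makes the ordering difference visible, assuming the parallel-io package; sleepFor is a hypothetical job that sleeps and then returns its own delay. With three pool threads the list will usually come back sorted by delay, but the documentation only promises an undefined order close to completion order, so nothing here depends on it.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.ParallelIO.Local (parallelInterleaved, withPool)

-- Hypothetical job: sleep for the given number of milliseconds, then return it.
sleepFor :: Int -> IO Int
sleepFor ms = threadDelay (ms * 1000) >> return ms

main :: IO ()
main = do
  done <- withPool 3 $ \pool ->
    parallelInterleaved pool (map sleepFor [300, 100, 200])
  -- Typically [100,200,300], but the order is not guaranteed.
  print done
```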
parallelInterleavedE :: Pool -> [IO a] -> IO [Either SomeException a]
As parallelInterleaved, but instead of throwing exceptions that are thrown by subcomputations, they are returned in a data structure.
As a result, property 6 of parallelInterleaved is not preserved, and therefore if your IO actions can depend on each other and may throw exceptions, your program may die with "blocked indefinitely" exceptions rather than informative messages.
parallelFirst :: Pool -> [IO (Maybe a)] -> IO (Maybe a)
Run the list of computations in parallel, returning the result of the first thread that completes with (Just x), if any.
Has the following properties:
1. Never creates more or fewer unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallelFirst. This should minimize contention and hence pre-emption, while also preventing starvation.
2. On return all actions have either been performed or cancelled (with ThreadKilled exceptions).
3. The above properties are true even if parallelFirst is used by an action which is itself being executed by one of the parallel combinators.
4. If any of the IO actions throws an exception, the exception thrown by the first throwing action in the input list will be thrown by parallelFirst. Importantly, at the time the exception is thrown there is no guarantee that the other parallel actions have been completed or cancelled.
The motivation for this choice is that waiting for all the threads to either return or throw before throwing the first exception will almost always cause GHC to show the "Blocked indefinitely in MVar operation" exception rather than the exception you care about.
The reason for this behaviour can be seen by considering this machine state:
- The main thread has used the parallel combinators to spawn two threads, thread 1 and thread 2. It is blocked on both of them, waiting for each to return either a result or an exception via an MVar.
- Thread 1 and thread 2 share another (empty) MVar, the "wait handle". Thread 2 is waiting on the handle, while thread 1 will eventually put into the handle.
Consider what happens when thread 1 is buggy and throws an exception before putting into the handle. Now thread 2 is blocked indefinitely, and so the main thread is also blocked indefinitely waiting for the result of thread 2. GHC has no choice but to throw the uninformative exception. However, what we really wanted to see was the original exception thrown in thread 1!
By having the main thread abandon its wait for the results of the spawned threads as soon as the first exception comes in, we give this exception a chance to actually be displayed.
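A sketch of a parallel search with parallelFirst, assuming the parallel-io package; findDivisorIn is a hypothetical helper that scans one chunk of candidate divisors. The chunks are chosen so that only the second contains a divisor of 143 (11 * 13), which makes the result deterministic even though the chunks run concurrently.

```haskell
import Control.Concurrent.ParallelIO.Local (parallelFirst, withPool)

-- Hypothetical helper: report a divisor of n from one chunk of candidates, if any.
findDivisorIn :: Int -> [Int] -> IO (Maybe Int)
findDivisorIn n chunk =
  return $! case filter (\d -> n `mod` d == 0) chunk of
    (d : _) -> Just d
    [] -> Nothing

main :: IO ()
main = do
  let n = 143 -- 11 * 13, so only the second chunk contains a divisor
      chunks = [[2 .. 10], [11 .. 20], [21 .. 30]]
  found <- withPool 3 $ \pool -> parallelFirst pool (map (findDivisorIn n) chunks)
  print found -- Just 11
```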
parallelFirstE :: Pool -> [IO (Maybe a)] -> IO (Maybe (Either SomeException a))
As parallelFirst, but instead of throwing exceptions that are thrown by subcomputations, they are returned in a data structure.
As a result, property 4 of parallelFirst is not preserved, and therefore if your IO actions can depend on each other and may throw exceptions, your program may die with "blocked indefinitely" exceptions rather than informative messages.
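A sketch of consuming the nested result type of parallelFirstE, assuming the parallel-io package; the lookups list is illustrative. Nothing means no action produced a Just, Just (Left e) carries a captured exception, and Just (Right x) is a successful answer. Which branch fires here depends on how the library orders a failure relative to a success under the actual schedule, so the sketch handles all three cases and promises no particular output.

```haskell
import Control.Concurrent.ParallelIO.Local (parallelFirstE, withPool)
import Control.Exception (ErrorCall (..), throwIO)

-- Illustrative lookups: one fails, one finds nothing, one succeeds.
lookups :: [IO (Maybe String)]
lookups =
  [ throwIO (ErrorCall "source A unavailable")
  , return Nothing
  , return (Just "found in source C")
  ]

main :: IO ()
main = do
  r <- withPool 2 $ \pool -> parallelFirstE pool lookups
  case r of
    Nothing -> putStrLn "no source had an answer"
    Just (Left e) -> putStrLn ("a source failed: " ++ show e)
    Just (Right msg) -> putStrLn msg
```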
Pool management
data Pool
A thread pool, containing a maximum number of threads. The best way to construct one of these is using withPool.
startPool :: Int -> IO Pool
A slightly unsafe way to construct a pool. Make a pool from the maximum number of threads you wish it to execute (including the main thread in the count).
If you use this variant, then ensure that you insert a call to stopPool somewhere in your program after all users of that pool have finished.
A better alternative is to see if you can use the withPool variant.
stopPool :: Pool -> IO ()
Clean up a thread pool. If you don't call this from the main thread, then no one holds the queue, the queue gets GC'd, the threads find themselves blocked indefinitely, and you get exceptions.
This cleanly shuts down the threads, so the queue no longer matters and you don't get exceptions.
Only call this after all users of the pool have completed, or your program may block indefinitely.
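If you do use startPool, one way to guarantee the matching stopPool is Control.Exception.bracket. The helper withManualPool below is hypothetical and only sketches that discipline; it is not a claim about how withPool is defined. It still assumes the body does not leave other users of the pool running when it returns, because stopPool must only run after they have completed.

```haskell
import Control.Concurrent.ParallelIO.Local (Pool, parallel_, startPool, stopPool)
import Control.Exception (bracket)

-- Hypothetical helper: pair startPool with stopPool so the pool is cleaned up
-- when the body returns or throws.
withManualPool :: Int -> (Pool -> IO a) -> IO a
withManualPool n = bracket (startPool n) stopPool

main :: IO ()
main = withManualPool 2 $ \pool ->
  parallel_ pool [putStrLn "one", putStrLn "two"]
```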
extraWorkerWhileBlocked :: Pool -> IO a -> IO a
You should wrap any IO action used from your worker threads that may block with this method. It temporarily spawns another worker thread to make up for the loss of the old blocked worker.
This is particularly important if the unblocking is dependent on worker threads actually doing work. If you have this situation, and you don't use this method to wrap blocking actions, then you may get a deadlock if all your worker threads get blocked on work that they assume will be done by other worker threads.
Here is an example of what goes wrong if you don't use this to wrap blocking actions:
newEmptyMVar >>= \mvar -> parallel_ pool [readMVar mvar, putMVar mvar ()]
If the pool has only one thread, we will sometimes get a schedule where the readMVar action runs before the putMVar. Unless we wrap the read with extraWorkerWhileBlocked, our program will then deadlock, because the worker will become blocked and no other thread will be available to execute the putMVar.
The correct code is:
newEmptyMVar >>= \mvar -> parallel_ pool [extraWorkerWhileBlocked pool (readMVar mvar), putMVar mvar ()]
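The corrected example as a complete program, assuming the parallel-io package. The pool is deliberately created with a single thread, the case described above, so whenever the readMVar happens to run first it is the wrapper that prevents the deadlock.

```haskell
import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)
import Control.Concurrent.ParallelIO.Local (extraWorkerWhileBlocked, parallel_, withPool)

main :: IO ()
main = withPool 1 $ \pool -> do
  mvar <- newEmptyMVar
  -- If the read runs first, the wrapper lends the pool an extra worker while it
  -- blocks, and that worker can run the putMVar.
  parallel_ pool [extraWorkerWhileBlocked pool (readMVar mvar), putMVar mvar ()]
  putStrLn "finished without deadlock"
```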
Advanced pool management
spawnPoolWorkerFor :: Pool -> IO ()
Internal method for adding extra unblocked threads to a pool if one of the current worker threads is going to be temporarily blocked. Unrestricted use of this is unsafe, so we recommend that you use the extraWorkerWhileBlocked function instead if possible.
killPoolWorkerFor :: Pool -> IO ()
Internal method for removing threads from a pool after one of the threads on the pool becomes newly unblocked. Unrestricted use of this is unsafe, so we recommend that you use the extraWorkerWhileBlocked function instead if possible.
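A sketch of how the two low-level calls pair up, assuming the parallel-io package. withExtraWorker is a hypothetical re-creation of the extraWorkerWhileBlocked pattern using Control.Exception.bracket_: one worker is added before the blocking action and one is removed afterwards, even if the action throws. We assume the real wrapper behaves similarly, but extraWorkerWhileBlocked itself remains the safer choice.

```haskell
import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)
import Control.Concurrent.ParallelIO.Local
  (Pool, killPoolWorkerFor, parallel_, spawnPoolWorkerFor, withPool)
import Control.Exception (bracket_)

-- Hypothetical re-creation of the extraWorkerWhileBlocked pattern: add a worker
-- before the blocking action and remove one afterwards, even if it throws.
withExtraWorker :: Pool -> IO a -> IO a
withExtraWorker pool = bracket_ (spawnPoolWorkerFor pool) (killPoolWorkerFor pool)

main :: IO ()
main = withPool 1 $ \pool -> do
  mvar <- newEmptyMVar
  parallel_ pool [withExtraWorker pool (readMVar mvar), putMVar mvar ()]
  putStrLn "done"
```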