Safe Haskell | Safe-Inferred
---|---
Language | Haskell2010
Utility functions for building programs which consume work off of a queue.
Frequently you need to receive items from an external system and perform work on them. One way to structure such a program is to feed the items into a queue and then consume them one at a time. That, of course, is slow, especially when the worker itself has to carry out computationally intensive tasks or interact with further external systems. So we want multiple workers running, but only up to a limit set by the number of cores available, the number of external connections allowed, or some other constraint.
This library allows you to add items to a queue, then launch worker threads to consume those items at up to a specified maximum amount of concurrency.
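The pattern this library wraps can be sketched in plain Haskell using the `stm` package that ships with GHC: a `TQueue (Maybe α)` holding work items wrapped in `Just`, with `Nothing` as an end-of-input sentinel, consumed by a fixed pool of worker threads. This is a hedged illustration of the underlying idea, not the library's actual implementation; all names here are local to the example.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue (newTQueueIO, readTQueue, writeTQueue)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO, readTVarIO)
import Control.Monad (forM, forM_, replicateM_)

main :: IO ()
main = do
    queue <- newTQueueIO            -- TQueue (Maybe Int)
    total <- newTVarIO (0 :: Int)   -- workers accumulate results here

    -- enqueue the work items, wrapped in Just
    forM_ [1 .. 10 :: Int] (atomically . writeTQueue queue . Just)

    -- launch 4 workers; each loops until it sees the Nothing sentinel
    let n = 4
    dones <- forM [1 .. n] $ \_ -> do
        done <- newEmptyMVar
        _ <- forkIO $ do
            let loop = do
                    item <- atomically (readTQueue queue)
                    case item of
                        Nothing -> pure ()
                        Just x -> do
                            atomically (modifyTVar' total (+ x))
                            loop
            loop
            putMVar done ()
        pure done

    -- one sentinel per worker signals the end of input
    replicateM_ n (atomically (writeTQueue queue Nothing))

    mapM_ takeMVar dones
    readTVarIO total >>= print
```

The library's `newQueue`, `writeQueue`, `finishQueue`, and `runWorkers_` package up exactly this bookkeeping (sentinels, worker loops, and waiting for completion) behind a small API.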
Synopsis
- newQueue :: Program τ (Queue α)
- writeQueue :: Queue α -> α -> Program τ ()
- writeQueue' :: Foldable ω => Queue α -> ω α -> Program τ ()
- finishQueue :: Queue α -> Program τ ()
- runWorkers_ :: Int -> (α -> Program τ ()) -> Queue α -> Program τ ()
- mapWorkers :: Int -> (α -> Program τ β) -> [α] -> Program τ [β]
- data Queue α
- unQueue :: Queue α -> TQueue (Maybe α)
- getMachineSize :: Program τ Int
Work Queue
writeQueue :: Queue α -> α -> Program τ () Source #
Add an item to the queue.
Since: 0.6.9
writeQueue' :: Foldable ω => Queue α -> ω α -> Program τ () Source #
Add the items in a collection to the queue.
Since: 0.6.9
finishQueue :: Queue α -> Program τ () Source #
Indicate that you are finished adding items to the queue, thereby allowing the worker threads consuming from the queue to complete and return.
Remember that you can call this at any time, even before you have launched the worker threads with runWorkers_.
Since: 0.6.9
Worker Threads
runWorkers_ :: Int -> (α -> Program τ ()) -> Queue α -> Program τ () Source #
Run a pool of worker threads which consume items off the work queue.
Once you have an action that enqueues items with writeQueue you can then launch the worker threads:

    runWorkers_ 16 worker queue

consuming 16 items at a time concurrently in this example.
It is assumed that the workers have a way of communicating their results onwards, either because they are side-effecting in the real world themselves, or because you have passed in some MVar or TQueue to collect the results.
Since: 0.6.9
mapWorkers :: Int -> (α -> Program τ β) -> [α] -> Program τ [β] Source #
Map a pool of workers over a list concurrently.
Simply forking one Haskell thread for every item in a list is a surprisingly reasonable choice in many circumstances given how good Haskell's concurrency machinery is, and in this library can be achieved by forM'ing forkThread over a list of items. But if you need tighter control over the amount of concurrency, as is often the case when doing something computationally heavy or making requests of an external service with known limitations, then you are better off using this convenience function.
(This was originally modelled on async's mapConcurrently. That implementation has the drawback that the number of threads created is set by the size of the structure being traversed. Here we set the amount of concurrency explicitly.)
Be aware that the order of items in the output list is non-deterministic and will depend on the order that the action function completes, not the order of items in the input.
Since: 0.6.9
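The behaviour described above (a bounded pool mapping over a list, with results in completion order) can be sketched in plain Haskell with the `stm` package. This is an illustrative analogue, not the library's implementation; `mapWorkers'` is a hypothetical name local to this example.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue (newTQueueIO, readTQueue, writeTQueue)
import Control.Monad (forM_, replicateM, replicateM_)
import Data.List (sort)

-- Map an action over a list with at most n worker threads running at once.
-- Results come back in completion order, not input order.
mapWorkers' :: Int -> (a -> IO b) -> [a] -> IO [b]
mapWorkers' n action items = do
    input <- newTQueueIO
    output <- newTQueueIO
    forM_ items (atomically . writeTQueue input . Just)
    -- one Nothing sentinel per worker marks the end of input
    replicateM_ n (atomically (writeTQueue input Nothing))
    replicateM_ n . forkIO $
        let loop = do
                next <- atomically (readTQueue input)
                case next of
                    Nothing -> pure ()
                    Just x -> do
                        y <- action x
                        atomically (writeTQueue output y)
                        loop
         in loop
    -- gather exactly one result per input item
    replicateM (length items) (atomically (readTQueue output))

main :: IO ()
main = do
    ys <- mapWorkers' 3 (\x -> pure (x * x)) [1 .. 5 :: Int]
    print (sort ys)   -- sorted, because completion order is non-deterministic
```

Note the `sort` in the usage: as the documentation above warns, the output order depends on when each action completes, so any code relying on ordering must reorder the results itself.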
Internals
unQueue :: Queue α -> TQueue (Maybe α) Source #
Access the underlying queue. We make use of the STM TQueue type, so you'll want the following imports:

    import Control.Concurrent.STM (atomically)
    import Control.Concurrent.STM.TQueue (TQueue, writeTQueue)
Having accessed the underlying queue you can write items, wrapped in Just, to it directly:

    liftIO $ do
        atomically $ do
            writeTQueue queue (Just item)
A Nothing
written to the underlying queue will signal the worker threads
that the end of input has been reached and they can safely return.
Since: 0.6.9
getMachineSize :: Program τ Int Source #
Report back the number of processor cores that are available as Haskell "capabilities" (this was set when you launched the program with execute). This can best be used to set the number of concurrent worker threads when running runWorkers_ or mapWorkers.
Since: 0.6.9
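Haskell "capabilities" are exposed in base via getNumCapabilities, which is presumably what this function reports; a minimal sketch of querying it directly, using only the standard library:

```haskell
import Control.Concurrent (getNumCapabilities)

main :: IO ()
main = do
    -- number of capabilities, as set by the +RTS -N option at startup
    n <- getNumCapabilities
    putStrLn ("capabilities: " ++ show n)
```

Passing the resulting count as the first argument to runWorkers_ or mapWorkers gives one worker thread per available core.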