Copyright | (c) Alexey Kuleshevich 2018-2019 |
---|---|
License | BSD3 |
Maintainer | Alexey Kuleshevich <lehins@yandex.ru> |
Stability | experimental |
Portability | non-portable |
Safe Haskell | None |
Language | Haskell2010 |
Synopsis
- data Scheduler m a
- numWorkers :: Scheduler m a -> Int
- scheduleWork :: Scheduler m a -> m a -> m ()
- scheduleWork_ :: Scheduler m () -> m () -> m ()
- terminate :: Scheduler m a -> a -> m a
- terminate_ :: Scheduler m () -> m ()
- terminateWith :: Scheduler m a -> a -> m a
- withScheduler :: MonadUnliftIO m => Comp -> (Scheduler m a -> m b) -> m [a]
- withScheduler_ :: MonadUnliftIO m => Comp -> (Scheduler m a -> m b) -> m ()
- trivialScheduler_ :: Applicative f => Scheduler f ()
- data Comp where
- getCompWorkers :: MonadIO m => Comp -> m Int
- replicateConcurrently :: MonadUnliftIO m => Comp -> Int -> m a -> m [a]
- replicateConcurrently_ :: MonadUnliftIO m => Comp -> Int -> m a -> m ()
- traverseConcurrently :: (MonadUnliftIO m, Traversable t) => Comp -> (a -> m b) -> t a -> m (t b)
- traverseConcurrently_ :: (MonadUnliftIO m, Foldable t) => Comp -> (a -> m b) -> t a -> m ()
- traverse_ :: (Applicative f, Foldable t) => (a -> f ()) -> t a -> f ()
Scheduler
Main type for scheduling work. See withScheduler or withScheduler_ for ways to construct and use this data type.
Since: 1.0.0
numWorkers :: Scheduler m a -> Int
Get the number of workers. This mainly depends on the computation strategy and/or the number of capabilities you have. A related function is getCompWorkers.
Since: 1.0.0
scheduleWork :: Scheduler m a -> m a -> m ()
Schedule an action to be picked up and computed by a worker from a pool of jobs.
Since: 1.0.0
scheduleWork_ :: Scheduler m () -> m () -> m ()
Same as scheduleWork, but only for a Scheduler that doesn't keep the result.
Since: 1.1.0
terminate :: Scheduler m a -> a -> m a
Try to terminate, as soon as possible, any computation being performed by the workers managed by this scheduler, and collect whatever results have been computed so far, with the supplied element guaranteed to be the last one.
Important - With the Seq strategy this will not stop other scheduled tasks from being computed, although it will make sure their results are discarded.
Since: 1.1.0
terminate_ :: Scheduler m () -> m ()
terminateWith :: Scheduler m a -> a -> m a
Same as terminate, but makes the scheduler return a single-element list containing only the supplied argument. This can be very useful for parallel search algorithms.
Important - Same as with terminate, when the Seq strategy is used this will not prevent computation from continuing, but the scheduler will return only the result supplied to this function.
Since: 1.1.0
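As a rough illustration of the parallel search use case, the sketch below schedules one job per candidate and calls terminateWith from whichever job finds a match. The helper name findMatch and the Maybe encoding of results are assumptions made for this example and are not part of the library; only withScheduler, scheduleWork, terminateWith and Par come from the signatures on this page.

```haskell
import Control.Scheduler
import Data.Maybe (catMaybes, listToMaybe)

-- | Hypothetical helper: look for any element satisfying the predicate.
-- Jobs that find nothing return 'Nothing'; a job that finds a match asks
-- the scheduler to stop early and keep only that match.
findMatch :: (a -> Bool) -> [a] -> IO (Maybe a)
findMatch p xs = do
  results <-
    withScheduler Par $ \scheduler ->
      mapM_
        (\x ->
           scheduleWork scheduler $
           if p x
             then terminateWith scheduler (Just x)
             else pure Nothing)
        xs
  -- Either a single 'Just' (early termination) or nothing but 'Nothing's.
  pure (listToMaybe (catMaybes results))

main :: IO ()
main = findMatch (\n -> n * n > 2000) [1 .. 100 :: Int] >>= print
```

Which matching element is reported depends on how the jobs happen to be scheduled, so the sketch only promises some match when one exists.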
Initialize Scheduler
withScheduler
:: MonadUnliftIO m | |
=> Comp | Computation strategy |
-> (Scheduler m a -> m b) | Action that will be scheduling all the work. |
-> m [a] |
Initialize a scheduler and submit jobs that will be computed sequentially or in parallel, as determined by the Comp (computation strategy).
Here are some cool properties of withScheduler:
- This function will block until all of the submitted jobs have finished or at least one of them resulted in an exception, which will be re-thrown at the callsite.
- It is totally fine for nested jobs to submit more jobs to the same or to another scheduler.
- It is ok to initialize multiple schedulers at the same time, although that will likely result in suboptimal performance, unless workers are pinned to different capabilities.
- Warning: It is pretty dangerous to schedule jobs that do blocking IO, since it can easily lead to a deadlock if you are not careful. Consider this example. The first execution works fine, since there are two scheduled workers and one can unblock the other, but the second scenario immediately results in a deadlock.
>>> withScheduler (ParOn [1,2]) $ \s -> newEmptyMVar >>= (\ mv -> scheduleWork s (readMVar mv) >> scheduleWork s (putMVar mv ()))
[(),()]
>>> import System.Timeout
>>> timeout 1000000 $ withScheduler (ParOn [1]) $ \s -> newEmptyMVar >>= (\ mv -> scheduleWork s (readMVar mv) >> scheduleWork s (putMVar mv ()))
Nothing
Important: In order to get work done truly in parallel, the program needs to be compiled with the -threaded GHC flag and executed with +RTS -N -RTS to use all available cores.
Since: 1.0.0
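A minimal sketch of the basic workflow, assuming the 1.1.0 signatures listed in the synopsis: initialize a scheduler with a strategy, submit one job per input with scheduleWork, and read the collected results. The helper name squares is hypothetical and exists only for this example.

```haskell
import Control.Scheduler

-- | Hypothetical helper: square every input, one scheduled job per element.
squares :: Comp -> [Int] -> IO [Int]
squares comp xs =
  withScheduler comp $ \scheduler ->
    mapM_ (\x -> scheduleWork scheduler (pure $! x * x)) xs

main :: IO ()
main = do
  sequential <- squares Seq [1 .. 10]
  parallel <- squares Par [1 .. 10]
  -- Compare sums so the check does not rely on the order of results.
  print (sum sequential == sum parallel)
```

Taking the strategy as an argument keeps the job-submitting code identical for sequential and parallel runs, which is handy when debugging.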
withScheduler_
:: MonadUnliftIO m | |
=> Comp | Computation strategy |
-> (Scheduler m a -> m b) | Action that will be scheduling all the work. |
-> m () |
Same as withScheduler, but discards results of submitted jobs.
Since: 1.0.0
trivialScheduler_ :: Applicative f => Scheduler f ()
The most basic scheduler that simply runs the task instead of scheduling it. Early termination requests are simply ignored.
Since: 1.1.0
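One plausible use for trivialScheduler_ is to run code written against the Scheduler interface without starting any workers. The sketch below assumes that reading of the description above; the helper name logAll is hypothetical.

```haskell
import Control.Scheduler

-- | Hypothetical helper written against the scheduler interface, so it can
-- be driven by a real scheduler or by 'trivialScheduler_'.
logAll :: Scheduler IO () -> [String] -> IO ()
logAll scheduler = mapM_ (scheduleWork_ scheduler . putStrLn)

main :: IO ()
main = do
  -- Each action is run right away instead of being scheduled.
  logAll trivialScheduler_ ["one", "two"]
  -- Same helper, but the jobs are handed to workers; with a parallel
  -- strategy the order of the output lines is not guaranteed.
  withScheduler_ Par $ \scheduler -> logAll scheduler ["three", "four"]
```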
Computation strategies
Computation strategy to use when scheduling work.
Seq | Sequential computation |
ParOn ![Int] | Schedule workers to run on specific capabilities. Specifying an empty list |
ParN !Word16 | Specify the number of workers that will be handling all the jobs. Difference from |
pattern Par :: Comp | Parallel computation using all available cores. Same as Since: 1.0.0 |
pattern Par' :: Comp | Parallel computation using all available cores. Same as Since: 1.1.0 |
getCompWorkers :: MonadIO m => Comp -> m Int
Figure out how many workers this computation strategy will create.
Since: 1.1.0
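To see how strategies translate into worker counts, something like the following can be run. It is a sketch that only uses getCompWorkers, numWorkers and withScheduler_ as listed in the synopsis; the helper reportWorkers is hypothetical, and the printed numbers depend on the machine and RTS options.

```haskell
import Control.Scheduler

-- | Hypothetical helper: print the worker count of a live scheduler.
reportWorkers :: Scheduler IO () -> IO ()
reportWorkers scheduler = print (numWorkers scheduler)

main :: IO ()
main = do
  -- Ask each strategy how many workers it would create.
  seqWorkers <- getCompWorkers Seq
  parWorkers <- getCompWorkers Par
  fixedWorkers <- getCompWorkers (ParN 4)
  print (seqWorkers, parWorkers, fixedWorkers)
  -- The same information is available from inside a running scheduler.
  withScheduler_ (ParN 4) reportWorkers
```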
Useful functions
replicateConcurrently :: MonadUnliftIO m => Comp -> Int -> m a -> m [a]
Replicate an action n times and schedule the copies according to the supplied computation strategy.
Since: 1.1.0
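A small sketch of replicateConcurrently, assuming the signature above: the same action is run several times and each run's result is collected. The helper name tickets and the shared counter are made up for this example.

```haskell
import Control.Scheduler
import Data.IORef (atomicModifyIORef', newIORef)

-- | Hypothetical helper: run a counter-incrementing action n times and
-- return the value each run observed before incrementing.
tickets :: Comp -> Int -> IO [Int]
tickets comp n = do
  counter <- newIORef 0
  replicateConcurrently comp n $
    atomicModifyIORef' counter (\c -> (c + 1, c))

main :: IO ()
main = tickets Par 8 >>= print
```

Because the increment is atomic, every run sees a distinct value, but which run sees which value depends on scheduling.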
replicateConcurrently_ :: MonadUnliftIO m => Comp -> Int -> m a -> m ()
Just like replicateConcurrently, but discards the results of computation.
Since: 1.1.0
traverseConcurrently :: (MonadUnliftIO m, Traversable t) => Comp -> (a -> m b) -> t a -> m (t b)
Map an action over each element of the Traversable t according to the supplied computation strategy.
Since: 1.0.0
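For illustration, here is a sketch that maps an effectful function over a list with traverseConcurrently and then uses the result-discarding variant on a Maybe. The function wordLength is a hypothetical stand-in for real work.

```haskell
import Control.Scheduler

-- | Hypothetical stand-in for an expensive effectful computation.
wordLength :: String -> IO Int
wordLength w = pure $! length w

main :: IO ()
main = do
  lengths <- traverseConcurrently Par wordLength ["scheduler", "worker", "job"]
  print lengths
  -- The result-discarding variant only needs a Foldable container.
  traverseConcurrently_ Par (putStrLn . ("saw: " ++)) (Just "job")
```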
traverseConcurrently_ :: (MonadUnliftIO m, Foldable t) => Comp -> (a -> m b) -> t a -> m ()
Just like traverseConcurrently, but restricted to Foldable and discards the results of computation.
Since: 1.0.0
traverse_ :: (Applicative f, Foldable t) => (a -> f ()) -> t a -> f ()
This is generally a faster way to traverse while ignoring the result than using mapM_.
Since: 1.0.0
Exceptions
If any one of the workers dies with an exception, even if that exception is asynchronous, it will be re-thrown in the scheduling thread.
>>> let didAWorkerDie = handleJust asyncExceptionFromException (return . (== ThreadKilled)) . fmap or
>>> :t didAWorkerDie
didAWorkerDie :: Foldable t => IO (t Bool) -> IO Bool
>>> didAWorkerDie $ withScheduler Par $ \ s -> scheduleWork s $ pure False
False
>>> didAWorkerDie $ withScheduler Par $ \ s -> scheduleWork s $ myThreadId >>= killThread >> pure False
True
>>> withScheduler Par $ \ s -> scheduleWork s $ myThreadId >>= killThread >> pure False
*** Exception: thread killed
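The same behaviour can be observed for synchronous exceptions in a compiled program. The sketch below is an assumption-laden example, not library code: the helper runFailingJob is hypothetical, and it deliberately catches SomeException so that it does not depend on the exact exception type the scheduler re-throws.

```haskell
import Control.Exception (ErrorCall (..), SomeException, throwIO, try)
import Control.Scheduler

-- | Hypothetical helper: run one failing job and report what the
-- scheduling thread observes.
runFailingJob :: IO (Either SomeException ())
runFailingJob =
  try $
  withScheduler_ Par $ \scheduler ->
    scheduleWork_ scheduler (throwIO (ErrorCall "worker failed"))

main :: IO ()
main = do
  outcome <- runFailingJob
  case outcome of
    Left err -> putStrLn ("scheduler re-threw: " ++ show err)
    Right () -> putStrLn "all jobs finished"
```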