Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
Utility functions for running Program actions concurrently.
Haskell uses green threads: small lines of work that are scheduled down onto actual execution contexts (set by default by this library to be one per core). Haskell threads are incredibly lightweight, and you are encouraged to use them freely. Haskell provides a rich ecosystem of tools to do work concurrently and to communicate safely between threads.
This module provides wrappers around some of these primitives so you can use them easily from the Program monad.
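To give a feel for how cheap green threads are, here is a minimal sketch that uses only the raw primitives from base (forkIO and MVar), not the wrappers in this module. The function name sumOfSquares is made up for this illustration. It forks one thread per input and collects each result through its own MVar.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM)

-- Hypothetical helper: fork one green thread per input, each squaring
-- its number and handing the result back through its own MVar.
sumOfSquares :: Int -> IO Int
sumOfSquares n = do
    boxes <- forM [1 .. n] $ \i -> do
        box <- newEmptyMVar
        _ <- forkIO (putMVar box (i * i))
        pure box
    -- takeMVar blocks until the corresponding thread has delivered.
    results <- mapM takeMVar boxes
    pure (sum results)

main :: IO ()
main = do
    -- Ten thousand threads is an unremarkable number for the GHC runtime.
    total <- sumOfSquares 10000
    print total
```

Inside a Program you would normally reach for forkThread and waitThread rather than managing the MVar plumbing by hand.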
Note that when you fire off a new thread the top-level application state is shared; it's the same τ inherited from the parent Program.
Synopsis
- createScope :: Program τ α -> Program τ α
- forkThread :: Program τ α -> Program τ (Thread α)
- forkThread_ :: Program τ α -> Program τ ()
- waitThread :: Thread α -> Program τ α
- waitThread_ :: Thread α -> Program τ ()
- waitThread' :: Thread α -> Program τ (Either SomeException α)
- waitThreads' :: [Thread α] -> Program τ [Either SomeException α]
- cancelThread :: Thread α -> Program τ ()
- concurrentThreads :: Program τ α -> Program τ β -> Program τ (α, β)
- concurrentThreads_ :: Program τ α -> Program τ β -> Program τ ()
- raceThreads :: Program τ α -> Program τ β -> Program τ (Either α β)
- raceThreads_ :: Program τ α -> Program τ β -> Program τ ()
- timeoutThread :: Rational -> Program τ α -> Program τ α
- data Thread α
- data Terminator = ThreadCancelled
- data Timeout = Timeout
Concurrency
createScope :: Program τ α -> Program τ α
Create a scope to enclose any subsequently spawned threads as a single group. Ordinarily threads launched in Haskell are completely independent. Creating a scope allows you to operate on a set of threads as a single group with bi-directional exception passing. This is the basis of an approach called structured concurrency.
When the execution flow exits the scope, any threads that were spawned within it that are still running will be killed.
If any of the child threads within the scope throws an exception, the other remaining threads will be killed and then the original exception will be propagated to this parent thread and re-thrown.
Since: 0.6.0
forkThread :: Program τ α -> Program τ (Thread α)
Fork a thread. The child thread will run in the same Context as the calling Program, including sharing the user-defined application state value.
If you want to find out what the result of a thread was, use waitThread on the Thread object returned from this function. For example:

    t1 <- forkThread $ do
        info "Doing interesting stuff concurrently"
        pure True

    ...

    result <- waitThread t1

    if result
        then -- expected
        else -- not good

If you don't need the result, you can use forkThread_ instead.
Threads that are launched off as children are on their own! If the code in the child thread throws an exception that is not caught within that thread, the exception will kill the thread. Threads dying without telling anyone is a bit of an anti-pattern, so this library logs a warning-level log message if this happens.
(this function wraps base's forkIO)
Concerning telemetry
Note that threads inherit the telemetry state from their parent. If you are using the tracing features from core-telemetry, any telemetry registered in a forked side task will be included in the enclosing span that is active in the parent thread which spawned it:

    t2 <- forkThread $ do
        info "Performing quick side task"
        telemetry
            [ metric "counter" 42
            ]
        ...

In this case the "counter" field in the parent thread's current span will get the value 42. This is appropriate for the common case where you are doing small side tasks concurrently to accelerate a larger computation.
But at other times you are launching off a fully independent control flow and want it to have its own telemetry. In those cases, you'll want to start a new span (or even a new trace) immediately after forking the thread:
    forkThread_ $ do
        encloseSpan "subTask" $ do
            ...

Any telemetry from this worker thread will be appropriately nested in a new child span called "subTask".
Since: 0.2.7
forkThread_ :: Program τ α -> Program τ ()
Fork a thread with forkThread but do not wait for a result. This is on the assumption that the sub program will either be a side-effect that is over quickly, or a long-running daemon thread (presumably containing a forever loop) that never returns.
Since: 0.5.2
waitThread :: Thread α -> Program τ α
Wait for the completion of a thread, returning the result. This is a blocking operation.
If the thread you are waiting on throws an exception it will be rethrown by waitThread.
If the current thread making this call is cancelled (as a result of being on the losing side of concurrentThreads or raceThreads for example, or due to the current scope exiting), then the thread you are waiting on will be cancelled too. This is necessary to ensure that child threads are not leaked if you nest calls to forkThread.
Since: 0.2.7
waitThread_ :: Thread α -> Program τ ()
Wait for the completion of a thread, discarding its result. This is particularly useful at the end of a do-block if you're waiting on a worker thread to finish but don't need its return value, if any; otherwise you have to explicitly deal with the unused return value:

    _ <- waitThread t1
    return ()

which is a bit tedious. Instead, you can just use this convenience function:

    waitThread_ t1

The trailing underscore in the name of this function follows the same convention as found in Control.Monad, which has mapM_ which does the same as mapM but likewise discards the return value.
Since: 0.2.7
waitThread' :: Thread α -> Program τ (Either SomeException α)
Wait for a thread to complete, returning the result if the computation was successful or the exception if one was thrown by the child thread.
This is essentially a convenience for calling waitThread and putting catch around it, but as with all the other wait* functions this ensures that if the waiting thread is killed the cancellation is propagated to the thread being watched as well.
Since: 0.4.5
waitThreads' :: [Thread α] -> Program τ [Either SomeException α]
Wait for many threads to complete. This function is intended for the scenario where you fire off a number of worker threads with forkThread but rather than leaving them to run independently, you need to wait for them all to complete.
The results of the threads that complete successfully will be returned as Right values. Should any of the threads being waited upon throw an exception, those exceptions will be returned as Left values.
If you don't need to analyse the failures individually, then you can just collect the successes using Data.Either's rights:

    -- workers is the list of Threads returned by your forkThread calls
    responses <- waitThreads' workers

    info "Aggregating results..."
    combineResults (rights responses)

Likewise, if you do want to do something with all the failures, you might find lefts useful:

    mapM_ (warn . intoRope . displayException) (lefts responses)

If the thread calling waitThreads' is cancelled, then all the threads being waited upon will also be cancelled. This often occurs within a timeout or similar control measure implemented using raceThreads_. Should the thread that spawned all the workers and is waiting for their results be told to cancel because it lost the "race", the child threads need to be told in turn to cancel so as to avoid those threads being leaked and continuing to run as zombies. This function takes care of that.
(this extends waitThread' to work across a list of Threads, taking care to ensure the cancellation behaviour described throughout this module)
Since: 0.4.5
cancelThread :: Thread α -> Program τ ()
Cancel a thread.
Be careful when using this. If you cancel a worker thread then the main thread that is waiting on it with waitThread will throw an exception, specifically ThreadCancelled (unless something else has already thrown an exception, in which case that will be thrown instead). In this scenario you will need to put a catch around your waiting function, otherwise the uncaught exception will continue to unwind your execution stack and probably end your program.
(this wraps base's killThread. The underlying mechanism used is to throw the ThreadKilled exception to the other thread. That exception is asynchronous, so will not be trapped by a catch block and will indeed cause the thread receiving the exception to come to an end)
Since: 0.4.5
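The underlying mechanism mentioned above can be observed directly with base alone. The following is a sketch, not this library's implementation: cancelDemo is a made-up name, and it uses forkFinally (whose completion handler receives every exception, asynchronous ones included) to record how the worker ended after killThread was called on it.

```haskell
import Control.Concurrent (forkFinally, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (AsyncException (ThreadKilled), fromException)

-- Hypothetical demonstration: start a long-running worker, kill it, and
-- report whether the exception that ended it was ThreadKilled.
cancelDemo :: IO Bool
cancelDemo = do
    started <- newEmptyMVar
    outcome <- newEmptyMVar
    -- The worker signals that it is running, then sleeps for ten seconds.
    tid <- forkFinally (putMVar started () >> threadDelay 10000000) (putMVar outcome)
    takeMVar started
    killThread tid
    result <- takeMVar outcome
    pure $ case result of
        Left e -> fromException e == Just ThreadKilled
        Right () -> False

main :: IO ()
main = do
    wasKilled <- cancelDemo
    print wasKilled
```

The worker never reaches the end of its sleep; the handler installed by forkFinally runs as soon as the asynchronous exception arrives.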
Helper functions
concurrentThreads :: Program τ α -> Program τ β -> Program τ (α, β)
Fork two threads and wait for both to finish. The return value is a pair containing the result of each action.
This is the same as calling forkThread and waitThread twice, except that if either sub-program fails with an exception the other program which is still running will be cancelled and the original exception is then re-thrown.

    (a, b) <- concurrentThreads one two

    -- continue, doing something with both results.

For a variant that ignores the return values and just waits for both see concurrentThreads_ below.
Since: 0.4.0
concurrentThreads_ :: Program τ α -> Program τ β -> Program τ ()
Fork two threads and wait for both to finish.
This is the same as calling forkThread and waitThread_ twice, except that if either sub-program fails with an exception the other program which is still running will be cancelled and the original exception is then re-thrown.
Since: 0.4.0
raceThreads :: Program τ α -> Program τ β -> Program τ (Either α β)
Fork two threads and race them against each other. This blocks until one or the other of the threads finishes. The return value will be Left α if the first program (one) completes first, and Right β if it is the second program (two) which finishes first. The sub program which is still running will be cancelled with an exception.

    result <- raceThreads one two
    case result of
        Left a -> do
            -- one finished first
        Right b -> do
            -- two finished first

For a variant that ignores the return value and just races the threads see raceThreads_ below.
Since: 0.4.0
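To make the Left and Right convention concrete, here is a deliberately simplified sketch of racing two IO actions with base alone. It is not how this library implements raceThreads: the name raceSketch is invented for the example, and unlike the real function it does not propagate exceptions from the children or react to the caller being cancelled.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Hypothetical, simplified race: whichever action delivers first wins,
-- then both threads are killed (a no-op for the one already finished).
raceSketch :: IO a -> IO b -> IO (Either a b)
raceSketch one two = do
    box <- newEmptyMVar
    t1 <- forkIO (one >>= putMVar box . Left)
    t2 <- forkIO (two >>= putMVar box . Right)
    winner <- takeMVar box
    mapM_ killThread [t1, t2]
    pure winner

main :: IO ()
main = do
    result <-
        raceSketch
            (threadDelay 50000 >> pure "quick")
            (threadDelay 2000000 >> pure "slow")
    print result
```

Because the first action sleeps for a far shorter time, the result here is the Left alternative carrying its value.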
raceThreads_ :: Program τ α -> Program τ β -> Program τ ()
Fork two threads and race them against each other. When one action completes the other will be cancelled with an exception. This is useful for enforcing timeouts:

    raceThreads_
        (sleepThread 300)
        (do
            -- We expect this to complete within 5 minutes.
            performAction
        )

Since: 0.4.0
timeoutThread :: Rational -> Program τ α -> Program τ α
Run a program that needs to complete before the given number of seconds have elapsed. This will return the result of the sub program or throw the Timeout exception if the limit is exceeded.
Since: 0.6.9
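The same shape of API can be approximated in plain IO with System.Timeout from base, which may help if you want to experiment with the "result or exception" behaviour outside of a Program. This is only an analogy under stated assumptions: withLimit and TimedOut are names invented for this sketch and are not part of this library.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (Exception, throwIO, try)
import System.Timeout (timeout)

-- Hypothetical exception standing in for this module's Timeout.
data TimedOut = TimedOut
    deriving (Show, Eq)

instance Exception TimedOut

-- Hypothetical helper: run an action with a limit given in seconds,
-- returning its result or throwing TimedOut if the limit is exceeded.
withLimit :: Rational -> IO a -> IO a
withLimit seconds action = do
    -- System.Timeout.timeout takes its limit in microseconds.
    let micros = ceiling (seconds * 1000000) :: Int
    outcome <- timeout micros action
    maybe (throwIO TimedOut) pure outcome

main :: IO ()
main = do
    fast <- withLimit 2 (pure "done")
    putStrLn fast
    slow <- try (withLimit 0.05 (threadDelay 2000000)) :: IO (Either TimedOut ())
    print slow
```

The first call finishes well inside its limit and returns normally; the second sleeps for two seconds against a fifty millisecond limit, so try captures the exception as a Left value.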
Internals
data Thread α
A thread for concurrent computation.
(this wraps base's ThreadId along with a holder for the result of the thread)
Since: 0.6.0
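The idea of pairing a ThreadId with a holder for the result can be sketched with base alone. The Worker type and the forkWorker and waitWorker functions below are hypothetical names for illustration, and the actual Thread type may well be structured differently.

```haskell
import Control.Concurrent (ThreadId, forkFinally)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar)
import Control.Exception (SomeException, throwIO)

-- Hypothetical stand-in for Thread α: the ThreadId plus a box that will
-- eventually hold either the result or the exception that ended the thread.
data Worker a = Worker ThreadId (MVar (Either SomeException a))

forkWorker :: IO a -> IO (Worker a)
forkWorker action = do
    box <- newEmptyMVar
    -- forkFinally hands the outcome, success or failure, to the handler.
    tid <- forkFinally action (putMVar box)
    pure (Worker tid box)

-- Block until the outcome is available, rethrowing any exception.
waitWorker :: Worker a -> IO a
waitWorker (Worker _ box) = do
    outcome <- readMVar box
    either throwIO pure outcome

main :: IO ()
main = do
    w <- forkWorker (pure (6 * 7 :: Int))
    answer <- waitWorker w
    print answer
```

Using readMVar rather than takeMVar leaves the outcome in place, so several callers could wait on the same worker.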
data Terminator
When a thread is aborted with cancelThread this value is used to mark a failed computation inside the Thread. Although it is not the mechanism used internally to kill the computation, it is the exception that is subsequently rethrown from waitThread if you are waiting on that thread to finish, allowing you to catch the case of a thread being cancelled if necessary.
This is mostly here to differentiate from ThreadKilled, giving you some knowledge as to whether it was your explicit cancelThread that ended the thread, or something else. You need to handle it either way, but sometimes you want to know the difference.
Since: 0.6.8
Instances
- Exception Terminator (defined in Core.Program.Threads)
  - toException :: Terminator -> SomeException
  - fromException :: SomeException -> Maybe Terminator
  - displayException :: Terminator -> String
- Show Terminator (defined in Core.Program.Threads)
  - showsPrec :: Int -> Terminator -> ShowS
  - show :: Terminator -> String
  - showList :: [Terminator] -> ShowS
data Timeout
If a timeout is exceeded this exception will be thrown by timeoutThread.
Since: 0.6.9
Instances
- Exception Timeout (defined in Core.Program.Threads)
  - toException :: Timeout -> SomeException
  - fromException :: SomeException -> Maybe Timeout
  - displayException :: Timeout -> String
- Show Timeout