Safe Haskell | None |
---|---|
Language | Haskell2010 |
ki is a lightweight structured concurrency library.
For a variant of this API generalized to MonadUnliftIO, see ki-unlifted.
Remember to link your program with -threaded to use the threaded runtime!
Synopsis
- data Scope
- data Thread a
- scoped :: (Scope -> IO a) -> IO a
- fork :: Scope -> IO a -> IO (Thread a)
- forkTry :: forall e a. Exception e => Scope -> IO a -> IO (Thread (Either e a))
- await :: Thread a -> STM a
- wait :: Scope -> STM ()
- fork_ :: Scope -> IO Void -> IO ()
- forkWith :: Scope -> ThreadOptions -> IO a -> IO (Thread a)
- forkWith_ :: Scope -> ThreadOptions -> IO Void -> IO ()
- forkTryWith :: forall e a. Exception e => Scope -> ThreadOptions -> IO a -> IO (Thread (Either e a))
- data ThreadOptions = ThreadOptions {}
- defaultThreadOptions :: ThreadOptions
- data ThreadAffinity = Unbound | Capability Int | OsThread
- data ByteCount
- kilobytes :: Natural -> ByteCount
- megabytes :: Natural -> ByteCount
Introduction
Structured concurrency is a paradigm of concurrent programming in which a lexical scope delimits the lifetime of each thread. Threads form a "call tree" hierarchy in which no child can outlive its parent.
Exceptions are propagated promptly from child to parent and vice-versa:
- If an exception is raised in a child thread, the child raises the same exception in its parent, then terminates.
- If an exception is raised in a parent thread, the parent first raises an exception in all of its living children, waits for them to terminate, then re-raises the original exception.
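The child-to-parent direction can be observed with a small program. This is a minimal sketch, assuming the ki package is installed and the BlockArguments extension is available; the ChildFailed exception type and the parentOutcome helper are hypothetical names introduced only for illustration.

```haskell
{-# LANGUAGE BlockArguments #-}

import Control.Concurrent (threadDelay)
import Control.Exception (Exception, throwIO, try)
import qualified Ki

-- | Hypothetical exception type, used only for this illustration.
data ChildFailed = ChildFailed
  deriving (Show)

instance Exception ChildFailed

-- | Run a scope whose only child fails immediately, and report what the
-- parent observes.
parentOutcome :: IO (Either ChildFailed String)
parentOutcome =
  try do
    Ki.scoped \scope -> do
      _ <- Ki.fork scope (throwIO ChildFailed :: IO ())
      -- The parent is expected to be interrupted during this delay,
      -- because the child's exception is raised in the parent.
      threadDelay 1000000
      pure "parent finished normally"

main :: IO ()
main = parentOutcome >>= print
```

If propagation works as described above, the parent should observe ChildFailed as a Left value instead of finishing its delay.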
All together, this library:
- Guarantees the absence of "ghost threads" (i.e. threads that accidentally continue to run alongside the main thread long after the function that spawned them returns).
- Performs prompt, bi-directional exception propagation when an exception is raised anywhere in the call tree.
- Provides a safe and flexible API that can be used directly, or as a foundation for higher-level concurrency patterns such as worker queues, publish-subscribe, and supervision trees.
For a longer introduction to structured concurrency, including an educative analogy to structured programming, please read Nathaniel J. Smith's blog post, "Notes on structured concurrency, or: Go statement considered harmful".
👉 Quick start examples
Perform two actions concurrently, and wait for both of them to complete.
Note that this program only creates one additional thread, unlike the concurrently combinator from the async package, which creates two.

    {-# LANGUAGE BlockArguments #-}

    import GHC.Conc (atomically) -- also exported by Control.Concurrent.STM (stm package)
    import qualified Ki

    concurrently :: IO a -> IO b -> IO (a, b)
    concurrently action1 action2 =
      Ki.scoped \scope -> do
        thread1 <- Ki.fork scope action1
        result2 <- action2
        result1 <- atomically (Ki.await thread1)
        pure (result1, result2)

Perform two actions concurrently, and when the first action terminates, stop executing the other.

    {-# LANGUAGE BlockArguments #-}

    import Control.Concurrent.MVar (newEmptyMVar, takeMVar, tryPutMVar)
    import qualified Ki

    race :: IO a -> IO a -> IO a
    race action1 action2 =
      Ki.scoped \scope -> do
        resultVar <- newEmptyMVar
        _ <- Ki.fork scope (action1 >>= tryPutMVar resultVar)
        _ <- Ki.fork scope (action2 >>= tryPutMVar resultVar)
        takeMVar resultVar
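The same fork-then-await pattern extends from two actions to a list of them. The following is a rough sketch, assuming the ki package and the BlockArguments extension; forConcurrently is a hypothetical helper written for this illustration, not something ki exports.

```haskell
{-# LANGUAGE BlockArguments #-}

import GHC.Conc (atomically)
import qualified Ki

-- | Hypothetical helper (not part of ki): run one child thread per input
-- and collect the results in input order.
forConcurrently :: [a] -> (a -> IO b) -> IO [b]
forConcurrently inputs action =
  Ki.scoped \scope -> do
    -- Fork one child per input; all of them live inside this scope.
    threads <- traverse (\input -> Ki.fork scope (action input)) inputs
    -- Block until every child has returned a value.
    atomically (traverse Ki.await threads)

main :: IO ()
main = forConcurrently [1, 2, 3 :: Int] (\n -> pure (n * 2)) >>= print
```

Because every child is created inside the scope, an exception in any one of them reaches the parent, and the remaining children are terminated when the scope closes.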
Core API
data Scope
A thread scope.
👉 Details
- A thread scope delimits the lifetime of all threads created within it (see fork, forkTry).
- A thread scope can only be created with scoped, and is only valid during the provided callback. The thread scope object explicitly represents the lexical scope induced by scoped:

      scoped \scope ->
        -- This indented region of the code is represented by the variable `scope`

- The thread that creates a scope is considered the parent of all threads created within it.
data Thread a
A child thread.
👉 Details
- If an exception is raised in a child thread, the child propagates the exception to its parent (see fork).
- An exception may be caught and returned as a value instead (see forkTry).
- Asynchronous exceptions are always propagated.
- A thread cannot be terminated explicitly à la killThread. However, all threads created within a scope are terminated when the scope closes.
scoped :: (Scope -> IO a) -> IO a
Execute an action in a new scope.
👉 Details
- Just before a call to scoped returns, whether with a value or an exception:
  - The parent thread raises an exception in all of its living children.
  - The parent thread blocks until those threads terminate.
- Child threads created within a scope can be awaited individually (see await), or as a collection (see wait).
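The termination behaviour at scope exit can be made visible with a child that records when its cleanup handler runs. This is a minimal sketch, assuming the ki package and the BlockArguments extension; childCleanedUp is a hypothetical helper, and the short delay in the parent is only there to give the child time to start.

```haskell
{-# LANGUAGE BlockArguments #-}

import Control.Concurrent (threadDelay)
import Control.Exception (finally)
import Data.IORef (newIORef, readIORef, writeIORef)
import qualified Ki

-- | Hypothetical helper: report whether a long-running child ran its
-- cleanup handler by the time scoped returned.
childCleanedUp :: IO Bool
childCleanedUp = do
  cleanedUp <- newIORef False
  Ki.scoped \scope -> do
    -- Left alone, the child would sleep for ten seconds.
    _ <- Ki.fork scope (threadDelay 10000000 `finally` writeIORef cleanedUp True)
    -- Give the child a moment to start, then leave the scope early.
    threadDelay 10000
  -- scoped only returns after its children have terminated.
  readIORef cleanedUp

main :: IO ()
main = childCleanedUp >>= print
```

The parent never waits for the ten-second sleep: leaving the callback is what ends the child.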
fork :: Scope -> IO a -> IO (Thread a)
Create a child thread to execute an action within a scope.
Masking state
The child thread does not mask asynchronous exceptions, regardless of the parent thread's masking state.
To create a child thread with a different initial masking state, use forkWith.
forkTry :: forall e a. Exception e => Scope -> IO a -> IO (Thread (Either e a))
Like fork, but the child thread does not propagate exceptions that are both:
- Synchronous (i.e. not an instance of SomeAsyncException).
- An instance of e.
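As an illustration of the difference from fork, the sketch below returns a child's synchronous exception as a value. It assumes the ki package and the BlockArguments extension; ValidationError and runValidated are hypothetical names introduced for this example.

```haskell
{-# LANGUAGE BlockArguments #-}

import Control.Exception (Exception, throwIO)
import GHC.Conc (atomically)
import qualified Ki

-- | Hypothetical exception type, used only for this illustration.
newtype ValidationError = ValidationError String
  deriving (Show)

instance Exception ValidationError

-- | Hypothetical helper: run an action in a child thread and return a
-- ValidationError as a Left value instead of propagating it to the parent.
runValidated :: IO Int -> IO (Either ValidationError Int)
runValidated action =
  Ki.scoped \scope -> do
    -- The result type fixes the exception type e to ValidationError.
    thread <- Ki.forkTry scope action
    atomically (Ki.await thread)

main :: IO ()
main = do
  runValidated (pure 42) >>= print
  runValidated (throwIO (ValidationError "negative input")) >>= print
```

Exceptions of any other type, and all asynchronous exceptions, would still be propagated to the parent as with fork.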
Extended API
forkWith :: Scope -> ThreadOptions -> IO a -> IO (Thread a)
Variant of fork that takes an additional options argument.
forkWith_ :: Scope -> ThreadOptions -> IO Void -> IO ()
Variant of forkWith for threads that never return.
forkTryWith :: forall e a. Exception e => Scope -> ThreadOptions -> IO a -> IO (Thread (Either e a))
Variant of forkTry that takes an additional options argument.
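A thread that never returns is typically a background loop. The sketch below uses forkWith_ for a labelled ticker that runs until its scope closes; it assumes the ki package and the BlockArguments extension, and countTicks and the "ticker" label are hypothetical choices for this example.

```haskell
{-# LANGUAGE BlockArguments #-}

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import qualified Ki

-- | Hypothetical helper: count how often a background ticker runs while
-- the parent stays inside the scope for the given number of microseconds.
countTicks :: Int -> IO Int
countTicks micros = do
  ticks <- newIORef (0 :: Int)
  let options = Ki.defaultThreadOptions {Ki.label = "ticker"}
  Ki.scoped \scope -> do
    -- forever never returns, so the loop can be used at type IO Void.
    Ki.forkWith_ scope options do
      forever do
        atomicModifyIORef' ticks (\n -> (n + 1, ()))
        threadDelay 1000
    threadDelay micros
  -- The ticker has been terminated by the time scoped returns.
  readIORef ticks

main :: IO ()
main = countTicks 20000 >>= print
```

No Thread value is returned by forkWith_, which fits a worker that has no result to await.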
Thread options
data ThreadOptions
- affinity
  The affinity of a thread. A thread can be unbound, bound to a specific capability, or bound to a specific OS thread.
  Default: Unbound
- allocationLimit
  The maximum number of bytes a thread may allocate before it is delivered an AllocationLimitExceeded exception. If caught, the thread is allowed to allocate an additional 100kb (tunable with +RTS -xq) to perform any necessary cleanup actions; if exceeded, the thread is delivered another.
  Default: Nothing (no limit)
- label
  The label of a thread, visible in the event log (+RTS -l).
  Default: "" (no label)
- maskingState
  The masking state a thread is created in. To unmask, use unsafeUnmask.
  Default: Unmasked
Instances
Instance | Defined in | Methods |
---|---|---|
Eq ThreadOptions | Ki.Internal.Thread | (==) :: ThreadOptions -> ThreadOptions -> Bool; (/=) :: ThreadOptions -> ThreadOptions -> Bool |
Show ThreadOptions | Ki.Internal.Thread | showsPrec :: Int -> ThreadOptions -> ShowS; show :: ThreadOptions -> String; showList :: [ThreadOptions] -> ShowS |
defaultThreadOptions :: ThreadOptions
Default thread options.

    ThreadOptions
      { affinity = Unbound
      , allocationLimit = Nothing
      , label = ""
      , maskingState = Unmasked
      }
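Options are usually built by overriding individual fields of defaultThreadOptions with record update syntax. The following is a sketch under the assumption that the ki package and the BlockArguments extension are available; workerOptions, observedMaskingState, and the chosen field values are hypothetical.

```haskell
{-# LANGUAGE BlockArguments #-}

import Control.Exception (MaskingState (..), getMaskingState)
import GHC.Conc (atomically)
import qualified Ki

-- | Hypothetical options value: start from the defaults and override
-- only the fields of interest.
workerOptions :: Ki.ThreadOptions
workerOptions =
  Ki.defaultThreadOptions
    { Ki.label = "worker"
    , Ki.allocationLimit = Just (Ki.megabytes 64)
    , Ki.maskingState = MaskedInterruptible
    }

-- | Hypothetical helper: ask a child created with workerOptions which
-- masking state it is running in.
observedMaskingState :: IO MaskingState
observedMaskingState =
  Ki.scoped \scope -> do
    thread <- Ki.forkWith scope workerOptions getMaskingState
    atomically (Ki.await thread)

main :: IO ()
main = observedMaskingState >>= print
```

Fields that are not mentioned keep their default values, so the child here remains Unbound.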
data ThreadAffinity
What, if anything, a thread is bound to.
Unbound | Unbound. |
Capability Int | Bound to a capability. |
OsThread | Bound to an OS thread. |
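To illustrate the Capability constructor, the sketch below binds a child to capability 0 and asks the runtime where the child is running. It assumes the ki package and the BlockArguments extension; pinnedCapability is a hypothetical helper, and threadCapability comes from GHC.Conc in base.

```haskell
{-# LANGUAGE BlockArguments #-}

import GHC.Conc (atomically, myThreadId, threadCapability)
import qualified Ki

-- | Hypothetical helper: create a child bound to capability 0 and return
-- the capability number it reports, plus whether it is pinned there.
pinnedCapability :: IO (Int, Bool)
pinnedCapability =
  Ki.scoped \scope -> do
    let options = Ki.defaultThreadOptions {Ki.affinity = Ki.Capability 0}
    thread <- Ki.forkWith scope options (myThreadId >>= threadCapability)
    atomically (Ki.await thread)

main :: IO ()
main = pinnedCapability >>= print
```

Binding to an OS thread with OsThread is a different mechanism, and is one reason to link with -threaded as noted at the top of this page.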
Instances
Instance | Defined in | Methods |
---|---|---|
Eq ThreadAffinity | Ki.Internal.Thread | (==) :: ThreadAffinity -> ThreadAffinity -> Bool; (/=) :: ThreadAffinity -> ThreadAffinity -> Bool |
Show ThreadAffinity | Ki.Internal.Thread | showsPrec :: Int -> ThreadAffinity -> ShowS; show :: ThreadAffinity -> String; showList :: [ThreadAffinity] -> ShowS |