{-# LANGUAGE CPP, MagicHash, UnboxedTuples, RankNTypes,
    ExistentialQuantification #-}
#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy #-}
#endif
#if __GLASGOW_HASKELL__ < 710
{-# LANGUAGE DeriveDataTypeable #-}
#endif
{-# OPTIONS -Wall -fno-warn-implicit-prelude -fno-warn-unused-imports #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.Async
-- Copyright   :  (c) Simon Marlow 2012
-- License     :  BSD3 (see the file LICENSE)
--
-- Maintainer  :  Simon Marlow <marlowsd@gmail.com>
-- Stability   :  provisional
-- Portability :  non-portable (requires concurrency)
--
-- This module provides a set of operations for running IO operations
-- asynchronously and waiting for their results.  It is a thin layer
-- over the basic concurrency operations provided by
-- "Control.Concurrent".  The main additional functionality it
-- provides is the ability to wait for the return value of a thread,
-- but the interface also provides some additional safety and
-- robustness over using 'forkIO' threads and @MVar@ directly.
--
-- == High-level API
--
-- @async@'s high-level API spawns /lexically scoped/ threads,
-- ensuring the following key properties that make it safer to use
-- than using plain 'forkIO':
--
-- 1. No exception is swallowed (waiting for results propagates exceptions).
-- 2. No thread is leaked (left running unintentionally).
--
-- (This is done using the 'Control.Exception.bracket' pattern to work in the
-- presence of synchronous and asynchronous exceptions.)
--
-- __Most practical/production code should only use the high-level API__.
--
-- The basic type is @'Async' a@, which represents an asynchronous
-- @IO@ action that will return a value of type @a@, or die with an
-- exception.  An 'Async' is a wrapper around a low-level 'forkIO' thread.
--
-- The fundamental function to spawn threads with the high-level API is
-- 'withAsync'.
--
-- For example, to fetch two web pages at the same time, we could do
-- this (assuming a suitable @getURL@ function):
--
-- > withAsync (getURL url1) $ \a1 -> do
-- >   withAsync (getURL url2) $ \a2 -> do
-- >     page1 <- wait a1
-- >     page2 <- wait a2
-- >     ...
--
-- where 'withAsync' starts the operation in a separate thread, and
-- 'wait' waits for and returns the result.
--
-- * If the operation throws an exception, then that exception is re-thrown
--   by 'wait'. This ensures property (1): No exception is swallowed.
-- * If an exception bubbles up through a 'withAsync', then the 'Async'
--   it spawned is 'cancel'ed. This ensures property (2): No thread is leaked.
--
-- Often we do not care to work manually with 'Async' handles like
-- @a1@ and @a2@. Instead, we want to express high-level objectives like
-- performing two or more tasks concurrently, and waiting for one or all
-- of them to finish.
--
-- For example, the pattern of performing two IO actions concurrently and
-- waiting for both their results is packaged up in a combinator 'concurrently',
-- so we can further shorten the above example to:
--
-- > (page1, page2) <- concurrently (getURL url1) (getURL url2)
-- > ...
--
-- The section __/High-level utilities/__ covers the most
-- common high-level objectives, including:
--
-- * Waiting for 2 results ('concurrently').
-- * Waiting for many results ('mapConcurrently' / 'forConcurrently').
-- * Waiting for the first of 2 results ('race').
-- * Waiting for arbitrary nestings of "all of /N/" and "the first of /N/"
--   results with the 'Concurrently' newtype and its 'Applicative' and
--   'Alternative' instances.
--
-- Click here to scroll to that section:
-- "Control.Concurrent.Async#high-level-utilities".
--
-- == Low-level API
--
-- Some use cases require parallelism that is not lexically scoped.
--
-- For those, the low-level function 'async' can be used as a direct
-- equivalent of 'forkIO':
--
-- > -- Do NOT use this code in production, it has a flaw (explained below).
-- > do
-- >   a1 <- async (getURL url1)
-- >   a2 <- async (getURL url2)
-- >   page1 <- wait a1
-- >   page2 <- wait a2
-- >   ...
--
-- In contrast to 'withAsync', this code has a problem.
--
-- It still fulfills property (1) in that an exception arising from
-- @getURL@ will be re-thrown by 'wait', but it does not fulfill
-- property (2).
-- Consider the case when the first 'wait' throws an exception; then the
-- second 'wait' will not happen, and the second 'async' may be left
-- running in the background, possibly indefinitely.
--
-- 'withAsync' is like 'async', except that the 'Async' is
-- automatically killed (using 'uninterruptibleCancel') if the
-- enclosing IO operation returns before it has completed.
-- Furthermore, 'withAsync' allows a tree of threads to be built, such
-- that children are automatically killed if their parents die for any
-- reason.
--
-- If you need to use the low-level API, ensure that you guarantee
-- property (2) by other means, such as 'link'ing asyncs that need
-- to die together, and protecting against asynchronous exceptions
-- using 'Control.Exception.bracket', 'Control.Exception.mask',
-- or other functions from "Control.Exception".
--
-- == Miscellaneous
--
-- The 'Functor' instance can be used to change the result of an
-- 'Async'.  For example:
--
-- > ghci> withAsync (return 3) (\a -> wait (fmap (+1) a))
-- > 4
--
-- === Resource exhaustion
--
-- As with all concurrent programming, keep in mind that while
-- Haskell's cooperative ("green") multithreading carries low overhead,
-- spawning too many threads at the same time may lead to resource exhaustion
-- (of memory, file descriptors, or other limited resources), given that the
-- actions running in the threads consume these resources.

-----------------------------------------------------------------------------

module Control.Concurrent.Async (

    -- * Asynchronous actions
    Async,

    -- * High-level API

    -- ** Spawning with automatic 'cancel'ation
    withAsync, withAsyncBound, withAsyncOn, withAsyncWithUnmask,
    withAsyncOnWithUnmask,

    -- ** Querying 'Async's
    wait, poll, waitCatch, asyncThreadId,
    cancel, uninterruptibleCancel, cancelWith, AsyncCancelled(..),

    -- ** #high-level-utilities# High-level utilities
    race, race_,
    concurrently, concurrently_,
    mapConcurrently, forConcurrently,
    mapConcurrently_, forConcurrently_,
    replicateConcurrently, replicateConcurrently_,
    Concurrently(..),
    compareAsyncs,

    -- ** Specialised operations

    -- *** STM operations
    waitSTM, pollSTM, waitCatchSTM,

    -- *** Waiting for multiple 'Async's
    waitAny, waitAnyCatch, waitAnyCancel, waitAnyCatchCancel,
    waitEither, waitEitherCatch, waitEitherCancel, waitEitherCatchCancel,
    waitEither_,
    waitBoth,

    -- *** Waiting for multiple 'Async's in STM
    waitAnySTM, waitAnyCatchSTM,
    waitEitherSTM, waitEitherCatchSTM,
    waitEitherSTM_,
    waitBothSTM,

    -- * Low-level API

    -- ** Spawning (low-level API)
    async, asyncBound, asyncOn, asyncWithUnmask, asyncOnWithUnmask,

    -- ** Linking
    link, linkOnly, link2, link2Only, ExceptionInLinkedThread(..),

  ) where

import Control.Concurrent.STM.TMVar
import Control.Exception
import Control.Concurrent
import qualified Data.Foldable as F
#if !MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif
import Control.Monad
import Control.Applicative
#if !MIN_VERSION_base(4,8,0)
import Data.Monoid (Monoid(mempty,mappend))
import Data.Traversable
#endif
#if __GLASGOW_HASKELL__ < 710
import Data.Typeable
#endif
#if MIN_VERSION_base(4,9,0)
import Data.Semigroup (Semigroup((<>)))
#endif

import Data.IORef

import GHC.Exts
import GHC.IO hiding (finally, onException)
import GHC.Conc

-- -----------------------------------------------------------------------------
-- STM Async API


-- | An asynchronous action spawned by 'async' or 'withAsync'.
-- Asynchronous actions are executed in a separate thread, and
-- operations are provided for waiting for asynchronous actions to
-- complete and obtaining their results (see e.g. 'wait').
--
data Async a = Async
  { asyncThreadId :: {-# UNPACK #-} !ThreadId
                  -- ^ Returns the 'ThreadId' of the thread running
                  -- the given 'Async'.
  , _asyncWait    :: STM (Either SomeException a)
                  -- ^ STM action yielding the outcome of the action:
                  -- @Left e@ if it threw the exception @e@, @Right a@
                  -- if it returned the value @a@.
  }

-- | Two 'Async's are equal exactly when their 'ThreadId's are equal;
-- the wait action is not inspected.
instance Eq (Async a) where
  Async a _ == Async b _  =  a == b

-- | 'Async's are ordered by their 'ThreadId's; the wait action is not
-- inspected.
instance Ord (Async a) where
  Async a _ `compare` Async b _  =  a `compare` b

-- | 'fmap' keeps the 'ThreadId' and applies the function to the
-- @Right@ result produced by the wait action; a @Left@ exception is
-- passed through unchanged.
instance Functor Async where
  fmap f (Async a w) = Async a (fmap (fmap f) w)

-- | Compare two 'Async's that may have different result types by
-- their 'ThreadId'.
compareAsyncs :: Async a -> Async b -> Ordering
compareAsyncs (Async t1 _) (Async t2 _) = compare t1 t2

-- | Spawn an asynchronous action in a separate thread.
--
-- Like for 'forkIO', the action may be left running unintentionally
-- (see module-level documentation for details).
--
-- __Use 'withAsync' style functions wherever you can instead!__
async :: IO a -> IO (Async a)
async = inline asyncUsing rawForkIO

-- | Like 'async' but using 'forkOS' internally.
asyncBound :: IO a -> IO (Async a)
asyncBound = asyncUsing forkOS

-- | Like 'async' but using 'forkOn' internally.  The 'Int' argument
-- is handed to the forking primitive ahead of the action.
asyncOn :: Int -> IO a -> IO (Async a)
asyncOn = asyncUsing . rawForkOn

-- | Like 'async' but using 'forkIOWithUnmask' internally.  The child
-- thread is passed a function that can be used to unmask asynchronous
-- exceptions.
asyncWithUnmask :: ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask actionWith = asyncUsing rawForkIO (actionWith unsafeUnmask)

-- | Like 'asyncOn' but using 'forkOnWithUnmask' internally.  The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions.
asyncOnWithUnmask :: Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask cpu actionWith =
  asyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)

-- | Shared implementation behind the @async*@ family.  Runs @action@
-- in a thread created by @doFork@ and returns an 'Async' whose wait
-- action reads (without emptying) the 'TMVar' that the child fills
-- with the outcome of @action@.
--
-- The fork is performed inside 'mask'.  @restore@ is applied to
-- @action@ only, so the surrounding 'try' and the final 'putTMVar'
-- are not run in the restored state; for forking primitives whose
-- child inherits the parent's masking state (as 'forkIO' does) this
-- means the outcome is always recorded.
asyncUsing :: (IO () -> IO ThreadId)
           -> IO a -> IO (Async a)
asyncUsing doFork = \action -> do
   var <- newEmptyTMVarIO
   -- t <- forkFinally action (\r -> atomically $ putTMVar var r)
   -- slightly faster:
   t <- mask $ \restore ->
          doFork $ try (restore action) >>= atomically . putTMVar var
   return (Async t (readTMVar var))

-- | Spawn an asynchronous action in a separate thread, and pass its
-- @Async@ handle to the supplied function.  When the function returns
-- or throws an exception, 'uninterruptibleCancel' is called on the @Async@.
--
-- > withAsync action inner = mask $ \restore -> do
-- >   a <- async (restore action)
-- >   restore (inner a) `finally` uninterruptibleCancel a
--
-- This is a useful variant of 'async' that ensures an @Async@ is
-- never left running unintentionally.
--
-- Note: a reference to the child thread is kept alive until the call
-- to 'withAsync' returns, so nesting many 'withAsync' calls requires
-- linear memory.
--
withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync = inline withAsyncUsing rawForkIO

-- | Like 'withAsync' but uses 'forkOS' internally.
withAsyncBound :: IO a -> (Async a -> IO b) -> IO b
withAsyncBound = withAsyncUsing forkOS

-- | Like 'withAsync' but uses 'forkOn' internally.  The 'Int'
-- argument is handed to the forking primitive ahead of the action.
withAsyncOn :: Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn = withAsyncUsing . rawForkOn

-- | Like 'withAsync' but uses 'forkIOWithUnmask' internally.  The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions.
withAsyncWithUnmask
  :: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask actionWith =
  withAsyncUsing rawForkIO (actionWith unsafeUnmask)

-- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally.  The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions.
withAsyncOnWithUnmask
  :: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncOnWithUnmask cpu actionWith =
  withAsyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)

-- | Shared implementation behind the @withAsync*@ family.  Forks
-- @action@ with @doFork@, runs @inner@ on the resulting 'Async', and
-- calls 'uninterruptibleCancel' on that 'Async' both when @inner@
-- returns and when it throws (the exception is then re-thrown).
withAsyncUsing :: (IO () -> IO ThreadId)
               -> IO a -> (Async a -> IO b) -> IO b
-- The bracket version works, but is slow.  We can do better by
-- hand-coding it:
withAsyncUsing doFork = \action inner -> do
  var <- newEmptyTMVarIO
  mask $ \restore -> do
    -- Only @action@ runs in the restored masking state; the 'try' and
    -- the 'putTMVar' that records the outcome are outside 'restore'.
    t <- doFork $ try (restore action) >>= atomically . putTMVar var
    let a = Async t (readTMVar var)
    -- Failure path: cancel the child, then propagate the exception.
    r <- restore (inner a) `catchAll` \e -> do
      uninterruptibleCancel a
      throwIO e
    -- Success path: cancel the child before handing back the result.
    uninterruptibleCancel a
    return r

-- | Wait for an asynchronous action to complete, and return its
-- value.  If the asynchronous action threw an exception, then the
-- exception is re-thrown by 'wait'.
--
-- > wait = atomically . waitSTM
--
-- In addition to the equation above, if the transaction is aborted
-- with 'BlockedIndefinitelyOnSTM' it is run one more time.
--
{-# INLINE wait #-}
wait :: Async a -> IO a
wait = tryAgain . atomically . waitSTM
  where
    -- See: https://github.com/simonmar/async/issues/14
    tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f

-- | Wait for an asynchronous action to complete, and return either
-- @Left e@ if the action raised an exception @e@, or @Right a@ if it
-- returned a value @a@.
--
-- > waitCatch = atomically . waitCatchSTM
--
-- In addition to the equation above, if the transaction is aborted
-- with 'BlockedIndefinitelyOnSTM' it is run one more time.
--
{-# INLINE waitCatch #-}
waitCatch :: Async a -> IO (Either SomeException a)
waitCatch = tryAgain . atomically . waitCatchSTM
  where
    -- See: https://github.com/simonmar/async/issues/14
    tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f

-- | Check whether an 'Async' has completed yet.  If it has not
-- completed yet, then the result is @Nothing@, otherwise the result
-- is @Just e@ where @e@ is @Left x@ if the @Async@ raised an
-- exception @x@, or @Right a@ if it returned a value @a@.
--
-- > poll = atomically . pollSTM
--
{-# INLINE poll #-}
poll :: Async a -> IO (Maybe (Either SomeException a))
poll = atomically . pollSTM

-- | A version of 'wait' that can be used inside an STM transaction.
-- A @Left@ outcome is re-thrown with 'throwSTM'; a @Right@ outcome is
-- returned.
--
waitSTM :: Async a -> STM a
waitSTM a = do
   r <- waitCatchSTM a
   either throwSTM return r

-- | A version of 'waitCatch' that can be used inside an STM transaction.
-- It is simply the wait action stored in the 'Async'.
--
{-# INLINE waitCatchSTM #-}
waitCatchSTM :: Async a -> STM (Either SomeException a)
waitCatchSTM (Async _ w) = w

-- | A version of 'poll' that can be used inside an STM transaction.
-- The stored wait action is tried first; if it retries, 'orElse'
-- falls back to returning @Nothing@ instead of blocking.
--
{-# INLINE pollSTM #-}
pollSTM :: Async a -> STM (Maybe (Either SomeException a))
pollSTM (Async _ w) = (Just <$> w) `orElse` return Nothing

-- | Cancel an asynchronous action by throwing the @AsyncCancelled@
-- exception to it, and waiting for the 'Async' thread to quit.
-- Has no effect if the 'Async' has already completed.
--
-- > cancel a = throwTo (asyncThreadId a) AsyncCancelled <* waitCatch a
--
-- Note that 'cancel' will not terminate until the thread the 'Async'
-- refers to has terminated. This means that 'cancel' will block for
-- as long as said thread blocks when receiving an asynchronous exception.
--
-- For example, it could block if:
--
-- * It's executing a foreign call, and thus cannot receive the asynchronous
-- exception;
-- * It's executing some cleanup handler after having received the exception,
-- and the handler is blocking.
{-# INLINE cancel #-}
cancel :: Async a -> IO ()
cancel a@(Async t _) = throwTo t AsyncCancelled <* waitCatch a

-- | The exception thrown by 'cancel' to terminate a thread.
data AsyncCancelled = AsyncCancelled
  deriving (Show, Eq
#if __GLASGOW_HASKELL__ < 710
    ,Typeable
#endif
    )

-- | On GHC 7.8 and later the conversions to and from 'SomeException'
-- are delegated to 'asyncExceptionToException' and
-- 'asyncExceptionFromException'; on older compilers the class
-- defaults are used.
instance Exception AsyncCancelled where
#if __GLASGOW_HASKELL__ >= 708
  fromException = asyncExceptionFromException
  toException = asyncExceptionToException
#endif

-- | Cancel an asynchronous action
--
-- This is a variant of 'cancel', but it is not interruptible: the whole
-- of 'cancel' is run inside 'uninterruptibleMask_'.
{-# INLINE uninterruptibleCancel #-}
uninterruptibleCancel :: Async a -> IO ()
uninterruptibleCancel = uninterruptibleMask_ . cancel

-- | Cancel an asynchronous action by throwing the supplied exception
-- to it, and then run 'waitCatch' on it (discarding that result).
--
-- > cancelWith a x = throwTo (asyncThreadId a) x <* waitCatch a
--
-- The notes about the synchronous nature of 'cancel' also apply to
-- 'cancelWith'.
cancelWith :: Exception e => Async a -> e -> IO ()
cancelWith a@(Async t _) e = throwTo t e <* waitCatch a

-- | Wait for any of the supplied asynchronous operations to complete.
-- The value returned is a pair of the 'Async' that completed, and the
-- result that would be returned by 'wait' on that 'Async'.
--
-- If multiple 'Async's complete or have completed, then the value
-- returned corresponds to the first completed 'Async' in the list.
--
-- This is 'waitAnyCatchSTM' run as a single 'atomically' transaction.
--
{-# INLINE waitAnyCatch #-}
waitAnyCatch :: [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatch = atomically . waitAnyCatchSTM

-- | A version of 'waitAnyCatch' that can be used inside an STM transaction.
--
-- The per-'Async' transactions are combined left to right with 'orElse',
-- ending in 'retry'; for an empty list the whole transaction is therefore
-- just 'retry'.
--
-- @since 2.1.0
waitAnyCatchSTM :: [Async a] -> STM (Async a, Either SomeException a)
waitAnyCatchSTM asyncs =
    foldr orElse retry $
      map (\a -> do r <- waitCatchSTM a; return (a, r)) asyncs

-- | Like 'waitAnyCatch', but also cancels the other asynchronous
-- operations as soon as one has completed.
--
-- 'cancel' is applied to every element of the list, and because this is
-- done with 'finally' it also happens when 'waitAnyCatch' itself exits
-- with an exception.
--
waitAnyCatchCancel :: [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatchCancel asyncs =
  waitAnyCatch asyncs `finally` mapM_ cancel asyncs

-- | Wait for any of the supplied @Async@s to complete.  If the first
-- to complete throws an exception, then that exception is re-thrown
-- by 'waitAny'.
--
-- If multiple 'Async's complete or have completed, then the value
-- returned corresponds to the first completed 'Async' in the list.
--
-- This is 'waitAnySTM' run as a single 'atomically' transaction.
--
{-# INLINE waitAny #-}
waitAny :: [Async a] -> IO (Async a, a)
waitAny = atomically . waitAnySTM

-- | A version of 'waitAny' that can be used inside an STM transaction.
--
-- The per-'Async' transactions are combined left to right with 'orElse',
-- ending in 'retry'; for an empty list the whole transaction is therefore
-- just 'retry'.
--
-- @since 2.1.0
waitAnySTM :: [Async a] -> STM (Async a, a)
waitAnySTM asyncs =
    foldr orElse retry $
      map (\a -> do r <- waitSTM a; return (a, r)) asyncs

-- | Like 'waitAny', but also cancels the other asynchronous
-- operations as soon as one has completed.
--
-- 'cancel' is applied to every element of the list, and because this is
-- done with 'finally' it also happens when 'waitAny' exits with an
-- exception.
--
waitAnyCancel :: [Async a] -> IO (Async a, a)
waitAnyCancel asyncs =
  waitAny asyncs `finally` mapM_ cancel asyncs

-- | Wait for the first of two @Async@s to finish.
{-# INLINE waitEitherCatch #-}
waitEitherCatch :: Async a -> Async b
                -> IO (Either (Either SomeException a)
                              (Either SomeException b))
waitEitherCatch left right =
  tryAgain $ atomically (waitEitherCatchSTM left right)
  where
    -- See: https://github.com/simonmar/async/issues/14
    -- If the transaction is aborted with BlockedIndefinitelyOnSTM, run
    -- it one more time; a second such exception is not caught.
    tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f

-- | A version of 'waitEitherCatch' that can be used inside an STM transaction.
--
-- The left 'Async' is tried first; the right one is only consulted (via
-- 'orElse') when the left transaction retries.
--
-- @since 2.1.0
waitEitherCatchSTM :: Async a -> Async b
                -> STM (Either (Either SomeException a)
                               (Either SomeException b))
waitEitherCatchSTM left right =
    (Left  <$> waitCatchSTM left)
      `orElse`
    (Right <$> waitCatchSTM right)

-- | Like 'waitEitherCatch', but also 'cancel's both @Async@s before
-- returning.
--
-- The cancellation runs under 'finally', left first and then right, so it
-- also happens when 'waitEitherCatch' exits with an exception.
--
waitEitherCatchCancel :: Async a -> Async b
                      -> IO (Either (Either SomeException a)
                                    (Either SomeException b))
waitEitherCatchCancel left right =
  waitEitherCatch left right `finally` (cancel left >> cancel right)

-- | Wait for the first of two @Async@s to finish.  If the @Async@
-- that finished first raised an exception, then the exception is
-- re-thrown by 'waitEither'.
--
-- This is 'waitEitherSTM' run as a single 'atomically' transaction.
--
{-# INLINE waitEither #-}
waitEither :: Async a -> Async b -> IO (Either a b)
waitEither left right = atomically (waitEitherSTM left right)

-- | A version of 'waitEither' that can be used inside an STM transaction.
--
-- The left 'Async' is tried first; the right one is only consulted (via
-- 'orElse') when the left transaction retries.
--
-- @since 2.1.0
waitEitherSTM :: Async a -> Async b -> STM (Either a b)
waitEitherSTM left right =
    (Left  <$> waitSTM left)
      `orElse`
    (Right <$> waitSTM right)

-- | Like 'waitEither', but the result is ignored.
--
-- This is 'waitEitherSTM_' run as a single 'atomically' transaction.
--
{-# INLINE waitEither_ #-}
waitEither_ :: Async a -> Async b -> IO ()
waitEither_ left right = atomically (waitEitherSTM_ left right)

-- | A version of 'waitEither_' that can be used inside an STM transaction.
--
-- Both results are discarded with 'void'; the left 'Async' is tried
-- first and the right one only when the left transaction retries.
--
-- @since 2.1.0
waitEitherSTM_ :: Async a -> Async b -> STM ()
waitEitherSTM_ left right =
    (void $ waitSTM left)
      `orElse`
    (void $ waitSTM right)

-- | Like 'waitEither', but also 'cancel's both @Async@s before
-- returning.
--
-- The cancellation runs under 'finally', left first and then right, so it
-- also happens when 'waitEither' exits with an exception.
--
waitEitherCancel :: Async a -> Async b -> IO (Either a b)
waitEitherCancel left right =
  waitEither left right `finally` (cancel left >> cancel right)

-- | Waits for both @Async@s to finish, but if either of them throws
-- an exception before they have both finished, then the exception is
-- re-thrown by 'waitBoth'.
--
{-# INLINE waitBoth #-}
waitBoth :: Async a -> Async b -> IO (a,b)
waitBoth left right = tryAgain $ atomically (waitBothSTM left right)
  where
    -- See: https://github.com/simonmar/async/issues/14
    -- If the transaction is aborted with BlockedIndefinitelyOnSTM, run
    -- it one more time; a second such exception is not caught.
    tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f

-- | A version of 'waitBoth' that can be used inside an STM transaction.
--
-- @since 2.1.0
waitBothSTM :: Async a -> Async b -> STM (a,b)
waitBothSTM left right = do
    -- If the left result is not available yet, still run 'waitSTM' on the
    -- right one before retrying, so that whatever 'waitSTM right' does
    -- short of returning (retrying or throwing) takes effect; a successful
    -- right result is thrown away here and fetched again below.
    a <- waitSTM left
           `orElse`
         (waitSTM right >> retry)
    b <- waitSTM right
    return (a,b)


-- -----------------------------------------------------------------------------
-- Linking threads

-- | The exception thrown to a linked thread by 'linkOnly' and 'link2Only'.
-- It carries the 'Async' whose action failed (its result type is hidden by
-- the existential) together with the exception that action raised.
data ExceptionInLinkedThread =
  forall a . ExceptionInLinkedThread (Async a) SomeException
#if __GLASGOW_HASKELL__ < 710
  deriving Typeable
#endif

-- Renders as @ExceptionInLinkedThread <thread id> <exception>@, showing
-- only the 'ThreadId' of the 'Async', with surrounding parentheses when
-- the context precedence is 11 or higher.
instance Show ExceptionInLinkedThread where
  showsPrec p (ExceptionInLinkedThread (Async t _) e) =
    showParen (p >= 11) $
      showString "ExceptionInLinkedThread " .
      showsPrec 11 t .
      showString " " .
      showsPrec 11 e

-- On GHC >= 7.8 the conversions go through 'asyncExceptionFromException'
-- and 'asyncExceptionToException'; on older compilers the class defaults
-- are used.
instance Exception ExceptionInLinkedThread where
#if __GLASGOW_HASKELL__ >= 708
  fromException = asyncExceptionFromException
  toException = asyncExceptionToException
#endif

-- | Link the given @Async@ to the current thread, such that if the
-- @Async@ raises an exception, that exception will be re-thrown in
-- the current thread, wrapped in 'ExceptionInLinkedThread'.
--
-- 'link' ignores 'AsyncCancelled' exceptions thrown in the other thread,
-- so that it's safe to 'cancel' a thread you're linked to.  If you want
-- different behaviour, use 'linkOnly'.
--
link :: Async a -> IO ()
link = linkOnly (not . isCancel)

-- | Link the given @Async@ to the current thread, such that if the
-- @Async@ raises an exception, that exception will be re-thrown in
-- the current thread, wrapped in 'ExceptionInLinkedThread'.
--
-- The supplied predicate determines which exceptions in the target
-- thread should be propagated to the source thread.
--
linkOnly
  :: (SomeException -> Bool)  -- ^ return 'True' if the exception
                              -- should be propagated, 'False'
                              -- otherwise.
  -> Async a
  -> IO ()
linkOnly shouldThrow a = do
  -- Capture the caller's thread id now; the watcher below runs in a
  -- separate thread started by forkRepeat.
  me <- myThreadId
  void $ forkRepeat $ do
    r <- waitCatch a
    case r of
      Left e | shouldThrow e -> throwTo me (ExceptionInLinkedThread a e)
      _otherwise -> return ()

-- | Link two @Async@s together, such that if either raises an
-- exception, the same exception is re-thrown in the other @Async@,
-- wrapped in 'ExceptionInLinkedThread'.
--
-- 'link2' ignores 'AsyncCancelled' exceptions, so that it's possible
-- to 'cancel' either thread without cancelling the other.  If you
-- want different behaviour, use 'link2Only'.
--
link2 :: Async a -> Async b -> IO ()
link2 = link2Only (not . isCancel)

-- | Link two @Async@s together, such that if either raises an
-- exception, the same exception is re-thrown in the other @Async@,
-- wrapped in 'ExceptionInLinkedThread'.
--
-- The supplied predicate determines which exceptions in the target
-- thread should be propagated to the source thread.
--
link2Only :: (SomeException -> Bool) -> Async a -> Async b -> IO ()
link2Only shouldThrow left@(Async tl _)  right@(Async tr _) =
  void $ forkRepeat $ do
    -- Only the first of the two to finish is inspected.
    r <- waitEitherCatch left right
    case r of
      -- left failed: notify the thread of right
      Left  (Left e) | shouldThrow e ->
        throwTo tr (ExceptionInLinkedThread left e)
      -- right failed: notify the thread of left
      Right (Left e) | shouldThrow e ->
        throwTo tl (ExceptionInLinkedThread right e)
      _ -> return ()

-- True exactly when the exception converts to 'AsyncCancelled' via
-- 'fromException'.
isCancel :: SomeException -> Bool
isCancel e
  | Just AsyncCancelled <- fromException e = True
  | otherwise = False


-- -----------------------------------------------------------------------------

-- | Run two @IO@ actions concurrently, and return the first to
-- finish.  The loser of the race is 'cancel'led.
--
-- The result is 'Left' when the first action wins and 'Right' when the
-- second one does.
--
-- > race left right =
-- >   withAsync left $ \a ->
-- >   withAsync right $ \b ->
-- >   waitEither a b
--
race :: IO a -> IO b -> IO (Either a b)

-- | Like 'race', but the result is ignored.
--
-- > race_ left right = void $ race left right
--
race_ :: IO a -> IO b -> IO ()

-- | Run two @IO@ actions concurrently, and return both results.  If
-- either action throws an exception at any time, then the other
-- action is 'cancel'led, and the exception is re-thrown by
-- 'concurrently'.
--
-- > concurrently left right =
-- >   withAsync left $ \a ->
-- >   withAsync right $ \b ->
-- >   waitBoth a b
concurrently :: IO a -> IO b -> IO (a,b)

-- | 'concurrently', but ignore the result values
--
-- @since 2.1.1
concurrently_ :: IO a -> IO b -> IO ()

-- Selects which implementation of race/concurrently is compiled: a
-- non-zero value picks the withAsync-based definitions directly below,
-- 0 picks the other branch of the conditional.
#define USE_ASYNC_VERSIONS 0

#if USE_ASYNC_VERSIONS

-- Both actions run under withAsync; the first result wins.
race left right =
  withAsync left $ \a ->
  withAsync right $ \b ->
  waitEither a b

-- race with the result discarded.
race_ left right = void $ race left right

-- Both actions run under withAsync; both results are awaited.
concurrently left right =
  withAsync left $ \a ->
  withAsync right $ \b ->
  waitBoth a b

-- concurrently with the results discarded.
concurrently_ left right = void $ concurrently left right

#else

-- MVar versions of race/concurrently
-- More ugly than the Async versions, but quite a bit faster.

-- race :: IO a -> IO b -> IO (Either a b)
race left right = concurrently' left right collect
  where
    -- Take the first outcome reported by either child: re-throw it if it
    -- is an exception, otherwise return it as the winner.
    collect m = do
        e <- m
        case e of
            Left ex -> throwIO ex
            Right r -> return r

-- race_ :: IO a -> IO b -> IO ()
-- Same as race, with the winning result discarded.
race_ left right = void $ race left right

-- concurrently :: IO a -> IO b -> IO (a,b)
concurrently left right = concurrently' left right (collect [])
  where
    -- Accumulate outcomes (most recent first) until one Left and one
    -- Right value have arrived, in either order; an exception reported
    -- by a child is re-thrown immediately.
    collect [Left a, Right b] _ = return (a,b)
    collect [Right b, Left a] _ = return (a,b)
    collect xs m = do
        e <- m
        case e of
            Left ex -> throwIO ex
            Right r -> collect (r:xs) m

-- Shared worker for the MVar versions of race and concurrently.  Forks
-- one thread per action; each thread reports either its result (tagged
-- Left for the first action, Right for the second) or the exception it
-- died with into a single MVar.  The supplied continuation is handed an
-- action that takes the next report from that MVar, and decides how many
-- reports to consume.  Afterwards (also when the continuation throws) any
-- child that has not been accounted for is cancelled and waited for.
concurrently' :: IO a -> IO b
             -> (IO (Either SomeException (Either a b)) -> IO r)
             -> IO r
concurrently' left right collect = do
    done <- newEmptyMVar
    mask $ \restore -> do
        -- Note: uninterruptibleMask here is because we must not allow
        -- the putMVar in the exception handler to be interrupted,
        -- otherwise the parent thread will deadlock when it waits for
        -- the thread to terminate.
        lid <- forkIO $ uninterruptibleMask_ $
          restore (left >>= putMVar done . Right . Left)
            `catchAll` (putMVar done . Left)
        rid <- forkIO $ uninterruptibleMask_ $
          restore (right >>= putMVar done . Right . Right)
            `catchAll` (putMVar done . Left)

        -- Number of child reports not yet taken from the MVar.
        count <- newIORef (2 :: Int)
        let takeDone = do
                r <- takeMVar done      -- interruptible
                -- Decrement the counter so we know how many takes are left.
                -- Since only the parent thread is calling this, we can
                -- use non-atomic modifications.
                -- NB. do this *after* takeMVar, because takeMVar might be
                -- interrupted.
                modifyIORef count (subtract 1)
                return r

        -- Run the action once more if it is aborted with
        -- BlockedIndefinitelyOnMVar.
        let tryAgain f = f `catch` \BlockedIndefinitelyOnMVar -> f

            stop = do
                -- kill right before left, to match the semantics of
                -- the version using withAsync. (#27)
                uninterruptibleMask_ $ do
                  count' <- readIORef count
                  -- we only need to use killThread if there are still
                  -- children alive.  Note: forkIO here is because the
                  -- child thread could be in an uninterruptible
                  -- putMVar.
                  when (count' > 0) $
                    void $ forkIO $ do
                      throwTo rid AsyncCancelled
                      throwTo lid AsyncCancelled
                  -- ensure the children are really dead
                  replicateM_ count' (tryAgain $ takeMVar done)

        r <- collect (tryAgain $ takeDone) `onException` stop
        stop
        return r

-- Implementation of 'concurrently_' for this CPP branch: hands both
-- actions to concurrently' and consumes the outcomes it reports,
-- discarding the result values.
--
-- NOTE(review): no type signature is written here.  The signature
-- (@IO a -> IO b -> IO ()@) is presumably declared once elsewhere in the
-- module, outside this CPP conditional -- confirm.  Repeating it here
-- would then be a duplicate-signature error.
concurrently_ left right = concurrently' left right (collect 0)
  where
    -- @collect n next@ reads outcomes from @next@ until two successful
    -- ones have been seen; @n@ counts the successes so far.  The first
    -- 'Left' outcome is re-thrown in the calling thread.
    collect :: Exception e => Int -> IO (Either e b) -> IO ()
    collect 2 _ = return ()
    collect i m = do
        e <- m
        case e of
            Left ex -> throwIO ex
            Right _ -> collect (i + 1) m


#endif

-- | Maps an 'IO'-performing function over any 'Traversable' data
-- type, performing all the @IO@ actions concurrently, and returning
-- the original data structure with the arguments replaced by the
-- results.
--
-- If any of the actions throw an exception, then all other actions are
-- cancelled and the exception is re-thrown.
--
-- For example, @mapConcurrently@ works with lists:
--
-- > pages <- mapConcurrently getURL ["url1", "url2", "url3"]
--
-- Take into account that @async@ will try to immediately spawn a thread
-- for each element of the @Traversable@, so running this on large
-- inputs without care may lead to resource exhaustion (of memory,
-- file descriptors, or other limited resources).
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
-- Each element's action is wrapped in 'Concurrently' so that 'traverse'
-- combines them through the 'Concurrently' 'Applicative' instance.
mapConcurrently f = runConcurrently . traverse (Concurrently . f)

-- | 'forConcurrently' is 'mapConcurrently' with its arguments flipped.
--
-- > pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url
--
-- @since 2.1.0
forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b)
forConcurrently = flip mapConcurrently

-- | 'mapConcurrently_' is 'mapConcurrently' with the return value discarded;
-- a concurrent equivalent of 'mapM_'.
mapConcurrently_ :: F.Foldable f => (a -> IO b) -> f a -> IO ()
-- Every action has its result dropped with 'void' and is wrapped in
-- 'Concurrently'; 'F.foldMap' then merges the resulting
-- @Concurrently ()@ values with their 'Monoid' instance.
mapConcurrently_ f = runConcurrently . F.foldMap (Concurrently . void . f)

-- | 'forConcurrently_' is 'forConcurrently' with the return value discarded;
-- a concurrent equivalent of 'forM_'.
forConcurrently_ :: F.Foldable f => f a -> (a -> IO b) -> IO ()
forConcurrently_ = flip mapConcurrently_

-- | Perform the action in the given number of threads.
--
-- The result list contains one element per copy of the action.
--
-- @since 2.1.1
replicateConcurrently :: Int -> IO a -> IO [a]
-- Build @cnt@ copies of the wrapped action and sequence them through the
-- 'Concurrently' 'Applicative' instance.
replicateConcurrently cnt = runConcurrently . sequenceA . replicate cnt . Concurrently

-- | Same as 'replicateConcurrently', but ignore the results.
--
-- @since 2.1.1
replicateConcurrently_ :: Int -> IO a -> IO ()
-- The action's result is dropped with 'void' before wrapping, so the
-- @cnt@ copies can be merged with the @Concurrently ()@ 'Monoid' via
-- 'F.fold'.
replicateConcurrently_ cnt = runConcurrently . F.fold . replicate cnt . Concurrently . void

-- -----------------------------------------------------------------------------

-- | A value of type @Concurrently a@ is an @IO@ operation that can be
-- composed with other @Concurrently@ values, using the @Applicative@
-- and @Alternative@ instances.
--
-- Calling @runConcurrently@ on a value of type @Concurrently a@ will
-- execute the @IO@ operations it contains concurrently, before
-- delivering the result of type @a@.
--
-- For example
--
-- > (page1, page2, page3)
-- >     <- runConcurrently $ (,,)
-- >     <$> Concurrently (getURL "url1")
-- >     <*> Concurrently (getURL "url2")
-- >     <*> Concurrently (getURL "url3")
--
newtype Concurrently a = Concurrently { runConcurrently :: IO a }

-- | Maps a pure function over the result of the wrapped @IO@ action.
instance Functor Concurrently where
  fmap f (Concurrently a) = Concurrently $ f <$> a

-- | 'pure' wraps a value in a trivial @IO@ action; '<*>' runs the
-- function-producing and argument-producing actions with 'concurrently'
-- and applies the former's result to the latter's.
instance Applicative Concurrently where
  pure = Concurrently . return
  Concurrently fs <*> Concurrently as =
    Concurrently $ (\(f, a) -> f a) <$> concurrently fs as

-- | 'empty' is an action that never produces a value: it repeats
-- @threadDelay maxBound@ forever.  '<|>' runs both actions with 'race'
-- and returns the value from whichever side 'race' reports, dropping the
-- 'Either' tag.
instance Alternative Concurrently where
  empty = Concurrently $ forever (threadDelay maxBound)
  Concurrently as <|> Concurrently bs =
    Concurrently $ either id id <$> race as bs

#if MIN_VERSION_base(4,9,0)
-- | Only defined by @async@ for @base >= 4.9@.
--
-- Combines the results of two @Concurrently@ values with the underlying
-- '<>', lifted through the 'Applicative' instance.
--
-- @since 2.1.0
instance Semigroup a => Semigroup (Concurrently a) where
  (<>) = liftA2 (<>)

-- | 'mempty' is a pure 'mempty'; 'mappend' is the 'Semigroup' '<>' of
-- @Concurrently a@.
--
-- @since 2.1.0
instance (Semigroup a, Monoid a) => Monoid (Concurrently a) where
  mempty = pure mempty
  mappend = (<>)
#else
-- | 'Monoid' instance for the branch where @MIN_VERSION_base(4,9,0)@ does
-- not hold, so no 'Semigroup' constraint is involved.  Results are
-- combined with the underlying 'mappend' through the 'Applicative'
-- instance.
--
-- @since 2.1.0
instance Monoid a => Monoid (Concurrently a) where
  mempty = pure mempty
  mappend lhs rhs = mappend <$> lhs <*> rhs
#endif

-- ----------------------------------------------------------------------------

-- | Fork a thread that runs the supplied action, and if it raises an
-- exception, re-runs the action.  The thread terminates only when the
-- action runs to completion without raising an exception.
--
-- Returns the 'ThreadId' of the forked thread; the action's result is
-- discarded.
forkRepeat :: IO a -> IO ThreadId
forkRepeat action =
  -- The fork happens inside 'mask'; each run of @action@ is wrapped in
  -- @restore@, the unmasking function supplied by 'mask'.
  mask $ \restore ->
    let go = do r <- tryAll (restore action)
                case r of
                  Left _ -> go            -- any exception: run the action again
                  _      -> return ()     -- clean completion: stop
    in forkIO go

-- | 'catch' specialised to 'SomeException', so the handler receives
-- every exception that 'catch' can intercept.
catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll = catch

-- | 'try' specialised to 'SomeException': a thrown exception is returned
-- as 'Left', a normal result as 'Right'.
tryAll :: IO a -> IO (Either SomeException a)
tryAll = try

-- A version of forkIO that does not include the outer exception
-- handler: saves a bit of time when we will be installing our own
-- exception handler.
--
-- The action is unwrapped to its underlying state-passing function and
-- handed directly to the @fork#@ primop; the resulting @ThreadId#@ is
-- boxed into a 'ThreadId'.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO (IO action) = IO $ \ s ->
   case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)

-- Like 'rawForkIO', but uses the @forkOn#@ primop, passing the unboxed
-- 'Int' as its first argument (named @cpu@ here; presumably the
-- capability number, as for 'forkOn' -- confirm).
{-# INLINE rawForkOn #-}
rawForkOn :: Int -> IO () -> IO ThreadId
rawForkOn (I# cpu) (IO action) = IO $ \ s ->
   case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)