module Control.Concurrent.Async.Lifted
(
A.Async
, async, asyncBound, asyncOn
, asyncWithUnmask, asyncOnWithUnmask
, withAsync, withAsyncBound, withAsyncOn
, withAsyncWithUnmask, withAsyncOnWithUnmask
, wait, poll, waitCatch, cancel, cancelWith
, A.asyncThreadId
, A.waitSTM, A.pollSTM, A.waitCatchSTM
, waitAny, waitAnyCatch, waitAnyCancel, waitAnyCatchCancel
, waitEither, waitEitherCatch, waitEitherCancel, waitEitherCatchCancel
, waitEither_
, waitBoth
#if MIN_VERSION_async(2, 1, 0)
, A.waitAnySTM
, A.waitAnyCatchSTM
, A.waitEitherSTM
, A.waitEitherCatchSTM
, A.waitEitherSTM_
, A.waitBothSTM
#endif
, link, link2
, race, race_, concurrently, mapConcurrently, forConcurrently
, Concurrently(..)
) where
import Control.Applicative
import Control.Concurrent (threadDelay)
import Control.Monad ((>=>), forever, liftM)
import GHC.IO (unsafeUnmask)
import Prelude hiding (mapM)
import Control.Concurrent.Async (Async)
import Control.Exception.Lifted (SomeException, Exception)
import Control.Monad.Base (MonadBase(..))
import Control.Monad.Trans.Control
import qualified Control.Concurrent.Async as A
import qualified Control.Exception.Lifted as E
#if defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ < 710
import Data.Traversable
#endif
#if !MIN_VERSION_base(4, 8, 0)
import Data.Monoid (Monoid(mappend, mempty))
#elif MIN_VERSION_base(4, 9, 0)
import Data.Semigroup (Semigroup((<>)))
#endif
-- | Generalized version of 'A.async'.
--
-- Forks the action through 'asyncUsing' with 'A.async' as the forking
-- primitive. The handle carries the monadic state (@StM m a@) of the
-- forked computation rather than a bare @a@.
async :: MonadBaseControl IO m => m a -> m (Async (StM m a))
async action = asyncUsing A.async action
-- | Generalized version of 'A.asyncBound'.
--
-- Same as 'async', except that the forking primitive handed to
-- 'asyncUsing' is 'A.asyncBound'.
asyncBound :: MonadBaseControl IO m => m a -> m (Async (StM m a))
asyncBound action = asyncUsing A.asyncBound action
-- | Generalized version of 'A.asyncOn'.
--
-- The 'Int' argument is passed unchanged to 'A.asyncOn', which is then
-- used as the forking primitive for 'asyncUsing'.
asyncOn :: MonadBaseControl IO m => Int -> m a -> m (Async (StM m a))
asyncOn cpu action = asyncUsing (A.asyncOn cpu) action
-- | Fork an action with 'A.async', giving the action an unmasking
-- function it may apply to sub-computations.
--
-- The unmasking function is 'unsafeUnmask' lifted into @m@ with
-- 'liftBaseOp_'.
asyncWithUnmask
  :: MonadBaseControl IO m
  => ((forall b. m b -> m b) -> m a)
  -> m (Async (StM m a))
asyncWithUnmask actionWith =
  asyncUsing A.async $
    actionWith (\inner -> liftBaseOp_ unsafeUnmask inner)
-- | Like 'asyncWithUnmask', but forks with 'A.asyncOn' applied to the
-- given 'Int' instead of 'A.async'.
--
-- The unmasking function passed to the action is 'unsafeUnmask' lifted
-- into @m@ with 'liftBaseOp_'.
asyncOnWithUnmask
  :: MonadBaseControl IO m
  => Int
  -> ((forall b. m b -> m b) -> m a)
  -> m (Async (StM m a))
asyncOnWithUnmask cpu actionWith =
  asyncUsing (A.asyncOn cpu) $
    actionWith (\inner -> liftBaseOp_ unsafeUnmask inner)
-- | Shared implementation of the @async*@ family.
--
-- Converts the monadic action into an 'IO' action (producing its
-- monadic state) with the runner supplied by 'liftBaseWith', and hands
-- that 'IO' action to the given forking primitive.
asyncUsing
  :: MonadBaseControl IO m
  => (IO (StM m a) -> IO (Async (StM m a)))
  -> m a
  -> m (Async (StM m a))
asyncUsing fork action =
  liftBaseWith (\runInBase -> fork (runInBase action))
-- | Fork an action with 'async' and pass its handle to the continuation.
--
-- Implemented as 'withAsyncUsing' specialised to 'async'.
withAsync
  :: MonadBaseControl IO m
  => m a
  -> (Async (StM m a) -> m b)
  -> m b
withAsync action inner = withAsyncUsing async action inner
-- | Like 'withAsync', but forks the action with 'asyncBound'.
--
-- Implemented as 'withAsyncUsing' specialised to 'asyncBound'.
withAsyncBound
  :: MonadBaseControl IO m
  => m a
  -> (Async (StM m a) -> m b)
  -> m b
withAsyncBound action inner = withAsyncUsing asyncBound action inner
-- | Like 'withAsync', but forks the action with 'asyncOn' applied to
-- the given 'Int'.
withAsyncOn
  :: MonadBaseControl IO m
  => Int
  -> m a
  -> (Async (StM m a) -> m b)
  -> m b
withAsyncOn cpu action inner = withAsyncUsing (asyncOn cpu) action inner
-- | Like 'withAsync', but the forked action receives an unmasking
-- function.
--
-- The unmasking function is 'unsafeUnmask' lifted into @m@ with
-- 'liftBaseOp_'; the resulting action is forked with 'async' through
-- 'withAsyncUsing'.
withAsyncWithUnmask
  :: MonadBaseControl IO m
  => ((forall c. m c -> m c) -> m a)
  -> (Async (StM m a) -> m b)
  -> m b
withAsyncWithUnmask actionWith inner =
  withAsyncUsing async unmaskedAction inner
  where
    unmaskedAction = actionWith (\act -> liftBaseOp_ unsafeUnmask act)
-- | Like 'withAsyncWithUnmask', but forks with 'asyncOn' applied to the
-- given 'Int' instead of 'async'.
withAsyncOnWithUnmask
  :: MonadBaseControl IO m
  => Int
  -> ((forall c. m c -> m c) -> m a)
  -> (Async (StM m a) -> m b)
  -> m b
withAsyncOnWithUnmask cpu actionWith inner =
  withAsyncUsing (asyncOn cpu) unmaskedAction inner
  where
    unmaskedAction = actionWith (\act -> liftBaseOp_ unsafeUnmask act)
-- | Shared implementation of the @withAsync*@ family.
--
-- Forks @action@ with the supplied forking function, runs @inner@ on
-- the resulting handle, and calls 'cancel' on that handle when @inner@
-- finishes, whether it returns normally or throws.
withAsyncUsing
  :: MonadBaseControl IO m
  => (m a -> m (Async (StM m a)))
  -> m a
  -> (Async (StM m a) -> m b)
  -> m b
withAsyncUsing fork action inner = E.mask $ \restore -> do
  -- The fork itself happens inside 'E.mask'; the forked action is
  -- wrapped in @restore@.
  a <- fork $ restore action
  -- Run the continuation under @restore@. If it throws anything
  -- ('SomeException'), cancel the forked action and rethrow.
  r <- restore (inner a) `E.catch` \e -> do
    cancel a
    E.throwIO (e :: SomeException)
  -- Normal completion: cancel the forked action before returning.
  cancel a
  return r
-- | Generalized version of 'A.wait'.
--
-- Waits with 'A.wait' in the base monad, then reinstates the captured
-- monadic state in the caller's context via 'restoreM'.
wait :: MonadBaseControl IO m => Async (StM m a) -> m a
wait a = do
  st <- liftBase (A.wait a)
  restoreM st
-- | Generalized version of 'A.poll'.
--
-- Returns 'Nothing' when 'A.poll' does. Otherwise the 'Either' is run
-- through 'sequenceEither' (restoring monadic state on the 'Right'
-- side) and wrapped in 'Just'.
poll
  :: MonadBaseControl IO m
  => Async (StM m a)
  -> m (Maybe (Either SomeException a))
poll a = do
  status <- liftBase (A.poll a)
  case status of
    Nothing -> return Nothing
    Just outcome -> liftM Just (sequenceEither outcome)
-- | Generalized version of 'A.cancel': runs 'A.cancel' in the base
-- monad via 'liftBase'.
cancel :: MonadBase IO m => Async a -> m ()
cancel a = liftBase (A.cancel a)
-- | Generalized version of 'A.cancelWith': runs 'A.cancelWith' with the
-- given exception in the base monad via 'liftBase'.
cancelWith :: (MonadBase IO m, Exception e) => Async a -> e -> m ()
cancelWith a e = liftBase (A.cancelWith a e)
-- | Generalized version of 'A.waitCatch'.
--
-- A 'Left' exception is returned untouched; a 'Right' monadic state is
-- restored in the caller's context (see 'sequenceEither').
waitCatch
  :: MonadBaseControl IO m
  => Async (StM m a)
  -> m (Either SomeException a)
waitCatch a = do
  outcome <- liftBase (A.waitCatch a)
  sequenceEither outcome
-- | Generalized version of 'A.waitAny'.
--
-- Returns the handle reported by 'A.waitAny' together with its result,
-- after restoring that result's monadic state with 'restoreM'.
waitAny :: MonadBaseControl IO m => [Async (StM m a)] -> m (Async (StM m a), a)
waitAny asyncs =
  liftBase (A.waitAny asyncs) >>= \(done, st) ->
    restoreM st >>= \result ->
      return (done, result)
-- | Generalized version of 'A.waitAnyCatch'.
--
-- Returns the handle reported by 'A.waitAnyCatch' together with its
-- outcome, passed through 'sequenceEither'.
waitAnyCatch
  :: MonadBaseControl IO m
  => [Async (StM m a)]
  -> m (Async (StM m a), Either SomeException a)
waitAnyCatch asyncs =
  liftBase (A.waitAnyCatch asyncs) >>= \(done, outcome) ->
    sequenceEither outcome >>= \result ->
      return (done, result)
-- | Generalized version of 'A.waitAnyCancel'.
--
-- Delegates to 'A.waitAnyCancel' in the base monad, then restores the
-- returned monadic state with 'restoreM'.
waitAnyCancel
  :: MonadBaseControl IO m
  => [Async (StM m a)]
  -> m (Async (StM m a), a)
waitAnyCancel asyncs =
  liftBase (A.waitAnyCancel asyncs) >>= \(done, st) ->
    restoreM st >>= \result ->
      return (done, result)
-- | Generalized version of 'A.waitAnyCatchCancel'.
--
-- Delegates to 'A.waitAnyCatchCancel' in the base monad, then passes
-- the returned outcome through 'sequenceEither'.
waitAnyCatchCancel
  :: MonadBaseControl IO m
  => [Async (StM m a)]
  -> m (Async (StM m a), Either SomeException a)
waitAnyCatchCancel asyncs =
  liftBase (A.waitAnyCatchCancel asyncs) >>= \(done, outcome) ->
    sequenceEither outcome >>= \result ->
      return (done, result)
-- | Generalized version of 'A.waitEither'.
--
-- Whichever side 'A.waitEither' reports has its monadic state restored
-- with 'restoreM' and is re-tagged with the same constructor.
waitEither
  :: MonadBaseControl IO m
  => Async (StM m a)
  -> Async (StM m b)
  -> m (Either a b)
waitEither a b = do
  winner <- liftBase (A.waitEither a b)
  case winner of
    Left stA -> liftM Left (restoreM stA)
    Right stB -> liftM Right (restoreM stB)
-- | Generalized version of 'A.waitEitherCatch'.
--
-- Whichever side 'A.waitEitherCatch' reports is passed through
-- 'sequenceEither' and re-tagged with the same outer constructor.
waitEitherCatch
  :: MonadBaseControl IO m
  => Async (StM m a)
  -> Async (StM m b)
  -> m (Either (Either SomeException a) (Either SomeException b))
waitEitherCatch a b = do
  winner <- liftBase (A.waitEitherCatch a b)
  case winner of
    Left outcomeA -> liftM Left (sequenceEither outcomeA)
    Right outcomeB -> liftM Right (sequenceEither outcomeB)
-- | Generalized version of 'A.waitEitherCancel'.
--
-- Delegates to 'A.waitEitherCancel' in the base monad, then restores
-- the monadic state of whichever side was reported.
waitEitherCancel
  :: MonadBaseControl IO m
  => Async (StM m a)
  -> Async (StM m b)
  -> m (Either a b)
waitEitherCancel a b = do
  winner <- liftBase (A.waitEitherCancel a b)
  case winner of
    Left stA -> liftM Left (restoreM stA)
    Right stB -> liftM Right (restoreM stB)
-- | Generalized version of 'A.waitEitherCatchCancel'.
--
-- Delegates to 'A.waitEitherCatchCancel', which (per the @async@
-- documentation) behaves like 'A.waitEitherCatch' but also cancels both
-- 'Async's before returning. The reported outcome is passed through
-- 'sequenceEither', so a 'Right' monadic state is restored in the
-- caller's context and a 'Left' exception is returned untouched.
waitEitherCatchCancel
  :: MonadBaseControl IO m
  => Async (StM m a)
  -> Async (StM m b)
  -> m (Either (Either SomeException a) (Either SomeException b))
waitEitherCatchCancel a b =
  -- Must use the cancelling primitive: plain 'A.waitEitherCatch' would
  -- leave the other computation running.
  liftBase (A.waitEitherCatchCancel a b) >>=
    either (liftM Left . sequenceEither) (liftM Right . sequenceEither)
-- | Generalized version of 'A.waitEither_'.
--
-- The results are discarded, so no monadic state is restored and only
-- 'MonadBase' is required.
waitEither_
  :: MonadBase IO m
  => Async a
  -> Async b
  -> m ()
waitEither_ a = liftBase . A.waitEither_ a
-- | Generalized version of 'A.waitBoth'.
--
-- After 'A.waitBoth' returns, the monadic state of the first argument
-- is restored, then that of the second, and both results are paired.
waitBoth
  :: MonadBaseControl IO m
  => Async (StM m a)
  -> Async (StM m b)
  -> m (a, b)
waitBoth a b =
  liftBase (A.waitBoth a b) >>= \(stA, stB) ->
    restoreM stA >>= \resA ->
      restoreM stB >>= \resB ->
        return (resA, resB)
-- | Generalized version of 'A.link': runs 'A.link' in the base monad
-- via 'liftBase'.
link :: MonadBase IO m => Async a -> m ()
link a = liftBase (A.link a)
-- | Generalized version of 'A.link2': runs 'A.link2' on the two handles
-- in the base monad via 'liftBase'.
link2 :: MonadBase IO m => Async a -> Async b -> m ()
link2 a b = liftBase (A.link2 a b)
-- | Run two actions with nested 'withAsync' and return the result of
-- 'waitEither' on their handles, tagged 'Left' or 'Right'.
race :: MonadBaseControl IO m => m a -> m b -> m (Either a b)
race left right =
  withAsync left $ \leftAsync ->
    withAsync right (waitEither leftAsync)
-- | Like 'race', but waits with 'waitEither_' and so discards the
-- result.
race_ :: MonadBaseControl IO m => m a -> m b -> m ()
race_ left right =
  withAsync left $ \leftAsync ->
    withAsync right (waitEither_ leftAsync)
-- | Run two actions with nested 'withAsync' and return both results,
-- collected with 'waitBoth'.
concurrently :: MonadBaseControl IO m => m a -> m b -> m (a, b)
concurrently left right =
  withAsync left $ \leftAsync ->
    withAsync right (waitBoth leftAsync)
-- | Apply a monadic function to every element of a 'Traversable',
-- combining the effects through the 'Concurrently' applicative.
mapConcurrently
  :: (Traversable t, MonadBaseControl IO m)
  => (a -> m b)
  -> t a
  -> m (t b)
mapConcurrently f xs =
  runConcurrently (traverse (\x -> Concurrently (f x)) xs)
-- | 'mapConcurrently' with its arguments flipped.
forConcurrently
  :: (Traversable t, MonadBaseControl IO m)
  => t a
  -> (a -> m b)
  -> m (t b)
forConcurrently xs f = mapConcurrently f xs
-- | Wrapper around a monadic action @m a@. The instances defined in
-- this module combine wrapped actions with 'concurrently' ('<*>') and
-- 'race' ('<|>').
newtype Concurrently m a = Concurrently { runConcurrently :: m a }
-- | Maps over the result of the wrapped action using the underlying
-- functor's 'fmap'.
instance Functor m => Functor (Concurrently m) where
  fmap f wrapped = Concurrently (fmap f (runConcurrently wrapped))
-- | 'pure' wraps the underlying 'pure'; '<*>' runs both sides with
-- 'concurrently' and applies the resulting function to the resulting
-- argument.
instance MonadBaseControl IO m => Applicative (Concurrently m) where
  pure x = Concurrently (pure x)
  wrappedF <*> wrappedX =
    Concurrently
      (fmap
        (uncurry ($))
        (concurrently (runConcurrently wrappedF) (runConcurrently wrappedX)))
-- | 'empty' loops forever on @threadDelay maxBound@ in the base monad;
-- '<|>' runs both sides with 'race' and returns whichever result 'race'
-- yields.
instance MonadBaseControl IO m => Alternative (Concurrently m) where
  empty =
    Concurrently (liftBaseWith (\_ -> forever (threadDelay maxBound)))
  wrappedA <|> wrappedB =
    Concurrently
      (fmap
        (either id id)
        (race (runConcurrently wrappedA) (runConcurrently wrappedB)))
#if MIN_VERSION_base(4, 9, 0)
-- | Combines the results of two wrapped actions with the result type's
-- '<>', lifted through the 'Applicative' instance via 'liftA2'.
instance (MonadBaseControl IO m, Semigroup a) =>
  Semigroup (Concurrently m a) where
    x <> y = liftA2 (<>) x y

-- | 'mempty' is a pure 'mempty'; 'mappend' is the 'Semigroup' '<>'.
instance (MonadBaseControl IO m, Semigroup a, Monoid a) =>
  Monoid (Concurrently m a) where
    mempty = pure mempty
    mappend x y = x <> y
#else
-- | 'mempty' is a pure 'mempty'; 'mappend' combines the results of two
-- wrapped actions with the result type's 'mappend', lifted via 'liftA2'.
instance (MonadBaseControl IO m, Monoid a) => Monoid (Concurrently m a) where
  mempty = pure mempty
  mappend x y = liftA2 mappend x y
#endif
-- | Restore the monadic state held in the 'Right' side of an 'Either'.
--
-- A 'Left' value is returned unchanged; a 'Right' monadic state is
-- reinstated with 'restoreM' and its result re-wrapped in 'Right'.
sequenceEither :: MonadBaseControl IO m => Either e (StM m a) -> m (Either e a)
sequenceEither (Left err) = return (Left err)
sequenceEither (Right st) = liftM Right (restoreM st)