{-# LANGUAGE ScopedTypeVariables #-}
module Control.Concurrent.ParallelIO.Local (
parallel_, parallelE_, parallel, parallelE,
parallelInterleaved, parallelInterleavedE,
parallelFirst, parallelFirstE,
Pool, withPool, startPool, stopPool,
extraWorkerWhileBlocked,
spawnPoolWorkerFor, killPoolWorkerFor
) where
import Control.Concurrent.ParallelIO.Compat
import Control.Concurrent
import Control.Exception
import qualified Control.Exception as E
import Control.Monad
import System.IO
-- | Run @act@ and hand any exception it throws to @handler@, with one
-- exception: the asynchronous exception 'ThreadKilled' is rethrown
-- unchanged with 'throwIO' and never reaches @handler@.
catchNonThreadKilled :: IO a -> (SomeException -> IO a) -> IO a
catchNonThreadKilled act handler = act `E.catch` \e ->
    -- The 'ThreadKilled' pattern fixes the 'fromException' target type to
    -- 'AsyncException'; every other exception falls through to the handler.
    case fromException e of
        Just ThreadKilled -> throwIO e
        _                 -> handler e
-- | Run @act@; if it throws anything other than 'ThreadKilled', run
-- @handler@ (discarding its result) and then rethrow the original
-- exception. 'ThreadKilled' propagates without running @handler@.
onNonThreadKilledException :: IO a -> IO b -> IO a
onNonThreadKilledException act handler =
    catchNonThreadKilled act (\e -> handler >> throwIO e)
-- | Run @act@; any exception it throws other than 'ThreadKilled' is
-- forwarded to the thread @tid@ with 'throwTo' instead of propagating in
-- the current thread. 'ThreadKilled' is rethrown in the current thread.
reflectExceptionsTo :: ThreadId -> IO () -> IO ()
reflectExceptionsTo tid act = catchNonThreadKilled act (throwTo tid)
-- | A thread pool: the thread count it was created with, plus the
-- semaphore that the @parallel*@ functions in this module wait on before
-- running an action and signal once the action has finished.
data Pool = Pool {
    -- | Thread count the pool was created with (see 'startPool').
    pool_threadcount :: Int,
    -- | Semaphore limiting how many actions run at the same time.
    pool_sem :: QSem
  }
-- | Create a 'Pool' for the given number of threads.
--
-- The semaphore starts with @threadcount - 1@ units. Calls 'error' if
-- @threadcount@ is less than 1.
startPool :: Int -> IO Pool
startPool threadcount
  | threadcount < 1 = error $ "startPool: thread count must be strictly positive (was " ++ show threadcount ++ ")"
  | otherwise       = fmap (Pool threadcount) $ newQSem (threadcount - 1)
-- | Shut a pool down by calling 'killPoolWorkerFor' @pool_threadcount - 1@
-- times, i.e. once for every unit 'startPool' put into the semaphore.
-- Each call waits on the semaphore, so this blocks until those units are
-- available.
stopPool :: Pool -> IO ()
stopPool pool = replicateM_ (pool_threadcount pool - 1) $ killPoolWorkerFor pool
-- | Create a pool with 'startPool', pass it to the supplied action, and
-- run 'stopPool' on it afterwards. 'bracket' ensures 'stopPool' also runs
-- if the action throws.
withPool :: Int -> (Pool -> IO a) -> IO a
withPool threadcount = bracket (startPool threadcount) stopPool
-- | Run an action with one extra unit released into the pool's semaphore
-- ('spawnPoolWorkerFor'), and take that unit back ('killPoolWorkerFor')
-- when the action finishes or throws.
--
-- The @parallel*@ functions in this module wrap their wait for results in
-- this.
extraWorkerWhileBlocked :: Pool -> IO a -> IO a
extraWorkerWhileBlocked pool = bracket_ (spawnPoolWorkerFor pool) (killPoolWorkerFor pool)
-- | Release one unit into the pool's semaphore ('signalQSem'), allowing
-- one more action to run.
spawnPoolWorkerFor :: Pool -> IO ()
spawnPoolWorkerFor pool = signalQSem (pool_sem pool)
-- | Take one unit from the pool's semaphore ('waitQSem'), blocking until
-- one is available.
killPoolWorkerFor :: Pool -> IO ()
killPoolWorkerFor pool = waitQSem (pool_sem pool)
-- | Like 'parallel', but discards the results of the actions.
parallel_ :: Pool -> [IO a] -> IO ()
parallel_ pool xs = parallel pool xs >> return ()
-- | Like 'parallelE', but keeps only the failures: each element of the
-- result is @Just e@ if the corresponding action threw @e@, and @Nothing@
-- if it returned normally.
parallelE_ :: Pool -> [IO a] -> IO [Maybe SomeException]
parallelE_ pool = fmap (map (either Just (const Nothing))) . parallelE pool
-- | Run the actions concurrently and return their results in the same
-- order as the input list.
--
-- Each action is forked into its own thread. That thread takes a unit from
-- the pool semaphore before running the action and gives it back
-- afterwards. An exception thrown by an action (other than 'ThreadKilled')
-- is forwarded to the calling thread with 'throwTo'.
parallel :: Pool -> [IO a] -> IO [a]
parallel pool acts = mask $ \restore -> do
    main_tid <- myThreadId
    resultvars <- forM acts $ \act -> do
        resultvar <- newEmptyMVar
        _tid <- forkIO $
            bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool) $
            reflectExceptionsTo main_tid $ do
                -- Only the user action runs with the caller's masking state.
                res <- restore act
                -- The variable is written exactly once, so the put must succeed.
                True <- tryPutMVar resultvar res
                return ()
        return resultvar
    -- The caller only waits here, so it lends one unit to the pool meanwhile.
    extraWorkerWhileBlocked pool (mapM takeMVar resultvars)
-- | Run the actions concurrently and return, in input order, either the
-- result of each action ('Right') or the exception it threw ('Left').
--
-- Each action is forked into its own thread. That thread takes a unit from
-- the pool semaphore before running the action and gives it back
-- afterwards. Exceptions are captured with 'try' rather than forwarded to
-- the calling thread.
parallelE :: Pool -> [IO a] -> IO [Either SomeException a]
parallelE pool acts = mask $ \restore -> do
    resultvars <- forM acts $ \act -> do
        resultvar <- newEmptyMVar
        _tid <- forkIO $
            bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool) $ do
                -- Only the user action runs with the caller's masking state.
                ei_e_res <- try (restore act)
                -- The variable is written exactly once, so the put must succeed.
                True <- tryPutMVar resultvar ei_e_res
                return ()
        return resultvar
    -- The caller only waits here, so it lends one unit to the pool meanwhile.
    extraWorkerWhileBlocked pool (mapM takeMVar resultvars)
-- | Run the actions concurrently and return their results in the order in
-- which they were written to a shared channel, i.e. completion order
-- rather than input order.
--
-- Each action is forked into its own thread. That thread takes a unit from
-- the pool semaphore before running the action and gives it back
-- afterwards. An exception thrown by an action (other than 'ThreadKilled')
-- is forwarded to the calling thread with 'throwTo'.
parallelInterleaved :: Pool -> [IO a] -> IO [a]
parallelInterleaved pool acts = mask $ \restore -> do
    main_tid <- myThreadId
    resultchan <- newChan
    forM_ acts $ \act -> do
        _tid <- forkIO $
            bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool) $
            reflectExceptionsTo main_tid $ do
                -- Only the user action runs with the caller's masking state.
                res <- restore act
                writeChan resultchan res
        return ()
    -- Read one result per submitted action; the actions themselves are
    -- only used to count how many reads to perform.
    extraWorkerWhileBlocked pool (mapM (\_act -> readChan resultchan) acts)
-- | Like 'parallelInterleaved', but each element is either the result of
-- an action ('Right') or the exception it threw ('Left'). Exceptions are
-- captured with 'try' rather than forwarded to the calling thread.
--
-- Results are returned in the order in which they were written to a
-- shared channel, i.e. completion order rather than input order.
parallelInterleavedE :: Pool -> [IO a] -> IO [Either SomeException a]
parallelInterleavedE pool acts = mask $ \restore -> do
    resultchan <- newChan
    forM_ acts $ \act -> do
        _tid <- forkIO $
            bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool) $ do
                -- Only the user action runs with the caller's masking state.
                ei_e_res <- try (restore act)
                writeChan resultchan ei_e_res
        return ()
    -- Read one result per submitted action; the actions themselves are
    -- only used to count how many reads to perform.
    extraWorkerWhileBlocked pool (mapM (\_act -> readChan resultchan) acts)
-- | Run the actions concurrently and return the first @Just@ result that
-- any of them produces, or @Nothing@ once every worker has finished
-- without producing one.
--
-- Each action is forked into its own thread. That thread takes a unit from
-- the pool semaphore before running the action and gives it back
-- afterwards. An exception thrown by an action (other than 'ThreadKilled')
-- is forwarded to the calling thread with 'throwTo'. Once a result has
-- been obtained, every worker thread is killed with 'killThread'.
parallelFirst :: Pool -> [IO (Maybe a)] -> IO (Maybe a)
parallelFirst pool acts = mask $ \restore -> do
    main_tid <- myThreadId
    resultvar <- newEmptyMVar
    (tids, waits) <- liftM unzip $ forM acts $ \act -> do
        -- Filled by the worker's release step to signal that it is done.
        wait_var <- newEmptyMVar
        tid <- forkIO $
            -- If the worker fails with a non-ThreadKilled exception, try to
            -- publish Nothing so the caller is not left waiting.
            flip onNonThreadKilledException (tryPutMVar resultvar Nothing) $
            bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool >> putMVar wait_var ()) $
            reflectExceptionsTo main_tid $ do
                mb_res <- restore act
                case mb_res of
                    Nothing  -> return ()
                    -- Only the first put succeeds; later results are dropped.
                    Just res -> tryPutMVar resultvar (Just res) >> return ()
        return (tid, wait_var)
    -- Helper thread: after every worker has signalled completion, publish
    -- Nothing in case no worker produced a result.
    _ <- forkIO $ mapM_ takeMVar waits >> tryPutMVar resultvar Nothing >> return ()
    mb_res <- extraWorkerWhileBlocked pool (takeMVar resultvar)
    mapM_ killThread tids
    return mb_res
-- | Like 'parallelFirst', but an exception thrown by an action counts as a
-- result: the first outcome is returned as @Just (Right res)@ for an
-- action that returned @Just res@, or @Just (Left e)@ for one that threw
-- @e@. @Nothing@ is returned once every worker has finished without
-- either.
--
-- Each action is forked into its own thread. That thread takes a unit from
-- the pool semaphore before running the action and gives it back
-- afterwards. Once a result has been obtained, every worker thread is
-- killed with 'killThread'.
parallelFirstE :: Pool -> [IO (Maybe a)] -> IO (Maybe (Either SomeException a))
parallelFirstE pool acts = mask $ \restore -> do
    resultvar <- newEmptyMVar
    (tids, waits) <- liftM unzip $ forM acts $ \act -> do
        -- Filled by the worker's release step to signal that it is done.
        wait_var <- newEmptyMVar
        tid <- forkIO $
            bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool >> putMVar wait_var ()) $ do
                ei_mb_res <- try (restore act)
                -- Only the first put succeeds; later outcomes are dropped.
                case ei_mb_res of
                    Left e           -> tryPutMVar resultvar (Just (Left e)) >> return ()
                    Right Nothing    -> return ()
                    Right (Just res) -> tryPutMVar resultvar (Just (Right res)) >> return ()
        return (tid, wait_var)
    -- Helper thread: after every worker has signalled completion, publish
    -- Nothing in case no worker produced a result.
    _ <- forkIO $ mapM_ takeMVar waits >> tryPutMVar resultvar Nothing >> return ()
    mb_res <- extraWorkerWhileBlocked pool (takeMVar resultvar)
    mapM_ killThread tids
    return mb_res