{-
The parallel function (specialised to lists) is equivalent to:

import Control.Parallel.Strategies
parallel :: [IO [a]] -> IO [[a]]
parallel = pure . withStrategy (parList $ seqList r0) . map unsafePerformIO

However, this version performs about 10% slower with 2 processors in GHC 6.12.1
-}

module Parallel(parallel) where

import System.IO.Unsafe
import Control.Concurrent
import Control.Exception
import Control.Monad


-- | Run a list of 'IO' actions and collect their results in input order.
--
-- The first argument @j@ selects the strategy: when @j <= 1@ the actions are
-- handed to 'parallel1'; otherwise they are handed to 'parallelN' together
-- with @j@.
parallel :: Int -> [IO a] -> IO [a]
parallel j = if j <= 1 then parallel1 else parallelN j


-- | Run the actions one after another, returning their results in order.
--
-- Only the first action is run eagerly. The remainder of the list is wrapped
-- in 'unsafeInterleaveIO', so each subsequent action is performed when the
-- corresponding tail of the result list is demanded.
parallel1 :: [IO a] -> IO [a]
parallel1 [] = pure []
parallel1 (x:xs) = do
    x2 <- x
    -- Defer the rest of the list until the consumer forces the tail.
    xs2 <- unsafeInterleaveIO $ parallel1 xs
    pure $ x2 : xs2


-- | Run the actions on @j@ worker threads, returning results in input order.
--
-- One empty 'MVar' is created per action and every @(MVar, action)@ pair is
-- queued on a channel. For each of the @j@ workers a 'Nothing' terminator is
-- queued and a thread is forked; because the terminators are written after
-- all of the jobs, each worker keeps taking jobs until it reads a 'Nothing'.
--
-- Results are collected through 'parallel1', taking each 'MVar' in order. An
-- exception raised by an action is captured with 'try' in the worker and
-- re-raised with 'throw' inside the 'fmap', i.e. when the corresponding
-- result value is evaluated.
parallelN :: Int -> [IO a] -> IO [a]
parallelN j xs = do
    ms <- mapM (const newEmptyMVar) xs
    chan <- newChan
    mapM_ (writeChan chan . Just) $ zip ms xs
    -- One terminator per worker, queued behind all of the real jobs.
    replicateM_ j (writeChan chan Nothing >> forkIO (f chan))
    let throwE x = throw (x :: SomeException)
    parallel1 $ map (fmap (either throwE id) . takeMVar) ms
    where
        -- Worker loop: take jobs from the channel until a 'Nothing' is read,
        -- storing each job's outcome (result or exception) in its 'MVar'.
        f :: Chan (Maybe (MVar (Either SomeException b), IO b)) -> IO ()
        f chan = do
            v <- readChan chan
            case v of
                Nothing -> pure ()
                Just (m, x) -> do
                    putMVar m =<< try x
                    f chan