-- | Pipes that introduce parallelism on different levels.

module Pipes.Parallel where

import Control.Monad.Codensity (lowerCodensity)
import Control.Monad (replicateM)
import Control.Parallel.Strategies (Strategy, parMap)
import Pipes



-- | Evaluates chunks of pipe elements in parallel with a pure function.
--
-- This is 'pipeParBA' specialised to trivial hooks: the action run before
-- each chunk passes the awaited elements through unchanged, and the action
-- run after each chunk passes the computed results through unchanged.

pipePar
  :: (Monad m)
  => Int
  -- ^ number of elements to evaluate in parallel
  -> Strategy b
  -- ^ with which strategy
  -> (a -> b)
  -- ^ function to be mapped in parallel
  -> Pipe a b m ()
pipePar n strat f = pipeParBA n strat f before after
  where
  -- No statistics are collected, so the value handed from the "before"
  -- hook to the "after" hook is just unit.
  before as = return ((), as)
  after () bs = return bs
{-
  where
  go = do
    xs <- lowerCodensity . replicateM n $ lift await
    let ys = parMap strat f xs
    lowerCodensity $ mapM_ (lift . yield) ys
    go
-}

-- | Evaluates chunks of pipe elements in parallel with a pure function.
-- Before and after each parallel step, a monadic function is run. This
-- allows generation of certain statistics or information during runs.
--
-- The pipe loops forever over the following steps:
--
-- 1. 'await' a chunk of @n@ elements from upstream;
-- 2. run the "before" action on the chunk, obtaining an auxiliary value
--    and the (possibly modified) list of inputs;
-- 3. map the pure function over the inputs with 'parMap' and the given
--    strategy;
-- 4. run the "after" action with the auxiliary value and the results;
-- 5. 'yield' every element of the list returned by the "after" action.
--
-- A chunk size below 1 is treated as 1; otherwise the loop would never
-- await anything and would spin forever without making progress.
--
-- NOTE(review): a chunk is only processed once all of its elements have
-- been awaited, so elements of an incomplete trailing chunk are presumably
-- dropped when upstream terminates first -- confirm against callers.

pipeParBA
  :: (Monad m)
  => Int
  -- ^ number of elements to evaluate in parallel
  -> Strategy b
  -- ^ with which strategy
  -> (a -> b)
  -- ^ pure function to run in parallel
  -> ([a] -> m (x,[a]))
  -- ^ function to run before
  -> (x -> [b] -> m [b])
  -- ^ function to run after
  -> Pipe a b m ()
pipeParBA n strat f bef aft = go
  where
  -- Guard against non-positive chunk sizes: @replicateM 0@ awaits nothing,
  -- which would turn 'go' into a loop that never consumes or yields.
  chunkSize = max 1 n
  go = do
    -- Collect one chunk of inputs. The awaits (and the yields below) are
    -- sequenced in 'Codensity' and lowered back into the pipe afterwards.
    as' <- lowerCodensity . replicateM chunkSize $ lift await
    -- "Before" hook, run in the base monad; it may replace the inputs.
    (x,as) <- lift $ bef as'
    let bs' = parMap strat f as
    -- "After" hook; it receives the value produced by the "before" hook.
    bs <- lift $ aft x bs'
    -- Send the results downstream, then continue with the next chunk.
    lowerCodensity $ mapM_ (lift . yield) bs
    go