{-# LANGUAGE CPP                       #-}
{-# LANGUAGE ConstraintKinds           #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs              #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE UndecidableInstances      #-} -- XXX

#include "inline.hs"

-- |
-- Module      : Streamly.Internal.Data.Stream.Parallel
-- Copyright   : (c) 2017 Harendra Kumar
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Stream.Parallel
    (
    -- * Parallel Stream Type
      ParallelT
    , Parallel
    , parallely

    -- * Merge Concurrently
    , parallel
    , parallelFst
    , parallelMin

    -- * Evaluate Concurrently
    , mkParallel

    -- * Tap Concurrently
    , tapAsync
    , distributeAsync_
    )
where

import Control.Concurrent (myThreadId, takeMVar)
import Control.Monad (when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
-- import Control.Monad.Error.Class   (MonadError(..))
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Functor (void)
import Data.IORef (readIORef, writeIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Prelude hiding (map)

import qualified Data.Set as Set

import Streamly.Internal.Data.Stream.SVar
       (fromSVar, fromProducer, fromConsumer, pushToFold)
import Streamly.Internal.Data.Stream.StreamK
       (IsStream(..), Stream, mkStream, foldStream, foldStreamShared, adapt)

import Streamly.Internal.Data.SVar

import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D

#include "Instances.hs"

-------------------------------------------------------------------------------
-- Parallel
-------------------------------------------------------------------------------

-------------------------------------------------------------------------------
-- StreamK based worker routines
-------------------------------------------------------------------------------

-- | Worker routine: evaluate the stream @m0@ and send every element it
-- yields to the 'SVar' stored in the state @st@, followed by a stop event
-- once the stream ends.
--
-- When the state carries a yield limit ('getYieldLimit' returns 'Just') the
-- work is delegated to 'runOneLimited'; otherwise the unlimited loop below is
-- used.
--
-- The state must have 'streamVar' set: the 'SVar' is extracted with
-- 'fromJust', so forcing it on a state without an 'SVar' is a runtime error.
{-# NOINLINE runOne #-}
runOne
    :: MonadIO m
    => State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne st m0 winfo =
    case getYieldLimit st of
        Nothing -> go m0
        Just _  -> runOneLimited st m0 winfo

    where

    -- One evaluation step: the buffer limit is decremented before the stream
    -- is stepped (presumably reserving a buffer slot for the element that may
    -- be produced -- confirm against 'decrementBufferLimit').
    go m = do
        liftIO $ decrementBufferLimit sv
        foldStreamShared st yieldk single stop m

    sv = fromJust $ streamVar st

    -- The step produced no element: undo the decrement made by 'go' for this
    -- step and report the stop to the SVar.
    stop = liftIO $ do
        incrementBufferLimit sv
        sendStop sv winfo
    -- Send one element to the SVar, ignoring the 'Int' returned by 'send'.
    sendit a = liftIO $ void $ send sv (ChildYield a)
    -- Last element of the stream: send it, then report the stop.
    single a = sendit a >> liftIO (sendStop sv winfo)
    -- Element with a remaining stream: send it and keep evaluating the rest.
    yieldk a r = sendit a >> go r

-- | Variant of the worker loop that also honours the yield limit of the
-- 'SVar' stored in the state @st@.
--
-- Before every step 'decrementYieldLimit' is consulted. If it returns 'False'
-- the worker cleans up via 'cleanupSVarFromWorker' and sends a stop event
-- without evaluating the stream any further; otherwise the step proceeds as in
-- the unlimited loop.
--
-- The state must have 'streamVar' set: the 'SVar' is extracted with
-- 'fromJust', so forcing it on a state without an 'SVar' is a runtime error.
runOneLimited
    :: MonadIO m
    => State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOneLimited st m0 winfo = go m0

    where

    go m = do
        yieldLimitOk <- liftIO $ decrementYieldLimit sv
        if yieldLimitOk
        then do
            liftIO $ decrementBufferLimit sv
            foldStreamShared st yieldk single stop m
        else do
            liftIO $ cleanupSVarFromWorker sv
            liftIO $ sendStop sv winfo

    sv = fromJust $ streamVar st

    -- The step produced no element: give back both the buffer-limit and the
    -- yield-limit decrements made by 'go' for this step, then report the stop.
    stop = liftIO $ do
        incrementBufferLimit sv
        incrementYieldLimit sv
        sendStop sv winfo
    -- Send one element to the SVar, ignoring the 'Int' returned by 'send'.
    sendit a = liftIO $ void $ send sv (ChildYield a)
    -- Last element of the stream: send it, then report the stop.
    single a = sendit a >> liftIO (sendStop sv winfo)
    -- Element with a remaining stream: send it and keep evaluating the rest.
    yieldk a r = sendit a >> go r

-------------------------------------------------------------------------------
-- Consing and appending a stream in parallel style
-------------------------------------------------------------------------------

-- Note that consing and appending requires StreamK as it would not scale well
-- with StreamD unless we are only consing a very small number of streams or
-- elements in a stream. StreamK allows us to manipulate control flow in a way
-- which StreamD cannot allow. StreamK can make a jump without having to
-- remember the past state.

-- | Create a fresh parallel 'SVar' with stop style @ss@, start one worker for
-- each of the two streams @m@ and @r@, and return the stream that reads the
-- combined output from that 'SVar'.
--
-- Both workers run 'runOne' with a state whose 'streamVar' points at the new
-- 'SVar'.
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: (IsStream t, MonadAsync m)
    => SVarStopStyle -> t m a -> t m a -> t m a
forkSVarPar ss m r = mkStream $ \st yld sng stp -> do
    sv <- newParallelVar ss st
    pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream m)
    -- For 'StopBy', record a thread from 'workerThreads' in 'svarStopBy'.
    -- This is done before the second worker is pushed, so the set presumably
    -- holds only the worker for @m@ at this point -- confirm against
    -- 'pushWorkerPar'. Note that 'Set.elemAt' is partial and fails on an empty
    -- set.
    case ss of
        StopBy -> liftIO $ do
            set <- readIORef (workerThreads sv)
            writeIORef (svarStopBy sv) $ Set.elemAt 0 set
        _ -> return ()
    pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream r)
    foldStream st yld sng stp (fromSVar sv)

-- | Combine two streams concurrently using an 'SVar' of the given @style@ and
-- stop style @ss@.
--
-- If the state already carries an 'SVar' whose style and stop style both match
-- the requested ones, that 'SVar' is reused: a new worker is pushed for @m1@
-- and @m2@ is evaluated in place. Otherwise a new 'SVar' is created through
-- 'forkSVarPar'.
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar :: (IsStream t, MonadAsync m)
    => SVarStyle -> SVarStopStyle -> t m a -> t m a -> t m a
joinStreamVarPar style ss m1 m2 = mkStream $ \st yld sng stp ->
    case streamVar st of
        Just sv | svarStyle sv == style && svarStopStyle sv == ss -> do
            -- Here, WE ARE IN THE WORKER/PRODUCER THREAD, we know that because
            -- the SVar exists. We are running under runOne and the output we
            -- produce ultimately will be sent to the SVar by runOne.
            --
            -- If we came here the worker/runOne is evaluating a `parallel`
            -- combinator. In this case, we always fork a new worker for the
            -- first component (m1) in the parallel composition and continue to
            -- evaluate the second component (m2) in the current worker thread.
            --
            -- When m1 is serially composed, the worker would evaluate it
            -- without any further forks and the resulting output is sent to
            -- the SVar and the evaluation terminates. If m1 is a `parallel`
            -- composition of two streams the worker would again recurse here.
            --
            -- Similarly, when m2 is serially composed it gets evaluated here
            -- and the resulting output is sent to the SVar by the runOne
            -- wrapper. When m2 is composed with `parallel` it will again
            -- recurse here and so on until it finally terminates.
            --
            -- When we create a right associated expression using `parallel`,
            -- then m1 would always terminate without further forks or
            -- recursion into this routine, therefore, the worker returns
            -- immediately after evaluating it. And m2 would continue to
            -- fork/recurse, therefore, the current thread always recurses and
            -- forks new workers one after the other.  This is a tail recursive
            -- style execution, m2, the recursive expression always executed at
            -- the tail.
            --
            -- When the expression is left associated, the worker spawned would
            -- get the forking/recursing responsibility and then again the
            -- worker spawned by that worker would fork, thus creating layer
            -- over layer of workers and a chain of threads leading to a very
            -- inefficient execution.
            pushWorkerPar sv (runOne st $ toStream m1)
            foldStreamShared st yld sng stp m2
        _ ->
            -- Here WE ARE IN THE CONSUMER THREAD, we create a new SVar, fork
            -- worker threads to execute m1 and m2 and this thread starts
            -- pulling the stream from the SVar.
            foldStreamShared st yld sng stp (forkSVarPar ss m1 m2)

-------------------------------------------------------------------------------
-- User facing APIs
-------------------------------------------------------------------------------

-- | Prepend a monadic action to a 'ParallelT' stream. The action is turned
-- into a singleton stream with 'K.yieldM' and merged with the rest of the
-- stream using 'parallel'.
--
-- XXX we can implement it more efficiently by directly implementing instead
-- of combining streams using parallel.
{-# INLINE consMParallel #-}
{-# SPECIALIZE consMParallel :: IO a -> ParallelT IO a -> ParallelT IO a #-}
consMParallel :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
consMParallel m r = fromStream $ K.yieldM m `parallel` toStream r

-- | Polymorphic version of the 'Semigroup' operation '<>' of 'ParallelT'.
-- Merges two streams concurrently.
--
-- Implemented as 'joinStreamVarPar' with the 'ParallelVar' style and the
-- 'StopNone' stop style.
--
-- @since 0.2.0
{-# INLINE parallel #-}
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
parallel = joinStreamVarPar ParallelVar StopNone

-- This is a co-parallel like combinator for streams, where first stream is the
-- main stream and the rest are just supporting it, when the first ends
-- everything ends.
--
-- | Like `parallel` but stops the output as soon as the first stream stops.
--
-- Implemented as 'joinStreamVarPar' with the 'ParallelVar' style and the
-- 'StopBy' stop style.
--
-- /Internal/
{-# INLINE parallelFst #-}
parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
parallelFst = joinStreamVarPar ParallelVar StopBy

-- This is a race like combinator for streams.
--
-- | Like `parallel` but stops the output as soon as any of the two streams
-- stops.
--
-- Implemented as 'joinStreamVarPar' with the 'ParallelVar' style and the
-- 'StopAny' stop style.
--
-- /Internal/
{-# INLINE parallelMin #-}
parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
parallelMin = joinStreamVarPar ParallelVar StopAny

------------------------------------------------------------------------------
-- Convert a stream to parallel
------------------------------------------------------------------------------

-- | Generate a stream asynchronously to keep it buffered, lazily consume
-- from the buffer.
--
-- A new parallel 'SVar' with the 'StopNone' stop style is created from the
-- adapted state, the input stream is converted with 'D.toStreamD' and handed
-- to 'D.toSVarParallel', and the result is the stream read back from that
-- 'SVar'.
--
-- /Internal/
--
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
mkParallel m = mkStream $ \st yld sng stp -> do
    sv <- newParallelVar StopNone (adaptState st)
    -- pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream m)
    D.toSVarParallel st sv $ D.toStreamD m
    foldStream st yld sng stp $ fromSVar sv

------------------------------------------------------------------------------
-- Clone and distribute a stream in parallel
------------------------------------------------------------------------------

-- Tap a stream and send the elements to the specified SVar in addition to
-- yielding them again.
--
-- Every element of the input is first pushed to the SVar with 'pushToFold'
-- and then yielded downstream. The 'Bool' returned by 'pushToFold' (bound as
-- @done@) selects how the rest of the stream is treated: once it is 'True'
-- the remaining stream is passed through untouched.
--
-- XXX this could be written in StreamD style for better efficiency with fusion.
--
{-# INLINE teeToSVar #-}
teeToSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> t m a
teeToSVar svr m = mkStream $ \st yld sng stp ->
    foldStreamShared st yld sng stp (go False m)

    where

    -- @go False@: keep tapping the stream into the SVar.
    go False m0 = mkStream $ \st yld _ stp -> do
        let drain = do
                -- In general, a Stop event would come equipped with the result
                -- of the fold. It is not used here but it would be useful in
                -- applicative and distribute.
                done <- fromConsumer svr
                when (not done) $ do
                    liftIO $ withDiagMVar svr "teeToSVar: waiting to drain"
                           $ takeMVar (outputDoorBellFromConsumer svr)
                    drain

            stopFold = do
                liftIO $ sendStop svr Nothing
                -- drain/wait until a stop event arrives from the fold.
                drain

            stop       = stopFold >> stp
            -- NOTE(review): when @done@ is False the continuation below is
            -- @go False (K.nilM stopFold)@; assuming 'K.nilM' runs its action
            -- and then stops, 'stopFold' would run once inside 'K.nilM' and
            -- once more through 'stop'. Confirm that this is intended.
            single a   = do
                done <- pushToFold svr a
                yld a (go done (K.nilM stopFold))
            yieldk a r = pushToFold svr a >>= \done -> yld a (go done r)
         in foldStreamShared st yieldk single stop m0

    -- @go True@: nothing more is pushed to the SVar, the stream is returned
    -- as is.
    go True m0 = m0

-- In case of folds the roles of worker and parent on an SVar are reversed. The
-- parent stream pushes values to an SVar instead of pulling from it and a
-- worker thread running the fold pulls from the SVar and folds the stream. We
-- keep a separate channel for pushing exceptions in the reverse direction i.e.
-- from the fold to the parent stream.
--
-- Note: If we terminate due to an exception, we do not actively terminate the
-- fold. It gets cleaned up by the GC.

-- | Create an SVar with a fold consumer that will fold any elements sent to it
-- using the supplied fold function.
--
-- The SVar is created with the 'StopAny' stop style from the adapted state
-- @stt@. The calling thread's id is registered on the SVar with
-- 'modifyThread', and the fold @f@ is forked with 'doFork' over the stream
-- obtained from 'fromProducer'; its result is discarded. Exceptions in the
-- forked fold are passed to 'handleFoldException'.
{-# INLINE newFoldSVar #-}
newFoldSVar :: (IsStream t, MonadAsync m)
    => State Stream m a -> (t m a -> m b) -> m (SVar Stream m a)
newFoldSVar stt f = do
    -- Buffer size for the SVar is derived from the current state
    sv <- newParallelVar StopAny (adaptState stt)

    -- Add the producer thread-id to the SVar.
    liftIO myThreadId >>= modifyThread sv

    void $ doFork (void $ f $ fromStream $ fromProducer sv)
                  (svarMrun sv)
                  (handleFoldException sv)
    return sv

-- NOTE: In regular pull style streams, the consumer stream is pulling elements
-- from the SVar and we have several workers producing elements and pushing to
-- SVar. In case of folds, we, the parent stream driving the fold, are the
-- stream producing worker, we start an SVar and start pushing to the SVar, the
-- fold on the other side of the SVar is the consumer stream.
--
-- In the pull stream case exceptions are propagated from the producing workers
-- to the consumer stream, the exceptions are propagated on the same channel as
-- the produced stream elements. However, in case of push style folds the
-- current stream itself is the worker and the fold is the consumer, in this
-- case we have to propagate the exceptions from the consumer to the producer.
-- This is reverse of the pull case and we need a reverse direction channel
-- to propagate the exception.
--
-- | Redirect a copy of the stream to a supplied fold and run it concurrently
-- in an independent thread. The fold may buffer some elements. The buffer size
-- is determined by the prevailing 'maxBuffer' setting.
--
-- @
--               Stream m a -> m b
--                       |
-- -----stream m a ---------------stream m a-----
--
-- @
--
-- @
-- > S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2)
-- 1
-- 2
-- @
--
-- Exceptions from the concurrently running fold are propagated to the current
-- computation.  Note that, because of buffering in the fold, exceptions may be
-- delayed and may not correspond to the current element being processed in the
-- parent stream, but we guarantee that before the parent stream stops the tap
-- finishes and all exceptions from it are drained.
--
--
-- Compare with 'tap'.
--
-- /Internal/
{-# INLINE tapAsync #-}
tapAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a
tapAsync f m = mkStream $ \st yld sng stp -> do
    -- Start the fold @f@ on a new SVar built from the current state.
    sv <- newFoldSVar st f
    -- Continue with the original stream wrapped by 'teeToSVar', using the
    -- caller's continuations unchanged.
    foldStreamShared st yld sng stp (teeToSVar sv m)

-- | Concurrently distribute a stream to a collection of fold functions,
-- discarding the outputs of the folds.
--
-- >>> S.drain $ distributeAsync_ [S.mapM_ print, S.mapM_ print] (S.enumerateFromTo 1 2)
--
-- @
-- distributeAsync_ = flip (foldr tapAsync)
-- @
--
-- /Internal/
--
{-# INLINE distributeAsync_ #-}
distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m)
    => f (t m a -> m b) -> t m a -> t m a
-- Right-fold 'tapAsync' over the container of folds, starting from the input
-- stream, so that every fold in the container is attached as a tap.
distributeAsync_ = flip (foldr tapAsync)

------------------------------------------------------------------------------
-- ParallelT
------------------------------------------------------------------------------

-- | Async composition with strict concurrent execution of all streams.
--
-- The 'Semigroup' instance of 'ParallelT' executes both the streams
-- concurrently without any delay or without waiting for the consumer demand
-- and /merges/ the results as they arrive. If the consumer does not consume
-- the results, they are buffered up to a configured maximum, controlled by the
-- 'maxBuffer' primitive. If the buffer becomes full the concurrent tasks will
-- block until there is space in the buffer.
--
-- Both 'WAsyncT' and 'ParallelT' evaluate the constituent streams fairly in a
-- round robin fashion. The key difference is that 'WAsyncT' might wait for the
-- consumer demand before it executes the tasks whereas 'ParallelT' starts
-- executing all the tasks immediately without waiting for the consumer demand.
-- For 'WAsyncT' the 'maxThreads' limit applies whereas for 'ParallelT' it does
-- not apply. In other words, 'WAsyncT' can be lazy whereas 'ParallelT' is
-- strict.
--
-- 'ParallelT' is useful for cases when the streams are required to be
-- evaluated simultaneously irrespective of how the consumer consumes them e.g.
-- when we want to race two tasks and want to start both strictly at the same
-- time or if we have timers in the parallel tasks and our results depend on
-- the timers being started at the same time. If we do not have such
-- requirements then 'AsyncT' or 'AheadT' are recommended as they can be more
-- efficient than 'ParallelT'.
--
-- @
-- main = ('toList' . 'parallely' $ (fromFoldable [1,2]) \<> (fromFoldable [3,4])) >>= print
-- @
-- @
-- [1,3,2,4]
-- @
--
-- When streams with more than one element are merged, it yields whichever
-- stream yields first without any bias, unlike the 'Async' style streams.
--
-- Any exceptions generated by a constituent stream are propagated to the
-- output stream. The output and exceptions from a single stream are guaranteed
-- to arrive in the same order in the resulting stream as they were generated
-- in the input stream. However, the relative ordering of elements from
-- different streams in the resulting stream can vary depending on scheduling
-- and generation delays.
--
-- Similarly, the 'Monad' instance of 'ParallelT' runs /all/ iterations
-- of the loop concurrently.
--
-- @
-- import "Streamly"
-- import qualified "Streamly.Prelude" as S
-- import Control.Concurrent
--
-- main = 'drain' . 'parallely' $ do
--     n <- return 3 \<\> return 2 \<\> return 1
--     S.yieldM $ do
--          threadDelay (n * 1000000)
--          myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n)
-- @
-- @
-- ThreadId 40: Delay 1
-- ThreadId 39: Delay 2
-- ThreadId 38: Delay 3
-- @
--
-- Note that parallel composition can only combine a finite number of
-- streams as it needs to retain state for each unfinished stream.
--
-- /Since: 0.7.0 (maxBuffer applies to ParallelT streams)/
--
-- /Since: 0.1.0/
newtype ParallelT m a = ParallelT {getParallelT :: Stream m a}
    -- 'MonadTrans' is not a stock-derivable class; this clause relies on the
    -- GeneralizedNewtypeDeriving extension enabled at the top of the module.
    deriving (MonadTrans)

-- | A parallely composing IO stream of elements of type @a@, i.e. 'ParallelT'
-- with the underlying monad fixed to 'IO'.
-- See 'ParallelT' documentation for more details.
--
-- @since 0.2.0
type Parallel = ParallelT IO

-- | Fix the type of a polymorphic stream as 'ParallelT'.
--
-- This is 'adapt' with its argument type restricted to 'ParallelT'; the
-- result can be any 'IsStream' type.
--
-- @since 0.1.0
parallely :: IsStream t => ParallelT m a -> t m a
parallely = adapt

-- 'ParallelT' is a newtype over 'Stream', so the conversions are just the
-- record selector and the constructor.
instance IsStream ParallelT where
    toStream = getParallelT
    fromStream = ParallelT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> ParallelT IO a -> ParallelT IO a #-}
    consM = consMParallel

    -- The operator form is defined in terms of 'consM' so that both share
    -- a single implementation.
    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> ParallelT IO a -> ParallelT IO a #-}
    (|:) = consM

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

-- | Combine two 'ParallelT' streams by unwrapping both to 'Stream', merging
-- them with 'parallel' and wrapping the result again.
{-# INLINE mappendParallel #-}
{-# SPECIALIZE mappendParallel :: ParallelT IO a -> ParallelT IO a -> ParallelT IO a #-}
mappendParallel :: MonadAsync m => ParallelT m a -> ParallelT m a -> ParallelT m a
mappendParallel m1 m2 = fromStream $ parallel (toStream m1) (toStream m2)

-- '<>' on 'ParallelT' is 'mappendParallel'.
instance MonadAsync m => Semigroup (ParallelT m a) where
    (<>) = mappendParallel

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

-- The identity is the empty stream 'K.nil'; 'mappend' reuses the 'Semigroup'
-- operation.
instance MonadAsync m => Monoid (ParallelT m a) where
    mempty = K.nil
    mappend = (<>)

------------------------------------------------------------------------------
-- Applicative
------------------------------------------------------------------------------

-- | Applicative application for 'ParallelT'.
--
-- For every function drawn from the first stream, the second stream is
-- mapped with it; both the outer and the inner 'K.concatMapBy' use
-- 'parallel' as the combining operation.
{-# INLINE apParallel #-}
{-# SPECIALIZE apParallel :: ParallelT IO (a -> b) -> ParallelT IO a -> ParallelT IO b #-}
apParallel :: MonadAsync m => ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel (ParallelT m1) (ParallelT m2) =
    -- @f x1@ applies one function @x1@ to every element of the argument
    -- stream @m2@.
    let f x1 = K.concatMapBy parallel (pure . x1) m2
    in ParallelT $ K.concatMapBy parallel f m1

-- 'pure' is a singleton stream built with 'K.yield'; '<*>' is 'apParallel'.
instance (Monad m, MonadAsync m) => Applicative (ParallelT m) where
    {-# INLINE pure #-}
    pure = ParallelT . K.yield
    {-# INLINE (<*>) #-}
    (<*>) = apParallel

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

-- | Monadic bind for 'ParallelT'.
--
-- Both the input stream and the streams produced by the continuation are
-- converted with 'K.adapt' and bound using 'K.bindWith' with 'parallel' as
-- the combining operation.
{-# INLINE bindParallel #-}
{-# SPECIALIZE bindParallel :: ParallelT IO a -> (a -> ParallelT IO b) -> ParallelT IO b #-}
bindParallel :: MonadAsync m => ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bindParallel m f = fromStream $ K.bindWith parallel (K.adapt m) (\a -> K.adapt $ f a)

-- 'return' is the 'Applicative' 'pure'; '>>=' is 'bindParallel'.
instance MonadAsync m => Monad (ParallelT m) where
    return = pure
    (>>=) = bindParallel

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

-- The remaining instances for 'ParallelT' are produced by expanding a CPP
-- macro.
-- NOTE(review): the macro is presumably defined in the included
-- "Instances.hs"; confirm there which instances it generates.
MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)