streamly-0.7.3: Beautiful Streaming, Concurrent and Reactive Composition
Copyright(c) 2017 Harendra Kumar
LicenseBSD3
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Internal.Data.Stream.Parallel

Description

 
Synopsis

Parallel Stream Type

data ParallelT m a Source #

Async composition with strict concurrent execution of all streams.

The Semigroup instance of ParallelT executes both the streams concurrently without any delay or without waiting for the consumer demand and merges the results as they arrive. If the consumer does not consume the results, they are buffered upto a configured maximum, controlled by the maxBuffer primitive. If the buffer becomes full the concurrent tasks will block until there is space in the buffer.

Both WAsyncT and ParallelT, evaluate the constituent streams fairly in a round robin fashion. The key difference is that WAsyncT might wait for the consumer demand before it executes the tasks whereas ParallelT starts executing all the tasks immediately without waiting for the consumer demand. For WAsyncT the maxThreads limit applies whereas for ParallelT it does not apply. In other words, WAsyncT can be lazy whereas ParallelT is strict.

ParallelT is useful for cases when the streams are required to be evaluated simultaneously irrespective of how the consumer consumes them e.g. when we want to race two tasks and want to start both strictly at the same time or if we have timers in the parallel tasks and our results depend on the timers being started at the same time. If we do not have such requirements then AsyncT or AheadT are recommended as they can be more efficient than ParallelT.

main = (toList . parallely $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]

When streams with more than one element are merged, it yields whichever stream yields first without any bias, unlike the Async style streams.

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the Monad instance of ParallelT runs all iterations of the loop concurrently.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = drain . parallely $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.

Since: 0.7.0 (maxBuffer applies to ParallelT streams)

Since: 0.1.0

Instances

Instances details
MonadTrans ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

lift :: Monad m => m a -> ParallelT m a #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

toStream :: forall (m :: Type -> Type) a. ParallelT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ParallelT m a Source #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftBase :: b α -> ParallelT m α #

(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

get :: ParallelT m s #

put :: s -> ParallelT m () #

state :: (s -> (a, s)) -> ParallelT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

ask :: ParallelT m r #

local :: (r -> r) -> ParallelT m a -> ParallelT m a #

reader :: (r -> a) -> ParallelT m a #

MonadAsync m => Monad (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(>>=) :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b #

(>>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

return :: a -> ParallelT m a #

Monad m => Functor (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

fmap :: (a -> b) -> ParallelT m a -> ParallelT m b #

(<$) :: a -> ParallelT m b -> ParallelT m a #

(Monad m, MonadAsync m) => Applicative (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

pure :: a -> ParallelT m a #

(<*>) :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b #

liftA2 :: (a -> b -> c) -> ParallelT m a -> ParallelT m b -> ParallelT m c #

(*>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

(<*) :: ParallelT m a -> ParallelT m b -> ParallelT m a #

(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftIO :: IO a -> ParallelT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

throwM :: Exception e => e -> ParallelT m a #

MonadAsync m => Semigroup (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

MonadAsync m => Monoid (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

mempty :: ParallelT m a #

mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a #

mconcat :: [ParallelT m a] -> ParallelT m a #

type Parallel = ParallelT IO Source #

A parallely composing IO stream of elements of type a. See ParallelT documentation for more details.

Since: 0.2.0

parallely :: IsStream t => ParallelT m a -> t m a Source #

Fix the type of a polymorphic stream as ParallelT.

Since: 0.1.0

Merge Concurrently

parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Polymorphic version of the Semigroup operation <> of ParallelT Merges two streams concurrently.

Since: 0.2.0

parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like parallel but stops the output as soon as the first stream stops.

Internal

parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like parallel but stops the output as soon as any of the two streams stops.

Internal

Evaluate Concurrently

mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Generate a stream asynchronously to keep it buffered, lazily consume from the buffer.

Internal

Tap Concurrently

tapAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a Source #

Redirect a copy of the stream to a supplied fold and run it concurrently in an independent thread. The fold may buffer some elements. The buffer size is determined by the prevailing maxBuffer setting.

              Stream m a -> m b
                      |
-----stream m a ---------------stream m a-----

> S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2)
1
2

Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.

Compare with tap.

Internal

distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a Source #

Concurrently distribute a stream to a collection of fold functions, discarding the outputs of the folds.

>>> S.drain $ distributeAsync_ [S.mapM_ print, S.mapM_ print] (S.enumerateFromTo 1 2)
distributeAsync_ = flip (foldr tapAsync)

Internal