Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
To run examples in this module:
>>> import qualified Streamly.Prelude as Stream
>>> import Control.Concurrent (threadDelay)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}
Synopsis
- newtype ParallelT m a = ParallelT { getParallelT :: Stream m a }
- type Parallel = ParallelT IO
- consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
- parallelK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
- parallelFstK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
- parallelMinK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
- mkParallelD :: MonadAsync m => Stream m a -> Stream m a
- mkParallelK :: MonadAsync m => Stream m a -> Stream m a
- tapAsyncK :: MonadAsync m => (Stream m a -> m b) -> Stream m a -> Stream m a
- tapAsyncF :: MonadAsync m => Fold m a b -> Stream m a -> Stream m a
- newCallbackStream :: MonadAsync m => m (a -> m (), Stream m a)
Parallel Stream Type
newtype ParallelT m a
For ParallelT streams:
(<>) = parallel
(>>=) = flip . concatMapWith parallel
See AsyncT. ParallelT is similar, except that all iterations are strictly concurrent, while in AsyncT concurrency depends on the consumer demand and available threads. See parallel for more details.
Since: 0.1.0 (Streamly)
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.8.0
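As a rough illustration of the two equations above, the following sketch merges two effects with (<>) and runs a nested bind in ParallelT. It assumes streamly 0.8.x, where Streamly.Prelude exports fromParallel, fromEffect, fromList and toList; delayMs, merged and nested are hypothetical names introduced only for this example. Because all iterations run concurrently, the order of the results depends on timing, so they are sorted before printing.

```haskell
import Control.Concurrent (threadDelay)
import Data.List (sort)
import qualified Streamly.Prelude as Stream

-- Hypothetical helper (not part of streamly): sleep n * 100 ms, return n.
delayMs :: Int -> IO Int
delayMs n = threadDelay (n * 100000) >> return n

-- (<>) on ParallelT behaves like 'parallel': both effects start at once.
merged :: IO [Int]
merged = Stream.toList $ Stream.fromParallel $
    Stream.fromEffect (delayMs 3) <> Stream.fromEffect (delayMs 1)

-- (>>=) on ParallelT runs the iterations of the nested loop concurrently.
nested :: IO [Int]
nested = Stream.toList $ Stream.fromParallel $ do
    x <- Stream.fromList [1, 2]
    y <- Stream.fromList [10, 20]
    return (x + y)

main :: IO ()
main = do
    merged >>= print . sort  -- prints [1,3]
    nested >>= print . sort  -- prints [11,12,21,22]
```

Without the sort, merged would most likely report 1 before 3, since the shorter delay finishes first, but that ordering is a timing effect rather than a guarantee.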
Constructors
ParallelT { getParallelT :: Stream m a }
Instances
MonadTrans ParallelT
Defined in Streamly.Internal.Data.Stream.Parallel
IsStream ParallelT
Defined in Streamly.Internal.Data.Stream.IsStream.Type
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m)
Defined in Streamly.Internal.Data.Stream.Parallel
(MonadState s m, MonadAsync m) => MonadState s (ParallelT m)
(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m)
MonadAsync m => Monad (ParallelT m)
Monad m => Functor (ParallelT m)
(Monad m, MonadAsync m) => Applicative (ParallelT m)
Defined in Streamly.Internal.Data.Stream.Parallel
(MonadIO m, MonadAsync m) => MonadIO (ParallelT m)
Defined in Streamly.Internal.Data.Stream.Parallel
(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m)
Defined in Streamly.Internal.Data.Stream.Parallel
MonadAsync m => Semigroup (ParallelT m a)
MonadAsync m => Monoid (ParallelT m a)
consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
XXX We can implement this more efficiently by implementing it directly instead of combining streams using parallel.
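A minimal sketch of consing effects onto a ParallelT stream, assuming streamly 0.8.x, where Streamly.Prelude exports the (|:) operator, nil and fromParallel, and where (|:) at the ParallelT type is backed by this consM. The names delayMs and consed are hypothetical.

```haskell
import Control.Concurrent (threadDelay)
import Data.List (sort)
import Streamly.Prelude ((|:))
import qualified Streamly.Prelude as Stream

-- Hypothetical helper (not part of streamly): sleep n * 100 ms, return n.
delayMs :: Int -> IO Int
delayMs n = threadDelay (n * 100000) >> return n

-- Each consed action is merged with the rest of the stream using parallel
-- semantics, so the two delays overlap instead of adding up.
consed :: IO [Int]
consed = Stream.toList $ Stream.fromParallel $
    delayMs 3 |: delayMs 1 |: Stream.nil

main :: IO ()
main = consed >>= print . sort  -- prints [1,3]
```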
Merge Concurrently
parallelFstK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
Like parallel but stops the output as soon as the first stream stops.
Pre-release
parallelMinK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
Like parallel but stops the output as soon as either of the two streams stops.
Pre-release
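The difference between the two stopping rules can be sketched with one finite and one infinite stream. This example assumes that streamly 0.8.x exposes IsStream-level wrappers named parallelFst and parallelMin in Streamly.Internal.Data.Stream.IsStream; that module is internal and pre-release, so the names may differ in other versions. The names ticks, finite, stopWithFirst and stopWithEither are hypothetical.

```haskell
import Control.Concurrent (threadDelay)
import Streamly.Prelude (SerialT)
import qualified Streamly.Prelude as Stream
import qualified Streamly.Internal.Data.Stream.IsStream as Internal

-- An infinite stream that yields 0 roughly every 10 ms.
ticks :: SerialT IO Int
ticks = Stream.repeatM (threadDelay 10000 >> return 0)

-- A finite stream that stops almost immediately.
finite :: SerialT IO Int
finite = Stream.fromList [1, 2, 3]

-- Terminates because the FIRST argument is the finite stream.
stopWithFirst :: IO [Int]
stopWithFirst = Stream.toList (finite `Internal.parallelFst` ticks)

-- Terminates because EITHER argument stopping ends the output.
stopWithEither :: IO [Int]
stopWithEither = Stream.toList (ticks `Internal.parallelMin` finite)

main :: IO ()
main = do
    stopWithFirst >>= print
    stopWithEither >>= print
```

Swapping the arguments of parallelFst, so that ticks comes first, would be expected to run forever, since only the first stream's end stops the output.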
Evaluate Concurrently
mkParallelD :: MonadAsync m => Stream m a -> Stream m a
Same as mkParallel but for a StreamD stream.
mkParallelK :: MonadAsync m => Stream m a -> Stream m a
Like mkParallel but uses StreamK internally.
Pre-release
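One way to picture concurrent evaluation is a two-stage pipeline in which the producer keeps running while the consumer processes earlier elements. The sketch below assumes that streamly 0.8.x exports an IsStream-level mkParallel from Streamly.Internal.Data.Stream.IsStream (an internal, pre-release module); producer and pipeline are hypothetical names.

```haskell
import Control.Concurrent (threadDelay)
import Streamly.Prelude (SerialT)
import qualified Streamly.Prelude as Stream
import qualified Streamly.Internal.Data.Stream.IsStream as Internal

-- A producer stage that takes about 10 ms per element.
producer :: SerialT IO Int
producer =
    Stream.mapM (\x -> threadDelay 10000 >> return x) (Stream.fromList [1 .. 5])

-- mkParallel evaluates the producer in its own thread behind a buffer, so
-- the consumer stage below can overlap with it. There is a single producer,
-- so element order is preserved.
pipeline :: IO [Int]
pipeline = Stream.toList
    $ Stream.mapM (\x -> threadDelay 10000 >> return (x * 2))
    $ Internal.mkParallel producer

main :: IO ()
main = pipeline >>= print  -- prints [2,4,6,8,10]
```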
Tap Concurrently
tapAsyncK :: MonadAsync m => (Stream m a -> m b) -> Stream m a -> Stream m a
Redirect a copy of the stream to a supplied fold and run it concurrently in an independent thread. The fold may buffer some elements. The buffer size is determined by the prevailing maxBuffer setting.
                Stream m a -> m b
                        |
-----stream m a ---------------stream m a-----
> S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2)
1
2
Exceptions from the concurrently running fold are propagated to the current computation. Because the fold buffers its input, an exception may be delayed and may not correspond to the element currently being processed in the parent stream. However, the tap is guaranteed to finish, and all exceptions from it to be drained, before the parent stream stops.
Compare with tap.
Pre-release
tapAsyncF :: MonadAsync m => Fold m a b -> Stream m a -> Stream m a
Like tapAsync but uses a Fold instead of a fold function.
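A small sketch of the Fold-based tap, assuming that streamly 0.8.x exports an IsStream-level tapAsync taking a Fold from Streamly.Internal.Data.Stream.IsStream, and that Streamly.Data.Fold provides drainBy. The names tapped and record are hypothetical. The tap records every element into an IORef from its own thread while the main stream is collected as usual.

```haskell
import Data.IORef (newIORef, modifyIORef', readIORef)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Prelude as Stream
import qualified Streamly.Internal.Data.Stream.IsStream as Internal

-- Returns the elements seen by the main stream and by the tap.
tapped :: IO ([Int], [Int])
tapped = do
    ref <- newIORef []
    let record x = modifyIORef' ref (x :)
    out <- Stream.toList
         $ Internal.tapAsync (Fold.drainBy record)
         $ Stream.fromList [1 .. 5]
    -- The tap is expected to have finished by the time the stream stops,
    -- so the IORef should already hold every element here.
    seen <- reverse <$> readIORef ref
    return (out, seen)

main :: IO ()
main = tapped >>= print
```

Only the tap thread writes to the IORef, and it is read after the stream has ended, so no extra synchronization is used in this sketch.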
Callbacks
newCallbackStream :: MonadAsync m => m (a -> m (), Stream m a)
Generates a callback and stream pair. The returned callback is used to queue values to the stream. The stream is infinite; the callback currently has no way to indicate that it is done.
Pre-release
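Because the stream never ends on its own, a consumer has to bound it explicitly, for example with take. The sketch below does not call newCallbackStream directly; it assumes that streamly 0.8.x exports fromCallback from Streamly.Internal.Data.Stream.IsStream as a wrapper that passes the generated callback to a registration function. The names register and firstThree are hypothetical.

```haskell
import qualified Streamly.Prelude as Stream
import qualified Streamly.Internal.Data.Stream.IsStream as Internal

-- Hypothetical callback-based producer: it is handed the callback and
-- invokes it once per value to queue that value to the stream.
register :: (Int -> IO ()) -> IO ()
register callback = mapM_ callback [1, 2, 3]

-- The callback stream is infinite, so stop after the three queued values.
firstThree :: IO [Int]
firstThree = Stream.toList $ Stream.take 3 $ Internal.fromCallback register

main :: IO ()
main = firstThree >>= print
```

Taking more elements than the producer ever queues would be expected to block indefinitely, which is the practical consequence of the callback being unable to signal completion.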