streamly-0.7.3: Beautiful Streaming, Concurrent and Reactive Composition
Copyright: (c) 2017 Harendra Kumar
License: BSD3
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.Stream.Async

Description

 
Synopsis

Documentation

data AsyncT m a Source #

The Semigroup operation (<>) for AsyncT merges two streams concurrently with priority given to the first stream. In s1 <> s2 <> s3 ... the streams s1, s2 and s3 are scheduled for execution in that order. Multiple scheduled streams may be executed concurrently and the elements generated by them are served to the consumer as and when they become available. This behavior is similar to the scheduling and execution behavior of actions in a single async stream.

Since only a finite number of streams are executed concurrently, this operation can be used to fold an infinite lazy container of streams.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = (S.toList . asyncly $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,2,3,4]

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the monad instance of AsyncT may run each iteration concurrently based on demand. More concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer.

main = drain . asyncly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Since: 0.1.0

Instances

Instances details
MonadTrans AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> AsyncT m a #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> AsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: AsyncT m s #

put :: s -> AsyncT m () #

state :: (s -> (a, s)) -> AsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: AsyncT m r #

local :: (r -> r) -> AsyncT m a -> AsyncT m a #

reader :: (r -> a) -> AsyncT m a #

MonadAsync m => Monad (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b #

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

return :: a -> AsyncT m a #

Monad m => Functor (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b #

(<$) :: a -> AsyncT m b -> AsyncT m a #

(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> AsyncT m a #

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b #

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c #

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> AsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> AsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

MonadAsync m => Monoid (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: AsyncT m a #

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a #

mconcat :: [AsyncT m a] -> AsyncT m a #

type Async = AsyncT IO Source #

A demand-driven, left-biased IO stream of elements of type a that composes in parallel. See AsyncT documentation for more details.

Since: 0.2.0

asyncly :: IsStream t => AsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as AsyncT.

Since: 0.1.0

async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Polymorphic version of the Semigroup operation <> of AsyncT. Merges two streams possibly concurrently, preferring the elements from the left one when available.

Since: 0.2.0

(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Deprecated: Please use async instead.

Same as async.

Since: 0.1.0

mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; it terminates when the buffer is full, and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.

Internal

mkAsyncK :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Generate a stream asynchronously to keep it buffered, lazily consume from the buffer.

Internal

data WAsyncT m a Source #

WAsyncT is similar to WSerialT but with concurrent execution. The Semigroup operation (<>) for WAsyncT merges two streams concurrently interleaving the actions from both the streams. In s1 <> s2 <> s3 ..., the individual actions from streams s1, s2 and s3 are scheduled for execution in a round-robin fashion. Multiple scheduled actions may be executed concurrently, the results from concurrent executions are consumed in the order in which they become available.

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of AsyncT.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = (S.toList . wAsyncly . maxThreads 1 $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,3,2,4]

For this example, we are using maxThreads 1 so that concurrent thread scheduling does not affect the results and make them unpredictable. Let's now take a more general example:

main = (S.toList . wAsyncly . maxThreads 1 $ (S.fromList [1,2,3]) <> (S.fromList [4,5,6]) <> (S.fromList [7,8,9])) >>= print
[1,4,2,7,5,3,8,6,9]

This is how the execution of the above stream proceeds:

  1. The scheduler queue is initialized with [S.fromList [1,2,3], (S.fromList [4,5,6]) <> (S.fromList [7,8,9])] assuming the head of the queue is represented by the leftmost item.
  2. S.fromList [1,2,3] is executed, yielding the element 1 and putting [2,3] at the back of the scheduler queue. The scheduler queue now looks like [(S.fromList [4,5,6]) <> (S.fromList [7,8,9]), S.fromList [2,3]].
  3. Now (S.fromList [4,5,6]) <> (S.fromList [7,8,9]) is picked up for execution, S.fromList [7,8,9] is added at the back of the queue and S.fromList [4,5,6] is executed, yielding the element 4 and adding S.fromList [5,6] at the back of the queue. The queue now looks like [S.fromList [2,3], S.fromList [7,8,9], S.fromList [5,6]].
  4. Note that the scheduler queue expands by one more stream component in every pass because one more <> is broken down into two components. At this point there are no more <> operations to be broken down further and the queue has reached its maximum size. Now these streams are scheduled in round-robin fashion yielding [2,7,5,3,8,6,9].

As we see above, in a right associated expression composed with <>, only one <> operation is broken down into two components in one execution; therefore, if we have n streams composed using <>, it will take n scheduler passes to expand the whole expression. By the time the n-th component is added to the scheduler queue, the first component would have received n scheduler passes.

Since all streams get interleaved, this operation is not suitable for folding an infinite lazy container of infinite size streams. However, if the streams are small, the streams on the left may get finished before more streams are added to the scheduler queue from the right side of the expression, so it may be possible to fold an infinite lazy container of streams. For example, if the streams are of size n then at most n streams would be in the scheduler queue at a time.

Note that WSerialT and WAsyncT differ in their scheduling behavior; therefore, the output of WAsyncT, even with a single thread of execution, is not the same as that of WSerialT. See notes in WSerialT for details about its scheduling behavior.

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the Monad instance of WAsyncT runs all iterations fairly concurrently using a round robin scheduling.

main = drain . wAsyncly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Since: 0.2.0

Instances

Instances details
MonadTrans WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> WAsyncT m a #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> WAsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: WAsyncT m s #

put :: s -> WAsyncT m () #

state :: (s -> (a, s)) -> WAsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: WAsyncT m r #

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a #

reader :: (r -> a) -> WAsyncT m a #

MonadAsync m => Monad (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b #

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

return :: a -> WAsyncT m a #

Monad m => Functor (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b #

(<$) :: a -> WAsyncT m b -> WAsyncT m a #

(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> WAsyncT m a #

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b #

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c #

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> WAsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> WAsyncT m a #

MonadAsync m => Semigroup (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

MonadAsync m => Monoid (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: WAsyncT m a #

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

mconcat :: [WAsyncT m a] -> WAsyncT m a #

type WAsync = WAsyncT IO Source #

A round-robin IO stream of elements of type a that composes in parallel. See WAsyncT documentation for more details.

Since: 0.2.0

wAsyncly :: IsStream t => WAsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as WAsyncT.

Since: 0.2.0

wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Polymorphic version of the Semigroup operation <> of WAsyncT. Merges two streams concurrently choosing elements from both fairly.

Since: 0.2.0