streamly-0.8.2: Dataflow programming and declarative concurrency
Copyright: (c) 2017 Composewell Technologies
License: BSD3
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly

Description

Deprecated: Please use Streamly.Prelude instead.

Streamly is a general-purpose programming framework based on the concurrent dataflow programming paradigm. It can be considered a generalization of Haskell lists to monadic streaming with concurrent composition capability. The serial stream type in streamly, SerialT m a, is like the list type [a] parameterized by the monad m. For example, SerialT IO a is a moral equivalent of [a] in the IO monad. Streams are constructed very much like lists, except that they use nil and cons instead of [] and :.

> import Data.Function ((&))
> import Streamly
> import Streamly.Prelude (cons, consM)
> import qualified Streamly.Prelude as S
>
> S.toList $ 1 `cons` 2 `cons` 3 `cons` S.nil
[1,2,3]

Unlike lists, streams can be constructed from monadic effects:

> S.toList $ getLine `consM` getLine `consM` S.nil
hello
world
["hello","world"]

Streams are processed just like lists, with list-like combinators, except that they are monadic and work in a streaming fashion. Here is a simple console echo program:

> S.drain $ S.repeatM getLine & S.mapM putStrLn

SerialT Identity a is a moral equivalent of pure lists. Because streamly uses fusion for high performance, strings can be represented and processed as streams of Char, encoded to and decoded from UTF8, and serialized to Array Word8, obviating the need for special-purpose libraries like bytestring and text.
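As a minimal sketch of the "pure list" reading above, a String can be processed as a stream of Char in the Identity monad. The sketch assumes that SerialT and the combinators are imported from Streamly.Prelude, the module this page recommends; the names shout and chars are hypothetical and exist only for illustration.

```haskell
import Data.Char (toUpper)
import Data.Functor.Identity (Identity, runIdentity)
import Streamly.Prelude (SerialT)
import qualified Streamly.Prelude as S

-- Hypothetical helper: upper-case a String by treating it as a pure
-- stream of Char. SerialT Identity Char plays the role of [Char] here.
shout :: String -> String
shout = runIdentity . S.toList . S.map toUpper . chars
  where
    -- Pin the stream type so that no effects are involved.
    chars :: String -> SerialT Identity Char
    chars = S.fromList

main :: IO ()
main = putStrLn (shout "hello")  -- prints HELLO
```

No IO is involved in shout itself; running the stream in Identity is what makes it behave like an ordinary list pipeline.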

For more details please see the Streamly.Tutorial module and the examples directory in this package.

Module Overview

The basic stream type is Serial. It represents a sequence of IO actions and is a Monad. The type SerialT is a monad transformer that can represent a sequence of actions in an arbitrary monad. The type Serial is in fact a synonym for SerialT IO. There are a few more types similar to SerialT; all of them represent a stream and differ only in their Semigroup, Applicative and Monad compositions. Serial and WSerial types compose serially, whereas Async and WAsync types compose concurrently. All these types can be freely inter-converted using type combinators without any cost, so you can switch to any type of composition at any point in the program. When no type annotation or explicit stream type combinator is used, the default stream type is inferred as Serial.
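The effect of switching the stream type can be sketched as follows: the same expression, a <> b, is evaluated once as a SerialT stream and once as a WSerialT stream. The sketch assumes the type combinators fromSerial and fromWSerial from Streamly.Prelude; the names appended and interleaved are hypothetical.

```haskell
import qualified Streamly.Prelude as S

-- (<>) at the SerialT type appends the second stream to the first.
appended :: IO [Int]
appended =
    S.toList $ S.fromSerial $ S.fromList [1, 2, 3] <> S.fromList [4, 5, 6]

-- (<>) at the WSerialT type interleaves the two streams.
interleaved :: IO [Int]
interleaved =
    S.toList $ S.fromWSerial $ S.fromList [1, 2, 3] <> S.fromList [4, 5, 6]

main :: IO ()
main = do
    appended >>= print     -- [1,2,3,4,5,6]
    interleaved >>= print  -- [1,4,2,5,3,6]
```

Only the type combinator differs between the two definitions; the stream expression itself is unchanged.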

This module exports stream types, instances and combinators for:

  • converting between different stream types
  • appending and concurrently merging streams
  • concurrency control
  • concurrent function application
  • stream rate control

This module is designed to be imported unqualified:

import Streamly

See the Streamly.Prelude module for APIs for construction, generation, elimination and transformation of streams.

Type Synonyms

type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) Source #

A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

Since: 0.1.0 (Streamly)

Since: 0.8.0

Stream transformers

A stream represents a sequence of pure or effectful actions. The cons and consM operations and the corresponding operators .: and |: can be used to join pure values or effectful actions in a sequence. The effects in the stream can be executed in many different ways depending on the type of stream. In other words, the behavior of consM depends on the type of the stream.
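A minimal sketch of the four construction operations, assuming they are imported from Streamly.Prelude. The names pureValues and effects are hypothetical, and the stream type is fixed to the serial one by S.toList.

```haskell
import Streamly.Prelude ((.:), (|:))
import qualified Streamly.Prelude as S

-- Pure values joined with cons and its operator form (.:).
pureValues :: IO [Int]
pureValues = S.toList $ 1 `S.cons` 2 .: 3 .: S.nil

-- Effectful actions joined with consM and its operator form (|:).
-- In a serial stream the actions run one at a time, in the order written.
effects :: IO [Int]
effects = S.toList $ return 1 `S.consM` return 2 |: return 3 |: S.nil

main :: IO ()
main = do
    pureValues >>= print  -- [1,2,3]
    effects >>= print     -- [1,2,3]
```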

There are three high-level categories of streams: spatially ordered streams, speculative streams and time-ordered streams. Spatially ordered streams, SerialT and WSerialT, execute the effects in serial order, i.e. one at a time, and present the outputs of those effects to the consumer in the same order. Speculative streams, AheadT, may execute many effects concurrently but present the outputs to the consumer in the specified spatial order. Time-ordered streams, AsyncT, WAsyncT and ParallelT, may execute many effects concurrently and present the outputs of those effects to the consumer in time order, i.e. as soon as each output is generated.

We described above how the effects in a sequence are executed for different types of streams. The behavior of the Semigroup and Monad instances follows the behavior of consM. Stream generation operations like repeatM also execute the effects differently for different streams, providing a concurrent generation capability when used with stream types that execute effects concurrently. Similarly, effectful transformation operations like mapM also execute the transforming effects differently for different types of streams.

Serial Streams

When a stream consumer demands an element from a serial stream constructed as a `consM` b `consM` ... nil, the action a at the head of the stream sequence is executed and the result is supplied to the consumer. When the next element is demanded, the action b is executed and its result is supplied. Thus, the effects are performed and results are consumed strictly in a serial order. Serial streams can be considered as spatially ordered streams as the order of execution and consumption is the same as the spatial order in which the actions are composed by the programmer.

Serial streams ensure that both the side effects and the results of the actions occur in the same order in which the actions were added to the stream. Therefore, the semigroup operation for serial streams is not commutative:

a <> b is not the same as b <> a
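The non-commutativity can be checked directly. This is a small sketch that assumes SerialT is imported from Streamly.Prelude; the stream names a and b mirror the ones used in the text.

```haskell
import Streamly.Prelude (SerialT)
import qualified Streamly.Prelude as S

a, b :: SerialT IO Int
a = S.fromList [1, 2]
b = S.fromList [3, 4]

main :: IO ()
main = do
    S.toList (a <> b) >>= print  -- [1,2,3,4]
    S.toList (b <> a) >>= print  -- [3,4,1,2]
```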

There are two serial stream types, SerialT and WSerialT. Stream evaluation works in the same way for both variants, as described above; they differ only in the Semigroup and Monad implementations.

data SerialT m a Source #

For SerialT streams:

(<>)  = serial                       -- Semigroup
(>>=) = flip (concatMapWith serial)  -- Monad

A single Monad bind behaves like a for loop:

>>> import qualified Streamly.Prelude as Stream
>>> :{
Stream.toList $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]

Nested monad binds behave like nested for loops:

>>> :{
Stream.toList $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

MonadTrans SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> SerialT m a #

IsStream SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. SerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> SerialT m a Source #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(MonadBase b m, Monad m) => MonadBase b (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftBase :: b α -> SerialT m α #

MonadState s m => MonadState s (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: SerialT m s #

put :: s -> SerialT m () #

state :: (s -> (a, s)) -> SerialT m a #

MonadReader r m => MonadReader r (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: SerialT m r #

local :: (r -> r) -> SerialT m a -> SerialT m a #

reader :: (r -> a) -> SerialT m a #

Monad m => Monad (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: SerialT m a -> (a -> SerialT m b) -> SerialT m b #

(>>) :: SerialT m a -> SerialT m b -> SerialT m b #

return :: a -> SerialT m a #

Monad m => Functor (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> SerialT m a -> SerialT m b #

(<$) :: a -> SerialT m b -> SerialT m a #

Monad m => Applicative (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> SerialT m a #

(<*>) :: SerialT m (a -> b) -> SerialT m a -> SerialT m b #

liftA2 :: (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c #

(*>) :: SerialT m a -> SerialT m b -> SerialT m b #

(<*) :: SerialT m a -> SerialT m b -> SerialT m a #

(Foldable m, Monad m) => Foldable (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => SerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> SerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> SerialT m a -> b #

foldl :: (b -> a -> b) -> b -> SerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> SerialT m a -> b #

foldr1 :: (a -> a -> a) -> SerialT m a -> a #

foldl1 :: (a -> a -> a) -> SerialT m a -> a #

toList :: SerialT m a -> [a] #

null :: SerialT m a -> Bool #

length :: SerialT m a -> Int #

elem :: Eq a => a -> SerialT m a -> Bool #

maximum :: Ord a => SerialT m a -> a #

minimum :: Ord a => SerialT m a -> a #

sum :: Num a => SerialT m a -> a #

product :: Num a => SerialT m a -> a #

Traversable (SerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> SerialT Identity a -> f (SerialT Identity b) #

sequenceA :: Applicative f => SerialT Identity (f a) -> f (SerialT Identity a) #

mapM :: Monad m => (a -> m b) -> SerialT Identity a -> m (SerialT Identity b) #

sequence :: Monad m => SerialT Identity (m a) -> m (SerialT Identity a) #

MonadIO m => MonadIO (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> SerialT m a #

NFData1 (SerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> SerialT Identity a -> () #

MonadThrow m => MonadThrow (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: Exception e => e -> SerialT m a #

IsList (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (SerialT Identity a) #

Eq a => Eq (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Read a => Read (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

a ~ Char => IsString (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Semigroup (SerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a #

sconcat :: NonEmpty (SerialT m a) -> SerialT m a #

stimes :: Integral b => b -> SerialT m a -> SerialT m a #

Monoid (SerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: SerialT m a #

mappend :: SerialT m a -> SerialT m a -> SerialT m a #

mconcat :: [SerialT m a] -> SerialT m a #

NFData a => NFData (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: SerialT Identity a -> () #

type Item (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (SerialT Identity a) = a

data WSerialT m a Source #

For WSerialT streams:

(<>)  = wSerial                       -- Semigroup
(>>=) = flip (concatMapWith wSerial)  -- Monad

Note that <> is associative only if we disregard the ordering of elements in the resulting stream.

A single Monad bind behaves like a for loop:

>>> :{
Stream.toList $ Stream.fromWSerial $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]

Nested monad binds behave like interleaved nested for loops:

>>> :{
Stream.toList $ Stream.fromWSerial $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]

It is a result of interleaving all the nested iterations corresponding to element 1 in the first stream with all the nested iterations of element 2:

>>> import Streamly.Prelude (wSerial)
>>> Stream.toList $ Stream.fromList [(1,3),(1,4)] `Stream.wSerial` Stream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of SerialT.
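The contrast between depth-wise and breadth-wise scheduling can be sketched by writing the nested loop once, polymorphic in the stream type, and running it under both serial types. This assumes IsStream, fromSerial and fromWSerial are imported from Streamly.Prelude; the name pairs is hypothetical.

```haskell
{-# LANGUAGE FlexibleContexts #-}

import Streamly.Prelude (IsStream)
import qualified Streamly.Prelude as S

-- A nested loop written once; the stream type t decides how the
-- iterations of the inner loop are scheduled.
pairs :: (IsStream t, Monad (t IO)) => t IO (Int, Int)
pairs = do
    x <- S.fromList [1, 2]
    y <- S.fromList [3, 4]
    return (x, y)

main :: IO ()
main = do
    -- Depth-wise (SerialT): finish all y for x = 1 before moving to x = 2.
    S.toList (S.fromSerial pairs) >>= print   -- [(1,3),(1,4),(2,3),(2,4)]
    -- Breadth-wise (WSerialT): interleave the inner loops of x = 1 and x = 2.
    S.toList (S.fromWSerial pairs) >>= print  -- [(1,3),(2,3),(1,4),(2,4)]
```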

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

MonadTrans WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> WSerialT m a #

IsStream WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. WSerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WSerialT m a Source #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(MonadBase b m, Monad m) => MonadBase b (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftBase :: b α -> WSerialT m α #

MonadState s m => MonadState s (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: WSerialT m s #

put :: s -> WSerialT m () #

state :: (s -> (a, s)) -> WSerialT m a #

MonadReader r m => MonadReader r (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: WSerialT m r #

local :: (r -> r) -> WSerialT m a -> WSerialT m a #

reader :: (r -> a) -> WSerialT m a #

Monad m => Monad (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: WSerialT m a -> (a -> WSerialT m b) -> WSerialT m b #

(>>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

return :: a -> WSerialT m a #

Monad m => Functor (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> WSerialT m a -> WSerialT m b #

(<$) :: a -> WSerialT m b -> WSerialT m a #

Monad m => Applicative (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> WSerialT m a #

(<*>) :: WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b #

liftA2 :: (a -> b -> c) -> WSerialT m a -> WSerialT m b -> WSerialT m c #

(*>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

(<*) :: WSerialT m a -> WSerialT m b -> WSerialT m a #

(Foldable m, Monad m) => Foldable (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => WSerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldl :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldr1 :: (a -> a -> a) -> WSerialT m a -> a #

foldl1 :: (a -> a -> a) -> WSerialT m a -> a #

toList :: WSerialT m a -> [a] #

null :: WSerialT m a -> Bool #

length :: WSerialT m a -> Int #

elem :: Eq a => a -> WSerialT m a -> Bool #

maximum :: Ord a => WSerialT m a -> a #

minimum :: Ord a => WSerialT m a -> a #

sum :: Num a => WSerialT m a -> a #

product :: Num a => WSerialT m a -> a #

Traversable (WSerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> WSerialT Identity a -> f (WSerialT Identity b) #

sequenceA :: Applicative f => WSerialT Identity (f a) -> f (WSerialT Identity a) #

mapM :: Monad m => (a -> m b) -> WSerialT Identity a -> m (WSerialT Identity b) #

sequence :: Monad m => WSerialT Identity (m a) -> m (WSerialT Identity a) #

MonadIO m => MonadIO (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> WSerialT m a #

NFData1 (WSerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> WSerialT Identity a -> () #

MonadThrow m => MonadThrow (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: Exception e => e -> WSerialT m a #

IsList (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (WSerialT Identity a) #

Eq a => Eq (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Read a => Read (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

a ~ Char => IsString (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Semigroup (WSerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a #

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a #

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a #

Monoid (WSerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: WSerialT m a #

mappend :: WSerialT m a -> WSerialT m a -> WSerialT m a #

mconcat :: [WSerialT m a] -> WSerialT m a #

NFData a => NFData (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: WSerialT Identity a -> () #

type Item (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (WSerialT Identity a) = a

Speculative Streams

When a stream consumer demands an element from a speculative stream constructed as a `consM` b `consM` ... nil, the action a at the head of the stream is executed and the output of the action is supplied to the consumer. However, in addition to the action at the head, multiple actions following it may also be executed concurrently and their results buffered. When the next element is demanded, it may be served from the buffer, and the next action in the sequence may be executed to keep the buffer adequately filled. Thus, the actions are executed concurrently but the results are consumed in serial order, just like serial streams. consM can be used to fold an infinite lazy container of effects, as the number of concurrent executions is limited.

Similar to consM, the monadic stream generation (e.g. replicateM) and transformation operations (e.g. mapM) on speculative streams can execute multiple effects concurrently in a speculative manner.

How many effects can be executed concurrently and how many results can be buffered are controlled by maxThreads and maxBuffer combinators respectively. The actual number of concurrent threads is adjusted according to the rate at which the consumer is consuming the stream. It may even execute actions serially in a single thread if that is enough to match the consumer's speed.
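A minimal sketch of how these limits might be applied to a speculative mapM, assuming maxThreads, maxBuffer and fromAhead are imported from Streamly.Prelude. The worker work and the limits 2 and 4 are hypothetical values chosen for illustration; the actual number of threads used at run time is decided by the library as described above.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as S

-- Hypothetical worker: simulate 50 ms of IO, then double the input.
work :: Int -> IO Int
work n = threadDelay 50000 >> return (n * 2)

-- Request at most 2 concurrent threads and at most 4 buffered results.
-- Because the stream is speculative (AheadT), the results are still
-- delivered in the order of the inputs.
bounded :: IO [Int]
bounded =
    S.toList
        $ S.fromAhead
        $ S.maxThreads 2
        $ S.maxBuffer 4
        $ S.mapM work
        $ S.fromList [1 .. 6]

main :: IO ()
main = bounded >>= print  -- [2,4,6,8,10,12]
```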

Speculative streams enforce the ordering of the results of actions in the stream, but the side effects are only partially ordered. Therefore, the semigroup operation for speculative streams is not commutative with respect to the pure outputs, but it is commutative with respect to the side effects.

data AheadT m a Source #

For AheadT streams:

(<>)  = ahead
(>>=) = flip (concatMapWith ahead)

A single Monad bind behaves like a for loop with iterations executed concurrently, ahead of time, producing side effects of iterations out of order, but results in order:

>>> :{
Stream.toList $ Stream.fromAhead $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, ahead of time:

>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using ahead.
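The examples above use a helper named delay that is not defined on this page. The sketch below supplies one plausible definition, inferred from the printed output: it waits, reports, and returns its argument. The definition and the 100 ms time unit (chosen to keep the run short) are assumptions, as is the name aheadLoop.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as Stream

-- Assumed definition of the delay helper: wait n time units
-- (100 ms each here), report completion, and return n.
delay :: Int -> IO Int
delay n = do
    threadDelay (n * 100000)
    putStrLn (show n ++ " units")
    return n

-- Both iterations may run concurrently, so the shorter delay is likely to
-- report first, but the results are delivered in stream order.
aheadLoop :: IO [Int]
aheadLoop = Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [2, 1]
    Stream.fromEffect $ delay x

main :: IO ()
main = aheadLoop >>= print  -- the result list is [2,1]
```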

Since: 0.3.0 (Streamly)

Since: 0.8.0

Instances

MonadTrans AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

lift :: Monad m => m a -> AheadT m a #

IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AheadT m a Source #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftBase :: b α -> AheadT m α #

(MonadState s m, MonadAsync m) => MonadState s (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

get :: AheadT m s #

put :: s -> AheadT m () #

state :: (s -> (a, s)) -> AheadT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

ask :: AheadT m r #

local :: (r -> r) -> AheadT m a -> AheadT m a #

reader :: (r -> a) -> AheadT m a #

MonadAsync m => Monad (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(>>=) :: AheadT m a -> (a -> AheadT m b) -> AheadT m b #

(>>) :: AheadT m a -> AheadT m b -> AheadT m b #

return :: a -> AheadT m a #

Monad m => Functor (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

fmap :: (a -> b) -> AheadT m a -> AheadT m b #

(<$) :: a -> AheadT m b -> AheadT m a #

(Monad m, MonadAsync m) => Applicative (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

pure :: a -> AheadT m a #

(<*>) :: AheadT m (a -> b) -> AheadT m a -> AheadT m b #

liftA2 :: (a -> b -> c) -> AheadT m a -> AheadT m b -> AheadT m c #

(*>) :: AheadT m a -> AheadT m b -> AheadT m b #

(<*) :: AheadT m a -> AheadT m b -> AheadT m a #

(MonadIO m, MonadAsync m) => MonadIO (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftIO :: IO a -> AheadT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

throwM :: Exception e => e -> AheadT m a #

MonadAsync m => Semigroup (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

MonadAsync m => Monoid (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

mempty :: AheadT m a #

mappend :: AheadT m a -> AheadT m a -> AheadT m a #

mconcat :: [AheadT m a] -> AheadT m a #

Asynchronous Streams

Scheduling and execution: In an asynchronous stream a `consM` b `consM` c ..., the actions a, b, and c are executed concurrently with the consumer of the stream. The actions are scheduled for execution in the same order as they are specified in the stream. Multiple scheduled actions may be executed concurrently in parallel threads of execution. The actions may be executed out of order and they may complete at arbitrary times. Therefore, the effects of the actions may be observed out of order.

Buffering: The results from multiple threads of execution are queued in a buffer as soon as they become available. The consumer of the stream is served from this buffer. Therefore, the consumer may observe the results to be out of order. In other words, an asynchronous stream is an unordered stream i.e. order does not matter.

Concurrency control: Threads are suspended if the maxBuffer limit is reached, and resumed when the consumer makes space in the buffer. The maximum number of concurrent threads depends on maxThreads. The number of threads is increased or decreased based on the speed of the consumer.

Generation operations: Concurrent stream generation operations, e.g. replicateM, when used in async style, schedule and execute the stream-generating actions in the manner described above. The generation actions run concurrently; the effects and results of the actions, as observed by the consumer of the stream, may be out of order.

Transformation operations: Concurrent stream transformation operations, e.g. mapM, when used in async style, schedule and execute the transformation actions in the manner described above. The transformation actions run concurrently; the effects and results of the actions may be observed by the consumer out of order.

Variants: There are two asynchronous stream types, AsyncT and WAsyncT. They are identical with respect to single stream evaluation behavior. Their behaviors differ in how they combine multiple streams using Semigroup or Monad composition. Since the order of elements does not matter in asynchronous streams, the Semigroup operation is effectively commutative.
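As a sketch of an async-style transformation, the following runs mapM over an asynchronous stream, assuming fromAsync and mapM are imported from Streamly.Prelude. The worker fetch and the name unordered are hypothetical. Because the result order is not guaranteed, the example only relies on the sorted contents.

```haskell
import Control.Concurrent (threadDelay)
import Data.List (sort)
import qualified Streamly.Prelude as S

-- Hypothetical worker: takes longer for larger inputs.
fetch :: Int -> IO Int
fetch n = threadDelay (n * 10000) >> return n

-- The fetch actions may run concurrently; results are queued as they
-- complete, so the consumer may see them in any order.
unordered :: IO [Int]
unordered = S.toList $ S.fromAsync $ S.mapM fetch $ S.fromList [3, 1, 2]

main :: IO ()
main = do
    xs <- unordered
    print xs         -- some permutation of the inputs
    print (sort xs)  -- [1,2,3]
```

If the consumer needs the results in input order, the same pipeline can be run with a speculative stream type instead.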

data AsyncT m a Source #

For AsyncT streams:

(<>)  = async
(>>=) = flip (concatMapWith async)

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the async combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the async combinator:

>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using async.

Since: 0.1.0 (Streamly)

Since: 0.8.0

Instances

MonadTrans AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> AsyncT m a #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> AsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: AsyncT m s #

put :: s -> AsyncT m () #

state :: (s -> (a, s)) -> AsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: AsyncT m r #

local :: (r -> r) -> AsyncT m a -> AsyncT m a #

reader :: (r -> a) -> AsyncT m a #

MonadAsync m => Monad (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b #

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

return :: a -> AsyncT m a #

Monad m => Functor (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b #

(<$) :: a -> AsyncT m b -> AsyncT m a #

(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> AsyncT m a #

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b #

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c #

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> AsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> AsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

MonadAsync m => Monoid (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: AsyncT m a #

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a #

mconcat :: [AsyncT m a] -> AsyncT m a #

data WAsyncT m a Source #

For WAsyncT streams:

(<>)  = wAsync
(>>=) = flip (concatMapWith wAsync)

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the wAsync combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the wAsync combinator:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one WAsyncT output stream and all the iterations corresponding to 2 constitute another WAsyncT output stream and these two output streams are merged using wAsync.

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of AsyncT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

MonadTrans WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> WAsyncT m a #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> WAsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: WAsyncT m s #

put :: s -> WAsyncT m () #

state :: (s -> (a, s)) -> WAsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: WAsyncT m r #

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a #

reader :: (r -> a) -> WAsyncT m a #

MonadAsync m => Monad (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b #

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

return :: a -> WAsyncT m a #

Monad m => Functor (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b #

(<$) :: a -> WAsyncT m b -> WAsyncT m a #

(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> WAsyncT m a #

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b #

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c #

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> WAsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> WAsyncT m a #

MonadAsync m => Semigroup (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

MonadAsync m => Monoid (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: WAsyncT m a #

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

mconcat :: [WAsyncT m a] -> WAsyncT m a #

data ParallelT m a Source #

For ParallelT streams:

(<>) = parallel
(>>=) = flip . concatMapWith parallel

See AsyncT; ParallelT is similar, except that all iterations are strictly concurrent, whereas in AsyncT concurrency depends on consumer demand and available threads. See parallel for more details.
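As a rough illustration of the monad bind described above, the following sketch runs a two-iteration loop under ParallelT. The delay helper is a hypothetical stand-in for the one used in the other examples on this page (it sleeps, prints, and returns its argument), and parallelLoop is a name chosen here for illustration. The order of the results depends on timing.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as Stream

-- Hypothetical helper: sleep for n seconds, report it, and return n.
delay :: Int -> IO Int
delay n = do
    threadDelay (n * 1000000)
    putStrLn (show n ++ " sec")
    return n

-- Each iteration of the bind is evaluated concurrently under ParallelT,
-- so the shorter delay is expected to finish first.
parallelLoop :: IO [Int]
parallelLoop =
    Stream.toList $ Stream.fromParallel $ do
        x <- Stream.fromList [2, 1] -- foreach x in stream
        Stream.fromEffect $ delay x

main :: IO ()
main = parallelLoop >>= print
```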

Since: 0.1.0 (Streamly)

Since: 0.7.0 (maxBuffer applies to ParallelT streams)

Since: 0.8.0

Instances

MonadTrans ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

lift :: Monad m => m a -> ParallelT m a #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ParallelT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ParallelT m a Source #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftBase :: b α -> ParallelT m α #

(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

get :: ParallelT m s #

put :: s -> ParallelT m () #

state :: (s -> (a, s)) -> ParallelT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

ask :: ParallelT m r #

local :: (r -> r) -> ParallelT m a -> ParallelT m a #

reader :: (r -> a) -> ParallelT m a #

MonadAsync m => Monad (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(>>=) :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b #

(>>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

return :: a -> ParallelT m a #

Monad m => Functor (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

fmap :: (a -> b) -> ParallelT m a -> ParallelT m b #

(<$) :: a -> ParallelT m b -> ParallelT m a #

(Monad m, MonadAsync m) => Applicative (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

pure :: a -> ParallelT m a #

(<*>) :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b #

liftA2 :: (a -> b -> c) -> ParallelT m a -> ParallelT m b -> ParallelT m c #

(*>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

(<*) :: ParallelT m a -> ParallelT m b -> ParallelT m a #

(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftIO :: IO a -> ParallelT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

throwM :: Exception e => e -> ParallelT m a #

MonadAsync m => Semigroup (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

MonadAsync m => Monoid (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

mempty :: ParallelT m a #

mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a #

mconcat :: [ParallelT m a] -> ParallelT m a #

Zipping Streams

ZipSerialM and ZipAsyncM provide Applicative instances for zipping the corresponding elements of two streams together. Note that these types are not monads.

data ZipSerialM m a Source #

For ZipSerialM streams:

(<>) = serial
(<*>) = zipWith id

Applicative evaluates the streams being zipped serially:

>>> s1 = Stream.fromFoldable [1, 2]
>>> s2 = Stream.fromFoldable [3, 4]
>>> s3 = Stream.fromFoldable [5, 6]
>>> Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
[(1,3,5),(2,4,6)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ZipSerialM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipSerialM m a Source #

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

Monad m => Functor (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

fmap :: (a -> b) -> ZipSerialM m a -> ZipSerialM m b #

(<$) :: a -> ZipSerialM m b -> ZipSerialM m a #

Monad m => Applicative (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

pure :: a -> ZipSerialM m a #

(<*>) :: ZipSerialM m (a -> b) -> ZipSerialM m a -> ZipSerialM m b #

liftA2 :: (a -> b -> c) -> ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m c #

(*>) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m b #

(<*) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m a #

(Foldable m, Monad m) => Foldable (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

fold :: Monoid m0 => ZipSerialM m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> ZipSerialM m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> ZipSerialM m a -> m0 #

foldr :: (a -> b -> b) -> b -> ZipSerialM m a -> b #

foldr' :: (a -> b -> b) -> b -> ZipSerialM m a -> b #

foldl :: (b -> a -> b) -> b -> ZipSerialM m a -> b #

foldl' :: (b -> a -> b) -> b -> ZipSerialM m a -> b #

foldr1 :: (a -> a -> a) -> ZipSerialM m a -> a #

foldl1 :: (a -> a -> a) -> ZipSerialM m a -> a #

toList :: ZipSerialM m a -> [a] #

null :: ZipSerialM m a -> Bool #

length :: ZipSerialM m a -> Int #

elem :: Eq a => a -> ZipSerialM m a -> Bool #

maximum :: Ord a => ZipSerialM m a -> a #

minimum :: Ord a => ZipSerialM m a -> a #

sum :: Num a => ZipSerialM m a -> a #

product :: Num a => ZipSerialM m a -> a #

Traversable (ZipSerialM Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

traverse :: Applicative f => (a -> f b) -> ZipSerialM Identity a -> f (ZipSerialM Identity b) #

sequenceA :: Applicative f => ZipSerialM Identity (f a) -> f (ZipSerialM Identity a) #

mapM :: Monad m => (a -> m b) -> ZipSerialM Identity a -> m (ZipSerialM Identity b) #

sequence :: Monad m => ZipSerialM Identity (m a) -> m (ZipSerialM Identity a) #

NFData1 (ZipSerialM Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

liftRnf :: (a -> ()) -> ZipSerialM Identity a -> () #

IsList (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Associated Types

type Item (ZipSerialM Identity a) #

Eq a => Eq (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Ord a => Ord (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Read a => Read (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Show a => Show (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

a ~ Char => IsString (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Semigroup (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

(<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a #

stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a #

Monoid (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

mempty :: ZipSerialM m a #

mappend :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

mconcat :: [ZipSerialM m a] -> ZipSerialM m a #

NFData a => NFData (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

rnf :: ZipSerialM Identity a -> () #

type Item (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

type Item (ZipSerialM Identity a) = a

data ZipAsyncM m a Source #

For ZipAsyncM streams:

(<>) = serial
(<*>) = zipAsyncWith id

Applicative evaluates the streams being zipped concurrently. The following takes half the time that serial zipping would take:

>>> s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
>>> Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
...
[(1,1),(1,1),(1,1)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ZipAsyncM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipAsyncM m a Source #

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

Monad m => Functor (ZipAsyncM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

fmap :: (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b #

(<$) :: a -> ZipAsyncM m b -> ZipAsyncM m a #

MonadAsync m => Applicative (ZipAsyncM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

pure :: a -> ZipAsyncM m a #

(<*>) :: ZipAsyncM m (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b #

liftA2 :: (a -> b -> c) -> ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m c #

(*>) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m b #

(<*) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m a #

Semigroup (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

(<>) :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a #

stimes :: Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a #

Monoid (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

mempty :: ZipAsyncM m a #

mappend :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

mconcat :: [ZipAsyncM m a] -> ZipAsyncM m a #

Parallel Function Application

Stream processing functions can be composed in a chain using function application, with or without the $ operator, or with the reverse function application operator &. Streamly provides concurrent versions of these operators, which apply stream processing functions such that each stage of the stream can run in parallel. The operators start with a |; we can read |$ as "parallel dollar" to remember that | comes before $.

Imports for the code snippets below:

 import Streamly
 import qualified Streamly.Prelude as S
 import Control.Concurrent

(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #

Parallel transform application operator; applies a stream transformation function t m a -> t m b to a stream t m a concurrently. The input stream is evaluated asynchronously in an independent thread, yielding elements to a buffer, and the transformation function runs in another thread, consuming the input from the buffer. |$ is just like the regular function application operator $ except that it is concurrent.

If you read the signature as (t m a -> t m b) -> (t m a -> t m b) you can look at it as a transformation that converts a transform function to a buffered concurrent transform function.

The following code prints a value every second even though each stage adds a 1 second delay.

>>> :{
Stream.drain $
   Stream.mapM (\x -> threadDelay 1000000 >> print x)
     |$ Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #

Same as |$ but with arguments reversed.

(|&) = flip (|$)
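A minimal sketch of the reversed operator, assuming (|&) is importable from Streamly.Prelude in the same way as (|$.) in the examples on this page. The name pipeline is hypothetical. The producer comes first and the transformation second; both stages sleep for one second per element but run in separate threads.

```haskell
import Control.Concurrent (threadDelay)
import Streamly.Prelude ((|&))
import qualified Streamly.Prelude as Stream

-- The producer (left) and the mapM stage (right) run in separate threads,
-- connected by a buffer.
pipeline :: IO [Int]
pipeline =
    Stream.toList $
        Stream.replicateM 3 (threadDelay 1000000 >> return 1)
            |& Stream.mapM (\x -> threadDelay 1000000 >> return (x + 1))

main :: IO ()
main = pipeline >>= print
```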

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #

Parallel fold application operator; applies a fold function t m a -> m b to a stream t m a concurrently. The input stream is evaluated asynchronously in an independent thread, yielding elements to a buffer, and the folding action runs in another thread, consuming the input from the buffer.

If you read the signature as (t m a -> m b) -> (t m a -> m b) you can look at it as a transformation that converts a fold function to a buffered concurrent fold function.

The . at the end of the operator is a mnemonic for termination of the stream.

In the example below, each stage introduces a delay of 1 sec but output is printed every second because both stages are concurrent.

>>> import Control.Concurrent (threadDelay)
>>> import Streamly.Prelude ((|$.))
>>> :{
 Stream.foldlM' (\_ a -> threadDelay 1000000 >> print a) (return ())
     |$. Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #

Same as |$. but with arguments reversed.

(|&.) = flip (|$.)
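A minimal sketch of the reversed fold operator, assuming (|&.) is importable from Streamly.Prelude like (|$.). The name total is hypothetical. The stream is written first and the fold that consumes it second.

```haskell
import Control.Concurrent (threadDelay)
import Streamly.Prelude ((|&.))
import qualified Streamly.Prelude as Stream

-- The producer runs in its own thread; the fold consumes from a buffer
-- in another thread and accumulates a running sum.
total :: IO Int
total =
    Stream.replicateM 3 (threadDelay 1000000 >> return 1)
        |&. Stream.foldlM'
                (\acc a -> threadDelay 1000000 >> return (acc + a))
                (return 0)

main :: IO ()
main = total >>= print
```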

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a) Source #

Makes a stream asynchronous: triggers the computation and returns, in the underlying monad, a stream representing the output generated by the original computation. The returned stream is exhaustible and must be drained once. If it is not drained fully, a thread may remain blocked forever; once exhausted, it always returns an empty stream.

Since: 0.2.0

Merging Streams

The Semigroup operation <> of each stream type combines two streams in a type specific manner. This section provides polymorphic versions of <> which can be used to combine two streams in a predetermined way irrespective of the type.

serial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

>>> import Streamly.Prelude (serial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ stream1 `serial` stream2
[1,2,3,4]

This operation can be used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

wSerial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.

>>> import Streamly.Prelude (wSerial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ Stream.fromWSerial $ stream1 `wSerial` stream2
[1,3,2,4]

Note, for singleton streams wSerial and serial are identical.

Note that this operation cannot be used to fold a container of infinite streams but it can be used for very large streams as the state that it needs to maintain is proportional to the logarithm of the number of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams. Both streams may be evaluated concurrently, but the outputs are used in the same order as the corresponding actions in the original streams; side effects happen in the order in which the streams are evaluated:

>>> import Streamly.Prelude (ahead, SerialT)
>>> stream1 = Stream.fromEffect (delay 4) :: SerialT IO Int
>>> stream2 = Stream.fromEffect (delay 2) :: SerialT IO Int
>>> Stream.toList $ stream1 `ahead` stream2 :: IO [Int]
2 sec
4 sec
[4,2]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `ahead` stream2 `ahead` stream3
1 sec
2 sec
4 sec
[4,2,1]

With 2 threads, only two streams can be scheduled at a time; when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `ahead` stream2 `ahead` stream3
2 sec
1 sec
4 sec
[4,2,1]

Only streams are scheduled for ahead evaluation; how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream, they will be evaluated concurrently. It may not make much sense to combine serial streams using ahead.

ahead can be safely used to fold an infinite lazy container of streams.

Since: 0.3.0 (Streamly)

Since: 0.8.0

async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Merges two streams. Both streams may be evaluated concurrently, and outputs from both are used as they arrive:

>>> import Streamly.Prelude (async)
>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> Stream.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `async` stream2 `async` stream3
...
[1,2,4]

With 2 threads, only two streams can be scheduled at a time; when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
...
[2,1,4]

With a single thread, it becomes serial:

>>> Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3
...
[4,2,1]

Only streams are scheduled for async evaluation; how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream, they will be evaluated concurrently.

In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:

>>> stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int
>>> stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int
>>> Stream.toList $ stream1 `async` stream2 -- IO [Int]
...
[1,1,3,3]

If the total number of threads is 2, the third stream is scheduled only after one of the first two has finished:

stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int
Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]

...
[1,1,3,2,3,2]

Thus async goes deep in the first few streams rather than going wide across all streams. It prefers to evaluate the leftmost streams as much as possible. Because of this behavior, async can be safely used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

For singleton streams, wAsync is the same as async. See async for singleton stream behavior. For multi-element streams, while async is left biased i.e. it tries to evaluate the left side stream as much as possible, wAsync tries to schedule them both fairly. In other words, async goes deep while wAsync goes wide. However, outputs are always used as they arrive.

With a single thread, async starts behaving like serial while wAsync starts behaving like wSerial.

>>> import Streamly.Prelude (async, wAsync)
>>> stream1 = Stream.fromList [1,2,3]
>>> stream2 = Stream.fromList [4,5,6]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2
[1,2,3,4,5,6]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2
[1,4,2,5,3,6]

With two threads available, and combining three streams:

>>> stream3 = Stream.fromList [7,8,9]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
[1,2,3,4,5,6,7,8,9]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3
[1,4,2,7,5,3,8,6,9]

This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner.

Note that WSerialT and single threaded WAsyncT both interleave streams but the exact scheduling is slightly different in both cases.

Since: 0.2.0 (Streamly)

Since: 0.8.0

parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Like async except that the execution is much more strict. There is no limit on the number of threads. While async may not schedule a stream if there is no demand from the consumer, parallel always evaluates both the streams immediately. The only limit that applies to parallel is maxBuffer. Evaluation may block if the output buffer becomes full.

>>> import Streamly.Prelude (parallel)
>>> stream = Stream.fromEffect (delay 2) `parallel` Stream.fromEffect (delay 1)
>>> Stream.toList stream -- IO [Int]
1 sec
2 sec
[1,2]

parallel guarantees that all the streams are scheduled for execution immediately. Therefore, we can, for example, start timers inside the streams and rely on the fact that all timers were started at the same time.

Unlike async this operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams strictly concurrently.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Concurrency Control

These combinators can be used at any point in a stream composition to set parameters to control the concurrency of the argument stream. A control parameter set at any point remains effective for any concurrent combinators used in the argument stream until it is reset by using the combinator again. These control parameters have no effect on non-concurrent combinators in the stream, or on non-concurrent streams.

Pitfall: Remember that maxBuffer in the following example applies to mapM and any other combinators that may follow it, and it does not apply to the combinators before it:

 ...
 $ maxBuffer 10
 $ S.mapM ...
 ...

If we use & instead of $, the situation is reversed. In the following example, maxBuffer does not apply to mapM; it applies to the combinators that come before it, because those are the arguments to maxBuffer:

 ...
 & maxBuffer 10
 & S.mapM ...
 ...
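The two scoping cases can be spelled out in a small, self-contained sketch. The helper slowId and the names bufferCoversMapM and bufferSkipsMapM are hypothetical and only serve the illustration. Both pipelines produce the same elements (possibly in a different order, since the stream is interpreted as AsyncT); they differ only in which combinators the maxBuffer setting covers.

```haskell
import Control.Concurrent (threadDelay)
import Data.Function ((&))
import qualified Streamly.Prelude as Stream

-- Hypothetical effectful stage: pause briefly, then pass the value through.
slowId :: Int -> IO Int
slowId x = threadDelay 100000 >> return x

-- With ($): mapM is part of the argument of maxBuffer, so the limit covers it.
bufferCoversMapM :: IO [Int]
bufferCoversMapM =
    Stream.toList
        $ Stream.fromAsync
        $ Stream.maxBuffer 10
        $ Stream.mapM slowId
        $ Stream.fromList [1 .. 10]

-- With (&): only fromList is the argument of maxBuffer; the mapM that
-- comes after it is not covered by the limit.
bufferSkipsMapM :: IO [Int]
bufferSkipsMapM =
    Stream.fromList [1 .. 10]
        & Stream.maxBuffer 10
        & Stream.mapM slowId
        & Stream.fromAsync
        & Stream.toList

main :: IO ()
main = do
    bufferCoversMapM >>= print
    bufferSkipsMapM >>= print
```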

maxThreads :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum number of threads that can be spawned concurrently for any concurrent combinator in a stream. A value of 0 resets the thread limit to the default; a negative value means there is no limit. The default value is 1500. maxThreads does not affect ParallelT streams, as they can use an unbounded number of threads.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
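As a sketch of limiting in-flight actions, the following evaluates four one-second actions in an AsyncT stream with at most two threads. The delay helper is a hypothetical stand-in for the one used in the examples on this page, and limited is a name chosen for illustration. With this limit the run would be expected to take roughly two seconds rather than one, though exact timing depends on the scheduler.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as Stream

-- Hypothetical helper: sleep for n seconds, report it, and return n.
delay :: Int -> IO Int
delay n = do
    threadDelay (n * 1000000)
    putStrLn (show n ++ " sec")
    return n

-- At most two of the four actions are evaluated at any one time.
limited :: IO [Int]
limited =
    Stream.toList
        $ Stream.fromAsync
        $ Stream.maxThreads 2
        $ Stream.fromListM (map delay [1, 1, 1, 1])

main :: IO ()
main = limited >>= print
```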

Since: 0.4.0 (Streamly)

Since: 0.8.0

maxBuffer :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! Using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in the presence of infinite streams or very large streams. In particular, it must not be used when pure is used in ZipAsyncM streams, because pure in applicative zip streams generates an infinite stream, causing unbounded concurrent generation with no limit on the buffer or threads.

Since: 0.4.0 (Streamly)

Since: 0.8.0

Rate Limiting

data Rate Source #

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than rateBuffer we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.

Since: 0.5.0 (Streamly)

Since: 0.8.0

Constructors

Rate 

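Fields

rateLow :: Double

The lower rate limit

rateGoal :: Double

The target rate we want to achieve

rateHigh :: Double

The upper rate limit

rateBuffer :: Int

Maximum slack from the goal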

rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #

Specify the pull rate of a stream. A Nothing value resets the rate to default which is unlimited. When the rate is specified, concurrent production may be ramped up or down automatically to achieve the specified yield rate. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a stream is governed by:

  • The maxThreads limit
  • The maxBuffer limit
  • The maximum rate that the stream producer can achieve
  • The maximum rate that the stream consumer can achieve
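A minimal sketch of an explicit Rate specification, assuming Rate and its record fields (rateLow, rateGoal, rateHigh, rateBuffer, as named in the description of Rate) are importable from Streamly.Prelude. The numbers are arbitrary and the name paced is hypothetical. The stream is interpreted as AsyncT so that concurrent production can be paced.

```haskell
import Streamly.Prelude (Rate (..))
import qualified Streamly.Prelude as Stream

-- Aim for 4 yields per second, allow the instantaneous rate to vary
-- between 2 and 8, and try to recover a gap of at most 10 yields.
paced :: IO [Int]
paced =
    Stream.toList
        $ Stream.fromAsync
        $ Stream.rate
            (Just Rate { rateLow = 2, rateGoal = 4, rateHigh = 8, rateBuffer = 10 })
        $ Stream.fromListM (map return [1 .. 8])

main :: IO ()
main = paced >>= print
```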

Since: 0.5.0 (Streamly)

Since: 0.8.0

avgRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half the specified rate on the lower side and up to double the specified rate on the higher side.
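One way to observe the effect of avgRate is to time a paced stream. This sketch uses getMonotonicTime from GHC.Clock (part of base) and a hypothetical name, pacedElapsed. At an average of 5 yields per second, ten elements would be expected to take on the order of two seconds, but the measured value depends on the scheduler and is not guaranteed.

```haskell
import GHC.Clock (getMonotonicTime)
import qualified Streamly.Prelude as Stream

-- Drain ten trivial actions at an average of 5 yields per second and
-- return the elapsed wall-clock time in seconds.
pacedElapsed :: IO Double
pacedElapsed = do
    start <- getMonotonicTime
    Stream.drain
        $ Stream.fromAsync
        $ Stream.avgRate 5
        $ Stream.replicateM 10 (return ())
    end <- getMonotonicTime
    return (end - start)

main :: IO ()
main = pacedElapsed >>= print
```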

Since: 0.5.0 (Streamly)

Since: 0.8.0

minRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. As far as possible, the yield rate is never allowed to go below the specified rate, although it may go above it at times; the upper limit is double the specified rate.

Since: 0.5.0 (Streamly)

Since: 0.8.0

maxRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. As far as possible, the yield rate is never allowed to go above the specified rate, although it may go below it at times; the lower limit is half the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

Since: 0.5.0 (Streamly)

Since: 0.8.0

constRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate, we do not try to recover it by increasing or decreasing the rate in the future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

Since: 0.5.0 (Streamly)

Since: 0.8.0

Stream Type Adapters

You may want to use different stream composition styles at different points in your program. Stream types can be freely converted or adapted from one type to another. The IsStream type class facilitates type conversion of one stream type to another. It is not used directly, instead the type combinators provided below are used for conversions.

To adapt from one monomorphic type (e.g. AsyncT) to another monomorphic type (e.g. SerialT), use the adapt combinator. To give polymorphic code a specific interpretation, or to adapt a specific type to a polymorphic type, use the type-specific combinators, e.g. fromAsync or fromWSerial. You cannot adapt polymorphic code to polymorphic code, as the compiler would not know which specific type you are converting from or to. If you see an ambiguous type variable error, then most likely you are using adapt unnecessarily on polymorphic code.
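The two kinds of conversion can be sketched as follows, assuming adapt and the stream types are importable from Streamly.Prelude. The names asyncStream, serialStream, and interleaved are hypothetical. The first conversion goes between two monomorphic types with adapt; the second gives a polymorphic expression a WSerialT interpretation with fromWSerial, so that <> interleaves.

```haskell
import Streamly.Prelude (AsyncT, SerialT, adapt)
import qualified Streamly.Prelude as Stream

-- A stream fixed to the AsyncT type by its signature.
asyncStream :: AsyncT IO Int
asyncStream = Stream.fromList [1, 2, 3]

-- Monomorphic to monomorphic: both ends are known, so adapt is unambiguous.
serialStream :: SerialT IO Int
serialStream = adapt asyncStream

-- Polymorphic code given a specific interpretation: under fromWSerial the
-- (<>) operation is the interleaving wSerial.
interleaved :: IO [Int]
interleaved =
    Stream.toList $ Stream.fromWSerial $
        Stream.fromList [1, 2] <> Stream.fromList [3, 4]

main :: IO ()
main = do
    Stream.toList serialStream >>= print
    interleaved >>= print
```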

class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t Source #

Class of types that can represent a stream of elements of some type a in some monad m.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Minimal complete definition

toStream, fromStream, consM, (|:)

Instances

IsStream WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. WSerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WSerialT m a Source #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

IsStream SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. SerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> SerialT m a Source #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AheadT m a Source #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ParallelT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ParallelT m a Source #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ZipAsyncM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipAsyncM m a Source #

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ZipSerialM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipSerialM m a Source #

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

serially :: IsStream t => SerialT m a -> t m a Source #

wSerially :: IsStream t => WSerialT m a -> t m a Source #

asyncly :: IsStream t => AsyncT m a -> t m a Source #

aheadly :: IsStream t => AheadT m a -> t m a Source #

wAsyncly :: IsStream t => WAsyncT m a -> t m a Source #

parallely :: IsStream t => ParallelT m a -> t m a Source #

zipSerially :: IsStream t => ZipSerialM m a -> t m a Source #

zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a Source #

adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #

Adapt any specific stream type to any other specific stream type.

Since: 0.1.0 (Streamly)

Since: 0.8.0

IO Streams

type Serial = SerialT IO Source #

A serial IO stream of elements of type a. See SerialT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type WSerial = WSerialT IO Source #

An interleaving serial IO stream of elements of type a. See WSerialT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type Ahead = AheadT IO Source #

A serial IO stream of elements of type a with concurrent lookahead. See AheadT documentation for more details.

Since: 0.3.0 (Streamly)

Since: 0.8.0

type Async = AsyncT IO Source #

A demand-driven, left-biased, parallely composing IO stream of elements of type a. See AsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type WAsync = WAsyncT IO Source #

A round-robin, parallely composing IO stream of elements of type a. See WAsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type Parallel = ParallelT IO Source #

A parallely composing IO stream of elements of type a. See ParallelT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type ZipSerial = ZipSerialM IO Source #

An IO stream whose applicative instance zips streams serially.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type ZipAsync = ZipAsyncM IO Source #

An IO stream whose applicative instance zips streams wAsyncly.

Since: 0.2.0 (Streamly)

Since: 0.8.0
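As a small sketch of how these synonyms read in type signatures, the following declares the same stream once with the Serial synonym and once with the expanded SerialT IO type. The names numbers and numbers' are hypothetical, and the example assumes the types are imported from Streamly.Prelude.

```haskell
import Streamly.Prelude (Serial, SerialT)
import qualified Streamly.Prelude as S

-- Using the IO synonym.
numbers :: Serial Int
numbers = S.fromList [1, 2, 3]

-- The same type spelled out; no conversion is needed because
-- Serial is only a synonym for SerialT IO.
numbers' :: SerialT IO Int
numbers' = numbers

main :: IO ()
main = S.toList numbers' >>= print
```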

Folding Containers of Streams

These are variants of standard Foldable fold functions that use a polymorphic stream sum operation (e.g. async or wSerial) to fold a finite container of streams. Note that these are just special cases of the more general concatMapWith operation.

foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

Same as concatFoldableWith

Since: 0.1.0

foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

Same as concatMapFoldableWith

Since: 0.1.0

forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Same as concatForFoldableWith

Since: 0.1.0
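A minimal sketch of these folds, written with the Streamly.Prelude names that the entries above refer to (concatFoldableWith and concatMapFoldableWith) and with serial as the sum operation so that the result order is deterministic. The names appended and expanded are hypothetical.

```haskell
import qualified Streamly.Prelude as S

-- Fold a list of streams into one stream using serial append.
appended :: IO [Int]
appended =
    S.toList $ S.concatFoldableWith S.serial
        [S.fromList [1, 2], S.fromList [3], S.fromList [4, 5]]

-- Map each element of a list to a stream, then fold the streams.
expanded :: IO [Int]
expanded =
    S.toList $ S.concatMapFoldableWith S.serial
        (\x -> S.fromList [x, x * 10]) [1, 2, 3]

main :: IO ()
main = do
    appended >>= print   -- [1,2,3,4,5]
    expanded >>= print   -- [1,10,2,20,3,30]
```

Passing a concurrent sum operation such as async in place of serial would combine the same streams concurrently, in which case the order of the results is not fixed.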

Re-exports

class Semigroup a where #

The class of semigroups (types with an associative binary operation).

Instances should satisfy the following:

Associativity
x <> (y <> z) = (x <> y) <> z

Since: base-4.9.0.0

Minimal complete definition

(<>)

Methods

(<>) :: a -> a -> a infixr 6 #

An associative operation.

>>> [1,2,3] <> [4,5,6]
[1,2,3,4,5,6]

sconcat :: NonEmpty a -> a #

Reduce a non-empty list with <>

The default definition should be sufficient, but this can be overridden for efficiency.

>>> import Data.List.NonEmpty
>>> sconcat $ "Hello" :| [" ", "Haskell", "!"]
"Hello Haskell!"

stimes :: Integral b => b -> a -> a #

Repeat a value n times.

Given that this works on a Semigroup it is allowed to fail if you request 0 or fewer repetitions, and the default definition will do so.

By making this a member of the class, idempotent semigroups and monoids can upgrade this to execute in \(\mathcal{O}(1)\) by picking stimes = stimesIdempotent or stimes = stimesIdempotentMonoid respectively.

>>> stimes 4 [1]
[1,1,1,1]
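To illustrate how the three methods relate, here is a sketch of a user-defined instance that supplies only (<>) and relies on the default definitions of sconcat and stimes. The Range type and the names hull and repeated are hypothetical and not part of any library.

```haskell
import Data.List.NonEmpty (NonEmpty (..))
import Data.Semigroup (sconcat, stimes)

-- Hypothetical type: a closed integer interval.
data Range = Range Int Int
    deriving (Eq, Show)

-- Combining two ranges yields the smallest range covering both.
-- min and max are associative, so the Semigroup law holds.
instance Semigroup Range where
    Range a b <> Range c d = Range (min a c) (max b d)

-- sconcat reduces a non-empty list with (<>).
hull :: Range
hull = sconcat (Range 3 5 :| [Range 1 2, Range 4 9])

-- stimes combines a value with itself n times using the default definition.
repeated :: Range
repeated = stimes (3 :: Int) (Range 0 1)

main :: IO ()
main = do
    print hull       -- Range 1 9
    print repeated   -- Range 0 1
```

Because this operation is idempotent, the instance could also set stimes = stimesIdempotent, as described above, to avoid the repeated combination.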

Instances

Instances details
Semigroup Ordering

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Semigroup ()

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: () -> () -> () #

sconcat :: NonEmpty () -> () #

stimes :: Integral b => b -> () -> () #

Semigroup Void

Since: base-4.9.0.0

Instance details

Defined in Data.Void

Methods

(<>) :: Void -> Void -> Void #

sconcat :: NonEmpty Void -> Void #

stimes :: Integral b => b -> Void -> Void #

Semigroup All

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: All -> All -> All #

sconcat :: NonEmpty All -> All #

stimes :: Integral b => b -> All -> All #

Semigroup Any

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: Any -> Any -> Any #

sconcat :: NonEmpty Any -> Any #

stimes :: Integral b => b -> Any -> Any #

Semigroup IntSet

Since: containers-0.5.7

Instance details

Defined in Data.IntSet.Internal

Semigroup ByteArray 
Instance details

Defined in Data.Primitive.ByteArray

Semigroup [a]

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: [a] -> [a] -> [a] #

sconcat :: NonEmpty [a] -> [a] #

stimes :: Integral b => b -> [a] -> [a] #

Semigroup a => Semigroup (Maybe a)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: Maybe a -> Maybe a -> Maybe a #

sconcat :: NonEmpty (Maybe a) -> Maybe a #

stimes :: Integral b => b -> Maybe a -> Maybe a #

Semigroup a => Semigroup (IO a)

Since: base-4.10.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: IO a -> IO a -> IO a #

sconcat :: NonEmpty (IO a) -> IO a #

stimes :: Integral b => b -> IO a -> IO a #

Semigroup p => Semigroup (Par1 p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: Par1 p -> Par1 p -> Par1 p #

sconcat :: NonEmpty (Par1 p) -> Par1 p #

stimes :: Integral b => b -> Par1 p -> Par1 p #

Ord a => Semigroup (Min a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup

Methods

(<>) :: Min a -> Min a -> Min a #

sconcat :: NonEmpty (Min a) -> Min a #

stimes :: Integral b => b -> Min a -> Min a #

Ord a => Semigroup (Max a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup

Methods

(<>) :: Max a -> Max a -> Max a #

sconcat :: NonEmpty (Max a) -> Max a #

stimes :: Integral b => b -> Max a -> Max a #

Semigroup (First a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup

Methods

(<>) :: First a -> First a -> First a #

sconcat :: NonEmpty (First a) -> First a #

stimes :: Integral b => b -> First a -> First a #

Semigroup (Last a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup

Methods

(<>) :: Last a -> Last a -> Last a #

sconcat :: NonEmpty (Last a) -> Last a #

stimes :: Integral b => b -> Last a -> Last a #

Monoid m => Semigroup (WrappedMonoid m)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup

Semigroup a => Semigroup (Option a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup

Methods

(<>) :: Option a -> Option a -> Option a #

sconcat :: NonEmpty (Option a) -> Option a #

stimes :: Integral b => b -> Option a -> Option a #

Semigroup a => Semigroup (Identity a)

Since: base-4.9.0.0

Instance details

Defined in Data.Functor.Identity

Methods

(<>) :: Identity a -> Identity a -> Identity a #

sconcat :: NonEmpty (Identity a) -> Identity a #

stimes :: Integral b => b -> Identity a -> Identity a #

Semigroup (First a)

Since: base-4.9.0.0

Instance details

Defined in Data.Monoid

Methods

(<>) :: First a -> First a -> First a #

sconcat :: NonEmpty (First a) -> First a #

stimes :: Integral b => b -> First a -> First a #

Semigroup (Last a)

Since: base-4.9.0.0

Instance details

Defined in Data.Monoid

Methods

(<>) :: Last a -> Last a -> Last a #

sconcat :: NonEmpty (Last a) -> Last a #

stimes :: Integral b => b -> Last a -> Last a #

Semigroup a => Semigroup (Dual a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: Dual a -> Dual a -> Dual a #

sconcat :: NonEmpty (Dual a) -> Dual a #

stimes :: Integral b => b -> Dual a -> Dual a #

Semigroup (Endo a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: Endo a -> Endo a -> Endo a #

sconcat :: NonEmpty (Endo a) -> Endo a #

stimes :: Integral b => b -> Endo a -> Endo a #

Num a => Semigroup (Sum a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: Sum a -> Sum a -> Sum a #

sconcat :: NonEmpty (Sum a) -> Sum a #

stimes :: Integral b => b -> Sum a -> Sum a #

Num a => Semigroup (Product a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: Product a -> Product a -> Product a #

sconcat :: NonEmpty (Product a) -> Product a #

stimes :: Integral b => b -> Product a -> Product a #

Semigroup a => Semigroup (Down a)

Since: base-4.11.0.0

Instance details

Defined in Data.Ord

Methods

(<>) :: Down a -> Down a -> Down a #

sconcat :: NonEmpty (Down a) -> Down a #

stimes :: Integral b => b -> Down a -> Down a #

Semigroup (NonEmpty a)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: NonEmpty a -> NonEmpty a -> NonEmpty a #

sconcat :: NonEmpty (NonEmpty a) -> NonEmpty a #

stimes :: Integral b => b -> NonEmpty a -> NonEmpty a #

Semigroup (IntMap a)

Since: containers-0.5.7

Instance details

Defined in Data.IntMap.Internal

Methods

(<>) :: IntMap a -> IntMap a -> IntMap a #

sconcat :: NonEmpty (IntMap a) -> IntMap a #

stimes :: Integral b => b -> IntMap a -> IntMap a #

Ord a => Semigroup (Set a)

Since: containers-0.5.7

Instance details

Defined in Data.Set.Internal

Methods

(<>) :: Set a -> Set a -> Set a #

sconcat :: NonEmpty (Set a) -> Set a #

stimes :: Integral b => b -> Set a -> Set a #

Semigroup (Heap a) 
Instance details

Defined in Data.Heap

Methods

(<>) :: Heap a -> Heap a -> Heap a #

sconcat :: NonEmpty (Heap a) -> Heap a #

stimes :: Integral b => b -> Heap a -> Heap a #

Semigroup (Array a)

Since: primitive-0.6.3.0

Instance details

Defined in Data.Primitive.Array

Methods

(<>) :: Array a -> Array a -> Array a #

sconcat :: NonEmpty (Array a) -> Array a #

stimes :: Integral b => b -> Array a -> Array a #

Semigroup (SmallArray a) Source #

Since: 0.6.3.0

Instance details

Defined in Streamly.Internal.Data.SmallArray.Type

Semigroup (MergeSet a) 
Instance details

Defined in Data.Set.Internal

Methods

(<>) :: MergeSet a -> MergeSet a -> MergeSet a #

sconcat :: NonEmpty (MergeSet a) -> MergeSet a #

stimes :: Integral b => b -> MergeSet a -> MergeSet a #

Storable a => Semigroup (Array a) Source # 
Instance details

Defined in Streamly.Internal.Data.Array.Foreign.Type

Methods

(<>) :: Array a -> Array a -> Array a #

sconcat :: NonEmpty (Array a) -> Array a #

stimes :: Integral b => b -> Array a -> Array a #

Semigroup (ZipList a) Source # 
Instance details

Defined in Streamly.Internal.Data.List

Methods

(<>) :: ZipList a -> ZipList a -> ZipList a #

sconcat :: NonEmpty (ZipList a) -> ZipList a #

stimes :: Integral b => b -> ZipList a -> ZipList a #

Semigroup (List a) Source # 
Instance details

Defined in Streamly.Internal.Data.List

Methods

(<>) :: List a -> List a -> List a #

sconcat :: NonEmpty (List a) -> List a #

stimes :: Integral b => b -> List a -> List a #

Prim a => Semigroup (Array a) Source # 
Instance details

Defined in Streamly.Internal.Data.Array.Prim.Pinned.Type

Methods

(<>) :: Array a -> Array a -> Array a #

sconcat :: NonEmpty (Array a) -> Array a #

stimes :: Integral b => b -> Array a -> Array a #

Prim a => Semigroup (Array a) Source # 
Instance details

Defined in Streamly.Internal.Data.Array.Prim.Type

Methods

(<>) :: Array a -> Array a -> Array a #

sconcat :: NonEmpty (Array a) -> Array a #

stimes :: Integral b => b -> Array a -> Array a #

Semigroup b => Semigroup (a -> b)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: (a -> b) -> (a -> b) -> a -> b #

sconcat :: NonEmpty (a -> b) -> a -> b #

stimes :: Integral b0 => b0 -> (a -> b) -> a -> b #

Semigroup (Either a b)

Since: base-4.9.0.0

Instance details

Defined in Data.Either

Methods

(<>) :: Either a b -> Either a b -> Either a b #

sconcat :: NonEmpty (Either a b) -> Either a b #

stimes :: Integral b0 => b0 -> Either a b -> Either a b #

Semigroup (V1 p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: V1 p -> V1 p -> V1 p #

sconcat :: NonEmpty (V1 p) -> V1 p #

stimes :: Integral b => b -> V1 p -> V1 p #

Semigroup (U1 p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: U1 p -> U1 p -> U1 p #

sconcat :: NonEmpty (U1 p) -> U1 p #

stimes :: Integral b => b -> U1 p -> U1 p #

(Semigroup a, Semigroup b) => Semigroup (a, b)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: (a, b) -> (a, b) -> (a, b) #

sconcat :: NonEmpty (a, b) -> (a, b) #

stimes :: Integral b0 => b0 -> (a, b) -> (a, b) #

Semigroup a => Semigroup (ST s a)

Since: base-4.11.0.0

Instance details

Defined in GHC.ST

Methods

(<>) :: ST s a -> ST s a -> ST s a #

sconcat :: NonEmpty (ST s a) -> ST s a #

stimes :: Integral b => b -> ST s a -> ST s a #

Semigroup (Proxy s)

Since: base-4.9.0.0

Instance details

Defined in Data.Proxy

Methods

(<>) :: Proxy s -> Proxy s -> Proxy s #

sconcat :: NonEmpty (Proxy s) -> Proxy s #

stimes :: Integral b => b -> Proxy s -> Proxy s #

Ord k => Semigroup (Map k v) 
Instance details

Defined in Data.Map.Internal

Methods

(<>) :: Map k v -> Map k v -> Map k v #

sconcat :: NonEmpty (Map k v) -> Map k v #

stimes :: Integral b => b -> Map k v -> Map k v #

Semigroup (Stream m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

(<>) :: Stream m a -> Stream m a -> Stream m a #

sconcat :: NonEmpty (Stream m a) -> Stream m a #

stimes :: Integral b => b -> Stream m a -> Stream m a #

Semigroup (WSerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a #

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a #

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a #

Semigroup (SerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a #

sconcat :: NonEmpty (SerialT m a) -> SerialT m a #

stimes :: Integral b => b -> SerialT m a -> SerialT m a #

MonadAsync m => Semigroup (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

MonadAsync m => Semigroup (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

MonadAsync m => Semigroup (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

Semigroup (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

(<>) :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a #

stimes :: Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a #

Semigroup (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

(<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a #

stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a #

Semigroup (f p) => Semigroup (Rec1 f p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: Rec1 f p -> Rec1 f p -> Rec1 f p #

sconcat :: NonEmpty (Rec1 f p) -> Rec1 f p #

stimes :: Integral b => b -> Rec1 f p -> Rec1 f p #

(Semigroup a, Semigroup b, Semigroup c) => Semigroup (a, b, c)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: (a, b, c) -> (a, b, c) -> (a, b, c) #

sconcat :: NonEmpty (a, b, c) -> (a, b, c) #

stimes :: Integral b0 => b0 -> (a, b, c) -> (a, b, c) #

Semigroup a => Semigroup (Const a b)

Since: base-4.9.0.0

Instance details

Defined in Data.Functor.Const

Methods

(<>) :: Const a b -> Const a b -> Const a b #

sconcat :: NonEmpty (Const a b) -> Const a b #

stimes :: Integral b0 => b0 -> Const a b -> Const a b #

(Applicative f, Semigroup a) => Semigroup (Ap f a)

Since: base-4.12.0.0

Instance details

Defined in Data.Monoid

Methods

(<>) :: Ap f a -> Ap f a -> Ap f a #

sconcat :: NonEmpty (Ap f a) -> Ap f a #

stimes :: Integral b => b -> Ap f a -> Ap f a #

Alternative f => Semigroup (Alt f a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: Alt f a -> Alt f a -> Alt f a #

sconcat :: NonEmpty (Alt f a) -> Alt f a #

stimes :: Integral b => b -> Alt f a -> Alt f a #

Monad m => Semigroup (Pipe m a b) Source # 
Instance details

Defined in Streamly.Internal.Data.Pipe.Type

Methods

(<>) :: Pipe m a b -> Pipe m a b -> Pipe m a b #

sconcat :: NonEmpty (Pipe m a b) -> Pipe m a b #

stimes :: Integral b0 => b0 -> Pipe m a b -> Pipe m a b #

(Semigroup b, Monad m) => Semigroup (Tee m a b) Source #

<> distributes the input to both the argument Tees and combines their outputs using the Semigroup instance of the output type.

Instance details

Defined in Streamly.Internal.Data.Fold.Tee

Methods

(<>) :: Tee m a b -> Tee m a b -> Tee m a b #

sconcat :: NonEmpty (Tee m a b) -> Tee m a b #

stimes :: Integral b0 => b0 -> Tee m a b -> Tee m a b #

Semigroup c => Semigroup (K1 i c p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: K1 i c p -> K1 i c p -> K1 i c p #

sconcat :: NonEmpty (K1 i c p) -> K1 i c p #

stimes :: Integral b => b -> K1 i c p -> K1 i c p #

(Semigroup (f p), Semigroup (g p)) => Semigroup ((f :*: g) p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: (f :*: g) p -> (f :*: g) p -> (f :*: g) p #

sconcat :: NonEmpty ((f :*: g) p) -> (f :*: g) p #

stimes :: Integral b => b -> (f :*: g) p -> (f :*: g) p #

(Semigroup a, Semigroup b, Semigroup c, Semigroup d) => Semigroup (a, b, c, d)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: (a, b, c, d) -> (a, b, c, d) -> (a, b, c, d) #

sconcat :: NonEmpty (a, b, c, d) -> (a, b, c, d) #

stimes :: Integral b0 => b0 -> (a, b, c, d) -> (a, b, c, d) #

Semigroup (f p) => Semigroup (M1 i c f p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: M1 i c f p -> M1 i c f p -> M1 i c f p #

sconcat :: NonEmpty (M1 i c f p) -> M1 i c f p #

stimes :: Integral b => b -> M1 i c f p -> M1 i c f p #

Semigroup (f (g p)) => Semigroup ((f :.: g) p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: (f :.: g) p -> (f :.: g) p -> (f :.: g) p #

sconcat :: NonEmpty ((f :.: g) p) -> (f :.: g) p #

stimes :: Integral b => b -> (f :.: g) p -> (f :.: g) p #

(Semigroup a, Semigroup b, Semigroup c, Semigroup d, Semigroup e) => Semigroup (a, b, c, d, e)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: (a, b, c, d, e) -> (a, b, c, d, e) -> (a, b, c, d, e) #

sconcat :: NonEmpty (a, b, c, d, e) -> (a, b, c, d, e) #

stimes :: Integral b0 => b0 -> (a, b, c, d, e) -> (a, b, c, d, e) #

Deprecated

type Streaming = IsStream Source #

Deprecated: Please use IsStream instead.

Same as IsStream.

Since: 0.1.0

runStream :: Monad m => SerialT m a -> m () Source #

Same as "Streamly.Prelude.runStream".

runStreaming :: (Monad m, IsStream t) => t m a -> m () Source #

Same as runStream

Since: 0.1.0

runStreamT :: Monad m => SerialT m a -> m () Source #

Same as runStream.

Since: 0.1.0

runInterleavedT :: Monad m => WSerialT m a -> m () Source #

Same as drain . fromWSerial.

Since: 0.1.0

runAsyncT :: Monad m => AsyncT m a -> m () Source #

Same as drain . fromAsync.

Since: 0.1.0

runParallelT :: Monad m => ParallelT m a -> m () Source #

Same as drain . fromParallel.

Since: 0.1.0

runZipStream :: Monad m => ZipSerialM m a -> m () Source #

Same as drain . zipping.

Since: 0.1.0

runZipAsync :: Monad m => ZipAsyncM m a -> m () Source #

Same as drain . zippingAsync.

Since: 0.1.0

type StreamT = SerialT Source #

Deprecated: Please use SerialT instead.

Since: 0.1.0

type InterleavedT = WSerialT Source #

Deprecated: Please use WSerialT instead.

Since: 0.1.0

type ZipStream = ZipSerialM Source #

Deprecated: Please use ZipSerialM instead.

Since: 0.1.0

interleaving :: IsStream t => WSerialT m a -> t m a Source #

Deprecated: Please use fromWSerial instead.

Same as fromWSerial.

Since: 0.1.0

zipping :: IsStream t => ZipSerialM m a -> t m a Source #

Deprecated: Please use fromZipSerial instead.

Same as fromZipSerial.

Since: 0.1.0

zippingAsync :: IsStream t => ZipAsyncM m a -> t m a Source #

Deprecated: Please use fromZipAsync instead.

Same as fromZipAsync.

Since: 0.1.0

(<=>) :: IsStream t => t m a -> t m a -> t m a infixr 5 Source #

Deprecated: Please use wSerial instead.

Same as wSerial.

Since: 0.1.0

(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Deprecated: Please use async instead.

Same as async.

Since: 0.1.0
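The following sketch shows how code using some of the deprecated names might be migrated. The replacements wSerial and async are the ones named in the deprecation notices above; using drain from Streamly.Prelude in place of runStream is an assumption based on the "Same as drain . ..." descriptions. The names interleaved and concurrent are hypothetical.

```haskell
import qualified Streamly.Prelude as S

-- Formerly written with (<=>); wSerial interleaves the two streams.
interleaved :: IO [Int]
interleaved =
    S.toList $ S.fromList [1, 2, 3] `S.wSerial` S.fromList [10, 20, 30]

-- Formerly written with (<|); async combines the streams concurrently,
-- so the order of the results is not fixed.
concurrent :: IO [Int]
concurrent =
    S.toList $ S.fromList [1, 2] `S.async` S.fromList [3, 4]

main :: IO ()
main = do
    -- Formerly runStream: run the stream for its effects only.
    S.drain $ S.mapM print $ S.fromList [1, 2, 3 :: Int]
    interleaved >>= print
    concurrent >>= print
```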