streamly-0.8.2: Dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD-3-Clause
Maintainerstreamly@composewell.com
Stabilitypre-release
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Internal.Data.Stream.IsStream

Description

This is an internal module which is a superset of the corresponding released module Streamly.Prelude. It contains some additional unreleased or experimental APIs.

Synopsis

Documentation

newtype Stream m a Source #

The type Stream m a represents a monadic stream of values of type a constructed using actions in monad m. It uses stop, singleton and yield continuations equivalent to the following direct style type:

data Stream m a = Stop | Singleton a | Yield a (Stream m a)

To facilitate parallel composition we maintain a local state in an SVar that is shared across and is used for synchronization of the streams being composed.

The singleton case can be expressed in terms of stop and yield but we have it as a separate case to optimize composition operations for streams with single element. We build singleton streams in the implementation of pure for Applicative and Monad, and in lift for MonadTrans.

Constructors

MkStream (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r) 

Instances

Instances details
MonadTrans Stream Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

lift :: Monad m => m a -> Stream m a #

Monad m => Monad (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

(>>=) :: Stream m a -> (a -> Stream m b) -> Stream m b #

(>>) :: Stream m a -> Stream m b -> Stream m b #

return :: a -> Stream m a #

Monad m => Functor (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

fmap :: (a -> b) -> Stream m a -> Stream m b #

(<$) :: a -> Stream m b -> Stream m a #

Monad m => Applicative (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

pure :: a -> Stream m a #

(<*>) :: Stream m (a -> b) -> Stream m a -> Stream m b #

liftA2 :: (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c #

(*>) :: Stream m a -> Stream m b -> Stream m b #

(<*) :: Stream m a -> Stream m b -> Stream m a #

Semigroup (Stream m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

(<>) :: Stream m a -> Stream m a -> Stream m a #

sconcat :: NonEmpty (Stream m a) -> Stream m a #

stimes :: Integral b => b -> Stream m a -> Stream m a #

Monoid (Stream m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

mempty :: Stream m a #

mappend :: Stream m a -> Stream m a -> Stream m a #

mconcat :: [Stream m a] -> Stream m a #

type WSerial = WSerialT IO Source #

An interleaving serial IO stream of elements of type a. See WSerialT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data WSerialT m a Source #

For WSerialT streams:

(<>) = wSerial                       -- Semigroup
(>>=) = flip . concatMapWith wSerial -- Monad

Note that <> is associative only if we disregard the ordering of elements in the resulting stream.

A single Monad bind behaves like a for loop:

>>> :{
Stream.toList $ Stream.fromWSerial $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]

Nested monad binds behave like interleaved nested for loops:

>>> :{
Stream.toList $ Stream.fromWSerial $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]

It is a result of interleaving all the nested iterations corresponding to element 1 in the first stream with all the nested iterations of element 2:

>>> import Streamly.Prelude (wSerial)
>>> Stream.toList $ Stream.fromList [(1,3),(1,4)] `Stream.wSerial` Stream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of SerialT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
MonadTrans WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> WSerialT m a #

IsStream WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. WSerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WSerialT m a Source #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(MonadBase b m, Monad m) => MonadBase b (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftBase :: b α -> WSerialT m α #

MonadState s m => MonadState s (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: WSerialT m s #

put :: s -> WSerialT m () #

state :: (s -> (a, s)) -> WSerialT m a #

MonadReader r m => MonadReader r (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: WSerialT m r #

local :: (r -> r) -> WSerialT m a -> WSerialT m a #

reader :: (r -> a) -> WSerialT m a #

Monad m => Monad (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: WSerialT m a -> (a -> WSerialT m b) -> WSerialT m b #

(>>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

return :: a -> WSerialT m a #

Monad m => Functor (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> WSerialT m a -> WSerialT m b #

(<$) :: a -> WSerialT m b -> WSerialT m a #

Monad m => Applicative (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> WSerialT m a #

(<*>) :: WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b #

liftA2 :: (a -> b -> c) -> WSerialT m a -> WSerialT m b -> WSerialT m c #

(*>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

(<*) :: WSerialT m a -> WSerialT m b -> WSerialT m a #

(Foldable m, Monad m) => Foldable (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => WSerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldl :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldr1 :: (a -> a -> a) -> WSerialT m a -> a #

foldl1 :: (a -> a -> a) -> WSerialT m a -> a #

toList :: WSerialT m a -> [a] #

null :: WSerialT m a -> Bool #

length :: WSerialT m a -> Int #

elem :: Eq a => a -> WSerialT m a -> Bool #

maximum :: Ord a => WSerialT m a -> a #

minimum :: Ord a => WSerialT m a -> a #

sum :: Num a => WSerialT m a -> a #

product :: Num a => WSerialT m a -> a #

Traversable (WSerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> WSerialT Identity a -> f (WSerialT Identity b) #

sequenceA :: Applicative f => WSerialT Identity (f a) -> f (WSerialT Identity a) #

mapM :: Monad m => (a -> m b) -> WSerialT Identity a -> m (WSerialT Identity b) #

sequence :: Monad m => WSerialT Identity (m a) -> m (WSerialT Identity a) #

MonadIO m => MonadIO (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> WSerialT m a #

NFData1 (WSerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> WSerialT Identity a -> () #

MonadThrow m => MonadThrow (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: Exception e => e -> WSerialT m a #

IsList (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (WSerialT Identity a) #

Eq a => Eq (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Read a => Read (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

a ~ Char => IsString (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Semigroup (WSerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a #

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a #

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a #

Monoid (WSerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: WSerialT m a #

mappend :: WSerialT m a -> WSerialT m a -> WSerialT m a #

mconcat :: [WSerialT m a] -> WSerialT m a #

NFData a => NFData (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: WSerialT Identity a -> () #

type Item (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (WSerialT Identity a) = a

type Serial = SerialT IO Source #

A serial IO stream of elements of type a. See SerialT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data SerialT m a Source #

For SerialT streams:

(<>) = serial                       -- Semigroup
(>>=) = flip . concatMapWith serial -- Monad

A single Monad bind behaves like a for loop:

>>> :{
Stream.toList $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]

Nested monad binds behave like nested for loops:

>>> :{
Stream.toList $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
MonadTrans SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> SerialT m a #

IsStream SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. SerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> SerialT m a Source #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(MonadBase b m, Monad m) => MonadBase b (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftBase :: b α -> SerialT m α #

MonadState s m => MonadState s (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: SerialT m s #

put :: s -> SerialT m () #

state :: (s -> (a, s)) -> SerialT m a #

MonadReader r m => MonadReader r (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: SerialT m r #

local :: (r -> r) -> SerialT m a -> SerialT m a #

reader :: (r -> a) -> SerialT m a #

Monad m => Monad (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: SerialT m a -> (a -> SerialT m b) -> SerialT m b #

(>>) :: SerialT m a -> SerialT m b -> SerialT m b #

return :: a -> SerialT m a #

Monad m => Functor (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> SerialT m a -> SerialT m b #

(<$) :: a -> SerialT m b -> SerialT m a #

Monad m => Applicative (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> SerialT m a #

(<*>) :: SerialT m (a -> b) -> SerialT m a -> SerialT m b #

liftA2 :: (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c #

(*>) :: SerialT m a -> SerialT m b -> SerialT m b #

(<*) :: SerialT m a -> SerialT m b -> SerialT m a #

(Foldable m, Monad m) => Foldable (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => SerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> SerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> SerialT m a -> b #

foldl :: (b -> a -> b) -> b -> SerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> SerialT m a -> b #

foldr1 :: (a -> a -> a) -> SerialT m a -> a #

foldl1 :: (a -> a -> a) -> SerialT m a -> a #

toList :: SerialT m a -> [a] #

null :: SerialT m a -> Bool #

length :: SerialT m a -> Int #

elem :: Eq a => a -> SerialT m a -> Bool #

maximum :: Ord a => SerialT m a -> a #

minimum :: Ord a => SerialT m a -> a #

sum :: Num a => SerialT m a -> a #

product :: Num a => SerialT m a -> a #

Traversable (SerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> SerialT Identity a -> f (SerialT Identity b) #

sequenceA :: Applicative f => SerialT Identity (f a) -> f (SerialT Identity a) #

mapM :: Monad m => (a -> m b) -> SerialT Identity a -> m (SerialT Identity b) #

sequence :: Monad m => SerialT Identity (m a) -> m (SerialT Identity a) #

MonadIO m => MonadIO (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> SerialT m a #

NFData1 (SerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> SerialT Identity a -> () #

MonadThrow m => MonadThrow (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: Exception e => e -> SerialT m a #

IsList (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (SerialT Identity a) #

Eq a => Eq (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Read a => Read (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

a ~ Char => IsString (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Semigroup (SerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a #

sconcat :: NonEmpty (SerialT m a) -> SerialT m a #

stimes :: Integral b => b -> SerialT m a -> SerialT m a #

Monoid (SerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: SerialT m a #

mappend :: SerialT m a -> SerialT m a -> SerialT m a #

mconcat :: [SerialT m a] -> SerialT m a #

NFData a => NFData (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: SerialT Identity a -> () #

type Item (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (SerialT Identity a) = a

type WAsync = WAsyncT IO Source #

A round robin parallely composing IO stream of elements of type a. See WAsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data WAsyncT m a Source #

For WAsyncT streams:

(<>) = wAsync
(>>=) = flip . concatMapWith wAsync

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the wAsync combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the wAsync combinator:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one WAsyncT output stream and all the iterations corresponding to 2 constitute another WAsyncT output stream and these two output streams are merged using wAsync.

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of AsyncT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
MonadTrans WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> WAsyncT m a #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> WAsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: WAsyncT m s #

put :: s -> WAsyncT m () #

state :: (s -> (a, s)) -> WAsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: WAsyncT m r #

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a #

reader :: (r -> a) -> WAsyncT m a #

MonadAsync m => Monad (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b #

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

return :: a -> WAsyncT m a #

Monad m => Functor (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b #

(<$) :: a -> WAsyncT m b -> WAsyncT m a #

(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> WAsyncT m a #

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b #

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c #

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> WAsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> WAsyncT m a #

MonadAsync m => Semigroup (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

MonadAsync m => Monoid (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: WAsyncT m a #

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

mconcat :: [WAsyncT m a] -> WAsyncT m a #

type Async = AsyncT IO Source #

A demand driven left biased parallely composing IO stream of elements of type a. See AsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data AsyncT m a Source #

For AsyncT streams:

(<>) = async
(>>=) = flip . concatMapWith async

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the async combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the async combinator:

>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using async.

Since: 0.1.0 (Streamly)

Since: 0.8.0

Instances

Instances details
MonadTrans AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> AsyncT m a #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> AsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: AsyncT m s #

put :: s -> AsyncT m () #

state :: (s -> (a, s)) -> AsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: AsyncT m r #

local :: (r -> r) -> AsyncT m a -> AsyncT m a #

reader :: (r -> a) -> AsyncT m a #

MonadAsync m => Monad (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b #

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

return :: a -> AsyncT m a #

Monad m => Functor (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b #

(<$) :: a -> AsyncT m b -> AsyncT m a #

(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> AsyncT m a #

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b #

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c #

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> AsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> AsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

MonadAsync m => Monoid (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: AsyncT m a #

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a #

mconcat :: [AsyncT m a] -> AsyncT m a #

type Ahead = AheadT IO Source #

A serial IO stream of elements of type a with concurrent lookahead. See AheadT documentation for more details.

Since: 0.3.0 (Streamly)

Since: 0.8.0

data AheadT m a Source #

For AheadT streams:

(<>) = ahead
(>>=) = flip . concatMapWith ahead

A single Monad bind behaves like a for loop with iterations executed concurrently, ahead of time, producing side effects of iterations out of order, but results in order:

>>> :{
Stream.toList $ Stream.fromAhead $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, ahead of time:

>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using ahead.

Since: 0.3.0 (Streamly)

Since: 0.8.0

Instances

Instances details
MonadTrans AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

lift :: Monad m => m a -> AheadT m a #

IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AheadT m a Source #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftBase :: b α -> AheadT m α #

(MonadState s m, MonadAsync m) => MonadState s (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

get :: AheadT m s #

put :: s -> AheadT m () #

state :: (s -> (a, s)) -> AheadT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

ask :: AheadT m r #

local :: (r -> r) -> AheadT m a -> AheadT m a #

reader :: (r -> a) -> AheadT m a #

MonadAsync m => Monad (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(>>=) :: AheadT m a -> (a -> AheadT m b) -> AheadT m b #

(>>) :: AheadT m a -> AheadT m b -> AheadT m b #

return :: a -> AheadT m a #

Monad m => Functor (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

fmap :: (a -> b) -> AheadT m a -> AheadT m b #

(<$) :: a -> AheadT m b -> AheadT m a #

(Monad m, MonadAsync m) => Applicative (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

pure :: a -> AheadT m a #

(<*>) :: AheadT m (a -> b) -> AheadT m a -> AheadT m b #

liftA2 :: (a -> b -> c) -> AheadT m a -> AheadT m b -> AheadT m c #

(*>) :: AheadT m a -> AheadT m b -> AheadT m b #

(<*) :: AheadT m a -> AheadT m b -> AheadT m a #

(MonadIO m, MonadAsync m) => MonadIO (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftIO :: IO a -> AheadT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

throwM :: Exception e => e -> AheadT m a #

MonadAsync m => Semigroup (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

MonadAsync m => Monoid (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

mempty :: AheadT m a #

mappend :: AheadT m a -> AheadT m a -> AheadT m a #

mconcat :: [AheadT m a] -> AheadT m a #

type Parallel = ParallelT IO Source #

A parallely composing IO stream of elements of type a. See ParallelT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data ParallelT m a Source #

For ParallelT streams:

(<>) = parallel
(>>=) = flip . concatMapWith parallel

See AsyncT, ParallelT is similar except that all iterations are strictly concurrent while in AsyncT it depends on the consumer demand and available threads. See parallel for more details.

Since: 0.1.0 (Streamly)

Since: 0.7.0 (maxBuffer applies to ParallelT streams)

Since: 0.8.0

Instances

Instances details
MonadTrans ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

lift :: Monad m => m a -> ParallelT m a #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ParallelT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ParallelT m a Source #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftBase :: b α -> ParallelT m α #

(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

get :: ParallelT m s #

put :: s -> ParallelT m () #

state :: (s -> (a, s)) -> ParallelT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

ask :: ParallelT m r #

local :: (r -> r) -> ParallelT m a -> ParallelT m a #

reader :: (r -> a) -> ParallelT m a #

MonadAsync m => Monad (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(>>=) :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b #

(>>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

return :: a -> ParallelT m a #

Monad m => Functor (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

fmap :: (a -> b) -> ParallelT m a -> ParallelT m b #

(<$) :: a -> ParallelT m b -> ParallelT m a #

(Monad m, MonadAsync m) => Applicative (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

pure :: a -> ParallelT m a #

(<*>) :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b #

liftA2 :: (a -> b -> c) -> ParallelT m a -> ParallelT m b -> ParallelT m c #

(*>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

(<*) :: ParallelT m a -> ParallelT m b -> ParallelT m a #

(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftIO :: IO a -> ParallelT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

throwM :: Exception e => e -> ParallelT m a #

MonadAsync m => Semigroup (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

MonadAsync m => Monoid (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

mempty :: ParallelT m a #

mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a #

mconcat :: [ParallelT m a] -> ParallelT m a #

type ZipAsync = ZipAsyncM IO Source #

An IO stream whose applicative instance zips streams wAsyncly.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data ZipAsyncM m a Source #

For ZipAsyncM streams:

(<>) = serial
(*) = 'Streamly.Prelude.serial.zipAsyncWith' id

Applicative evaluates the streams being zipped concurrently, the following would take half the time that it would take in serial zipping:

>>> s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
>>> Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
...
[(1,1),(1,1),(1,1)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ZipAsyncM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipAsyncM m a Source #

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

Monad m => Functor (ZipAsyncM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

fmap :: (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b #

(<$) :: a -> ZipAsyncM m b -> ZipAsyncM m a #

MonadAsync m => Applicative (ZipAsyncM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

pure :: a -> ZipAsyncM m a #

(<*>) :: ZipAsyncM m (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b #

liftA2 :: (a -> b -> c) -> ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m c #

(*>) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m b #

(<*) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m a #

Semigroup (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

(<>) :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a #

stimes :: Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a #

Monoid (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

mempty :: ZipAsyncM m a #

mappend :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

mconcat :: [ZipAsyncM m a] -> ZipAsyncM m a #

type ZipSerial = ZipSerialM IO Source #

An IO stream whose applicative instance zips streams serially.

Since: 0.2.0 (Streamly)

Since: 0.8.0

data ZipSerialM m a Source #

For ZipSerialM streams:

(<>) = serial
(*) = 'Streamly.Prelude.serial.zipWith' id

Applicative evaluates the streams being zipped serially:

>>> s1 = Stream.fromFoldable [1, 2]
>>> s2 = Stream.fromFoldable [3, 4]
>>> s3 = Stream.fromFoldable [5, 6]
>>> Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
[(1,3,5),(2,4,6)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ZipSerialM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipSerialM m a Source #

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

Monad m => Functor (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

fmap :: (a -> b) -> ZipSerialM m a -> ZipSerialM m b #

(<$) :: a -> ZipSerialM m b -> ZipSerialM m a #

Monad m => Applicative (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

pure :: a -> ZipSerialM m a #

(<*>) :: ZipSerialM m (a -> b) -> ZipSerialM m a -> ZipSerialM m b #

liftA2 :: (a -> b -> c) -> ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m c #

(*>) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m b #

(<*) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m a #

(Foldable m, Monad m) => Foldable (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

fold :: Monoid m0 => ZipSerialM m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> ZipSerialM m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> ZipSerialM m a -> m0 #

foldr :: (a -> b -> b) -> b -> ZipSerialM m a -> b #

foldr' :: (a -> b -> b) -> b -> ZipSerialM m a -> b #

foldl :: (b -> a -> b) -> b -> ZipSerialM m a -> b #

foldl' :: (b -> a -> b) -> b -> ZipSerialM m a -> b #

foldr1 :: (a -> a -> a) -> ZipSerialM m a -> a #

foldl1 :: (a -> a -> a) -> ZipSerialM m a -> a #

toList :: ZipSerialM m a -> [a] #

null :: ZipSerialM m a -> Bool #

length :: ZipSerialM m a -> Int #

elem :: Eq a => a -> ZipSerialM m a -> Bool #

maximum :: Ord a => ZipSerialM m a -> a #

minimum :: Ord a => ZipSerialM m a -> a #

sum :: Num a => ZipSerialM m a -> a #

product :: Num a => ZipSerialM m a -> a #

Traversable (ZipSerialM Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

traverse :: Applicative f => (a -> f b) -> ZipSerialM Identity a -> f (ZipSerialM Identity b) #

sequenceA :: Applicative f => ZipSerialM Identity (f a) -> f (ZipSerialM Identity a) #

mapM :: Monad m => (a -> m b) -> ZipSerialM Identity a -> m (ZipSerialM Identity b) #

sequence :: Monad m => ZipSerialM Identity (m a) -> m (ZipSerialM Identity a) #

NFData1 (ZipSerialM Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

liftRnf :: (a -> ()) -> ZipSerialM Identity a -> () #

IsList (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Associated Types

type Item (ZipSerialM Identity a) #

Eq a => Eq (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Ord a => Ord (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Read a => Read (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Show a => Show (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

a ~ Char => IsString (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Semigroup (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

(<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a #

stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a #

Monoid (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

mempty :: ZipSerialM m a #

mappend :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

mconcat :: [ZipSerialM m a] -> ZipSerialM m a #

NFData a => NFData (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

rnf :: ZipSerialM Identity a -> () #

type Item (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

type Item (ZipSerialM Identity a) = a

type Streaming = IsStream Source #

Deprecated: Please use IsStream instead.

Same as IsStream.

Since: 0.1.0

class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where Source #

Class of types that can represent a stream of elements of some type a in some monad m.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Minimal complete definition

toStream, fromStream, consM, (|:)

Methods

fromStream :: Stream m a -> t m a Source #

consM :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

(|:) :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.

> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]
let delay = threadDelay 1000000 >> print 1
drain $ fromSerial  $ delay |: delay |: delay |: nil
drain $ fromParallel $ delay |: delay |: delay |: nil

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

Instances

Instances details
IsStream WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. WSerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WSerialT m a Source #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

IsStream SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. SerialT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> SerialT m a Source #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WAsyncT m a Source #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AsyncT m a Source #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AheadT m a Source #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ParallelT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ParallelT m a Source #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ZipAsyncM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipAsyncM m a Source #

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ZipSerialM m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipSerialM m a Source #

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

fromStreamS :: (IsStream t, Monad m) => Stream m a -> t m a Source #

toStreamS :: (IsStream t, Monad m) => t m a -> Stream m a Source #

toStreamD :: (IsStream t, Monad m) => t m a -> Stream m a Source #

fromList :: (Monad m, IsStream t) => [a] -> t m a Source #

fromList = foldr cons nil

Construct a stream from a list of pure values. This is more efficient than fromFoldable for serial streams.

Since: 0.4.0

foldrMx :: (IsStream t, Monad m) => (a -> m x -> m x) -> m x -> (m x -> m b) -> t m a -> m b Source #

foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b Source #

Like foldlx', but with a monadic step function.

Since: 0.7.0

foldlx' :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since: 0.7.0

adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #

Adapt any specific stream type to any other specific stream type.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromStreamD :: (IsStream t, Monad m) => Stream m a -> t m a Source #

toConsK :: IsStream t => (m a -> t m a -> t m a) -> m a -> Stream m a -> Stream m a Source #

Adapt a polymorphic consM operation to a StreamK cons operation

mkStream :: IsStream t => (forall r. State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #

Build a stream from an SVar, a stop continuation, a singleton stream continuation and a yield continuation.

foldStreamShared :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.

foldStream :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. The stream will not use the SVar passed via State.

fromSerial :: IsStream t => SerialT m a -> t m a Source #

Fix the type of a polymorphic stream as SerialT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromWSerial :: IsStream t => WSerialT m a -> t m a Source #

Fix the type of a polymorphic stream as WSerialT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

interleaving :: IsStream t => WSerialT m a -> t m a Source #

Deprecated: Please use fromWSerial instead.

Same as fromWSerial.

Since: 0.1.0

fromAsync :: IsStream t => AsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as AsyncT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromWAsync :: IsStream t => WAsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as WAsyncT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromAhead :: IsStream t => AheadT m a -> t m a Source #

Fix the type of a polymorphic stream as AheadT.

Since: 0.3.0 (Streamly)

Since: 0.8.0

fromParallel :: IsStream t => ParallelT m a -> t m a Source #

Fix the type of a polymorphic stream as ParallelT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromZipSerial :: IsStream t => ZipSerialM m a -> t m a Source #

Fix the type of a polymorphic stream as ZipSerialM.

Since: 0.2.0 (Streamly)

Since: 0.8.0

zipping :: IsStream t => ZipSerialM m a -> t m a Source #

Deprecated: Please use fromZipSerial instead.

Same as fromZipSerial.

Since: 0.1.0

fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a Source #

Fix the type of a polymorphic stream as ZipAsyncM.

Since: 0.2.0 (Streamly)

Since: 0.8.0

zippingAsync :: IsStream t => ZipAsyncM m a -> t m a Source #

Deprecated: Please use fromZipAsync instead.

Same as fromZipAsync.

Since: 0.1.0

cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]

Since: 0.1.0

(.:) :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

Since: 0.1.1

nil :: IsStream t => t m a Source #

nilM :: (IsStream t, Monad m) => m b -> t m a Source #

bindWith :: IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b Source #

concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

concatMapWith mixer generator stream is a two dimensional looping combinator. The generator function is used to generate streams from the elements in the input stream and the mixer function is used to merge those streams.

Note we can merge streams concurrently by using a concurrent merge function.

Since: 0.7.0

Since: 0.8.0 (signature change)

concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

A variant of foldMap that allows you to map a monadic streaming action on a Foldable container and then fold it using the specified stream merge operation.

concatMapFoldableWith async return [1..3]

Equivalent to:

concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)

Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)

Since: 0.1.0 (Streamly)

concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Like concatMapFoldableWith but with the last two arguments reversed i.e. the monadic streaming function is the last argument.

Equivalent to:

concatForFoldableWith f xs g = Prelude.foldr (f . g) S.nil xs
concatForFoldableWith f = flip (S.concatMapFoldableWith f)

Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)

Since: 0.1.0 (Streamly)

concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

A variant of fold that allows you to fold a Foldable container of streams using the specified stream sum operation.

concatFoldableWith async $ map return [1..3]

Equivalent to:

concatFoldableWith f = Prelude.foldr f S.nil
concatFoldableWith f = S.concatMapFoldableWith f id

Since: 0.8.0 (Renamed foldWith to concatFoldableWith)

Since: 0.1.0 (Streamly)