streamly-0.8.2: Dataflow programming and declarative concurrency
Copyright: (c) 2017 Composewell Technologies
License: BSD3
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.Stream.StreamK.Type

Description

Continuation passing style (CPS) stream implementation. The symbol K below denotes a function as well as a Kontinuation.

Synopsis

The stream type

newtype Stream m a Source #

The type Stream m a represents a monadic stream of values of type a constructed using actions in monad m. It uses stop, singleton and yield continuations equivalent to the following direct style type:

data Stream m a = Stop | Singleton a | Yield a (Stream m a)

To facilitate parallel composition we maintain a local state in an SVar that is shared across the streams being composed and is used to synchronize them.

The singleton case can be expressed in terms of stop and yield, but we have it as a separate case to optimize composition operations for streams with a single element. We build singleton streams in the implementation of pure for Applicative and Monad, and in lift for MonadTrans.

Constructors

MkStream (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r) 

Instances

Instances details
MonadTrans Stream Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

lift :: Monad m => m a -> Stream m a #

Monad m => Monad (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

(>>=) :: Stream m a -> (a -> Stream m b) -> Stream m b #

(>>) :: Stream m a -> Stream m b -> Stream m b #

return :: a -> Stream m a #

Monad m => Functor (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

fmap :: (a -> b) -> Stream m a -> Stream m b #

(<$) :: a -> Stream m b -> Stream m a #

Monad m => Applicative (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

pure :: a -> Stream m a #

(<*>) :: Stream m (a -> b) -> Stream m a -> Stream m b #

liftA2 :: (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c #

(*>) :: Stream m a -> Stream m b -> Stream m b #

(<*) :: Stream m a -> Stream m b -> Stream m a #

Semigroup (Stream m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

(<>) :: Stream m a -> Stream m a -> Stream m a #

sconcat :: NonEmpty (Stream m a) -> Stream m a #

stimes :: Integral b => b -> Stream m a -> Stream m a #

Monoid (Stream m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

mempty :: Stream m a #

mappend :: Stream m a -> Stream m a -> Stream m a #

mconcat :: [Stream m a] -> Stream m a #

foldr/build

mkStream :: (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r) -> Stream m a Source #

foldStream :: State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> Stream m a -> m r Source #

Fold a stream by providing a State, a yield continuation, a singleton continuation and a stop continuation, in that order. The stream will not use the SVar passed via State.

foldStreamShared :: State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> Stream m a -> m r Source #

Fold a stream by providing a State, a yield continuation, a singleton continuation and a stop continuation, in that order. The stream shares the current SVar passed via the State.

foldrM :: (a -> m b -> m b) -> m b -> Stream m a -> m b Source #

Lazy right fold with a monadic step function.

foldrS :: (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

Lazy right associative fold to a stream.

foldrSShared :: (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

Fold that shares the SVar state within the reconstructed stream.

foldrSM :: Monad m => (m a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

build :: forall m a. (forall b. (a -> b -> b) -> b -> b) -> Stream m a Source #

buildS :: ((a -> Stream m a -> Stream m a) -> Stream m a -> Stream m a) -> Stream m a Source #

buildM :: Monad m => (forall r. (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r) -> Stream m a Source #

buildSM :: Monad m => ((m a -> Stream m a -> Stream m a) -> Stream m a -> Stream m a) -> Stream m a Source #

augmentS :: ((a -> Stream m a -> Stream m a) -> Stream m a -> Stream m a) -> Stream m a -> Stream m a Source #

augmentSM :: Monad m => ((m a -> Stream m a -> Stream m a) -> Stream m a -> Stream m a) -> Stream m a -> Stream m a Source #

Construction

fromStopK :: StopK m -> Stream m a Source #

Make an empty stream from a stop function.

fromYieldK :: YieldK m a -> Stream m a Source #

Make a singleton stream from a callback function. The callback function calls the one-shot yield continuation to yield an element.

consK :: YieldK m a -> Stream m a -> Stream m a Source #

Add a yield function at the head of the stream.

cons :: a -> Stream m a -> Stream m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]

Since: 0.1.0

(.:) :: a -> Stream m a -> Stream m a infixr 5 Source #

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

Since: 0.1.1

consM :: Monad m => m a -> Stream m a -> Stream m a infixr 5 Source #

consMBy :: Monad m => (Stream m a -> Stream m a -> Stream m a) -> m a -> Stream m a -> Stream m a Source #

nil :: Stream m a Source #

An empty stream.

> toList nil
[]

Since: 0.1.0

nilM :: Applicative m => m b -> Stream m a Source #

An empty stream producing a side effect.

> toList (nilM (print "nil"))
"nil"
[]

Pre-release

Generation

fromEffect :: Monad m => m a -> Stream m a Source #

fromPure :: a -> Stream m a Source #

unfoldr :: (b -> Maybe (a, b)) -> b -> Stream m a Source #

unfoldrMWith :: Monad m => (m a -> Stream m a -> Stream m a) -> (b -> m (Maybe (a, b))) -> b -> Stream m a Source #

repeat :: a -> Stream m a Source #

Generate an infinite stream by repeating a pure value.

Pre-release

repeatMWith :: (m a -> t m a -> t m a) -> m a -> t m a Source #

Like repeatM but takes a stream cons operation to combine the actions in a stream specific manner. A serial cons would repeat the values serially while an async cons would repeat concurrently.

Pre-release

replicateMWith :: (m a -> Stream m a -> Stream m a) -> Int -> m a -> Stream m a Source #

fromIndicesMWith :: (m a -> Stream m a -> Stream m a) -> (Int -> m a) -> Stream m a Source #

iterateMWith :: Monad m => (m a -> Stream m a -> Stream m a) -> (a -> m a) -> m a -> Stream m a Source #

fromFoldable :: Foldable f => f a -> Stream m a Source #

fromFoldable = foldr cons nil

Construct a stream from a Foldable containing pure values.

Since: 0.2.0

fromFoldableM :: (Foldable f, Monad m) => f (m a) -> Stream m a Source #

mfix :: Monad m => (m a -> Stream m a) -> Stream m a Source #

Elimination

uncons :: Applicative m => Stream m a -> m (Maybe (a, Stream m a)) Source #

foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b Source #

Strict left associative fold.

foldlx' :: forall m a b x. Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user-supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Note that the accumulator is always evaluated, including the initial value.

drain :: Monad m => Stream m a -> m () Source #

drain = foldl' (\_ _ -> ()) ()
drain = mapM_ (\_ -> return ())

null :: Monad m => Stream m a -> m Bool Source #

tail :: Applicative m => Stream m a -> m (Maybe (Stream m a)) Source #

init :: Applicative m => Stream m a -> m (Maybe (Stream m a)) Source #

Transformation

conjoin :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

serial :: Stream m a -> Stream m a -> Stream m a infixr 6 Source #

Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

map :: (a -> b) -> Stream m a -> Stream m b Source #

mapMWith :: (m b -> Stream m b -> Stream m b) -> (a -> m b) -> Stream m a -> Stream m b Source #

mapMSerial :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #

unShare :: Stream m a -> Stream m a Source #

Detach a stream from an SVar.

concatMapWith :: (Stream m b -> Stream m b -> Stream m b) -> (a -> Stream m b) -> Stream m a -> Stream m b Source #

Perform a concatMap using a specified concat strategy. The first argument specifies a merge or concat function that is used to merge the streams generated by the map function. For example, the concat function could be serial, parallel, async, ahead or any other zip or merge function.

Since: 0.7.0

concatMap :: (a -> Stream m b) -> Stream m a -> Stream m b Source #

bindWith :: (Stream m b -> Stream m b -> Stream m b) -> Stream m a -> (a -> Stream m b) -> Stream m b Source #

concatPairsWith :: (Stream m b -> Stream m b -> Stream m b) -> (a -> Stream m b) -> Stream m a -> Stream m b Source #

See Streamly.Internal.Data.Stream.IsStream.concatPairsWith for documentation.

apWith :: (Stream m b -> Stream m b -> Stream m b) -> Stream m (a -> b) -> Stream m a -> Stream m b Source #

apSerial :: Stream m (a -> b) -> Stream m a -> Stream m b Source #

foldlS :: (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

Lazy left fold to a stream.

reverse :: Stream m a -> Stream m a Source #

Reader

withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a Source #