streamly-0.7.3: Beautiful Streaming, Concurrent and Reactive Composition
Copyright: (c) 2018 Harendra Kumar
(c) Roman Leshchinskiy 2008-2010
(c) The University of Glasgow 2009
License: BSD3
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.Stream.StreamD

Description

Direct style re-implementation of the CPS style stream in the StreamK module. The symbol or suffix D in this module denotes the Direct style. GHC is able to INLINE and fuse direct style better, providing better performance than the CPS implementation.

import qualified Streamly.Internal.Data.Stream.StreamD as D
Synopsis

The stream type

data Step s a Source #

A stream is a succession of Steps. A Yield produces a single value and the next state of the stream. A Skip produces no value, only the next state of the stream. Stop indicates there are no more values in the stream.

Constructors

Yield a s 
Skip s 
Stop 

Instances

Instances details
Functor (Step s) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamD.Type

Methods

fmap :: (a -> b) -> Step s a -> Step s b #

(<$) :: a -> Step s b -> Step s a #

data Stream m a Source #

A stream consists of a step function that generates the next step given a current state, and the current state.

Constructors

forall s. UnStream (State Stream m a -> s -> m (Step s a)) s 

Bundled Patterns

pattern Stream :: (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a 

Instances

Instances details
MonadTrans Stream Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamD.Type

Methods

lift :: Monad m => m a -> Stream m a #

Monad m => Monad (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamD.Type

Methods

(>>=) :: Stream m a -> (a -> Stream m b) -> Stream m b #

(>>) :: Stream m a -> Stream m b -> Stream m b #

return :: a -> Stream m a #

Functor m => Functor (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamD.Type

Methods

fmap :: (a -> b) -> Stream m a -> Stream m b #

(<$) :: a -> Stream m b -> Stream m a #

Applicative f => Applicative (Stream f) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamD.Type

Methods

pure :: a -> Stream f a #

(<*>) :: Stream f (a -> b) -> Stream f a -> Stream f b #

liftA2 :: (a -> b -> c) -> Stream f a -> Stream f b -> Stream f c #

(*>) :: Stream f a -> Stream f b -> Stream f b #

(<*) :: Stream f a -> Stream f b -> Stream f a #

MonadThrow m => MonadThrow (Stream m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamD.Type

Methods

throwM :: Exception e => e -> Stream m a #

Construction

nil :: Monad m => Stream m a Source #

An empty Stream.

nilM :: Monad m => m b -> Stream m a Source #

An empty Stream with a side effect.

cons :: Monad m => a -> Stream m a -> Stream m a Source #

Can fuse but has O(n^2) complexity.

Deconstruction

uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a)) Source #

Generation

Unfolds

unfoldr :: Monad m => (s -> Maybe (a, s)) -> s -> Stream m a Source #

unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a Source #

unfold :: Monad m => Unfold m a b -> a -> Stream m b Source #

Convert an Unfold into a Stream by supplying it a seed.

Specialized Generation

Generate a monadic stream from a seed.

repeat :: Monad m => a -> Stream m a Source #

repeatM :: Monad m => m a -> Stream m a Source #

replicate :: Monad m => Int -> a -> Stream m a Source #

replicateM :: forall m a. Monad m => Int -> m a -> Stream m a Source #

fromIndices :: Monad m => (Int -> a) -> Stream m a Source #

fromIndicesM :: Monad m => (Int -> m a) -> Stream m a Source #

generate :: Monad m => Int -> (Int -> a) -> Stream m a Source #

generateM :: Monad m => Int -> (Int -> m a) -> Stream m a Source #

iterate :: Monad m => (a -> a) -> a -> Stream m a Source #

iterateM :: Monad m => (a -> m a) -> m a -> Stream m a Source #

Enumerations

enumerateFromStepIntegral :: (Integral a, Monad m) => a -> a -> Stream m a Source #

Can be used to enumerate unbounded integrals. This does not check for overflow or underflow for bounded integrals.

enumerateFromThenToIntegral :: (Monad m, Integral a) => a -> a -> a -> Stream m a Source #

enumerateFromStepNum :: (Monad m, Num a) => a -> a -> Stream m a Source #

numFrom :: (Monad m, Num a) => a -> Stream m a Source #

numFromThen :: (Monad m, Num a) => a -> a -> Stream m a Source #

enumerateFromThenToFractional :: (Monad m, Fractional a, Ord a) => a -> a -> a -> Stream m a Source #

Time

Conversions

Transform an input structure into a stream. Direct style stream does not support fromFoldable.

yield :: Applicative m => a -> Stream m a Source #

Create a singleton Stream from a pure value.

yieldM :: Monad m => m a -> Stream m a Source #

Create a singleton Stream from a monadic action.

fromList :: Applicative m => [a] -> Stream m a Source #

Convert a list of pure values to a Stream

fromListM :: MonadAsync m => [m a] -> Stream m a Source #

Convert a list of monadic actions to a Stream

fromStreamK :: Monad m => Stream m a -> Stream m a Source #

fromStreamD :: (IsStream t, Monad m) => Stream m a -> t m a Source #

fromPrimVar :: (MonadIO m, Prim a) => Var IO a -> Stream m a Source #

fromSVar :: MonadAsync m => SVar t m a -> Stream m a Source #

Elimination

General Folds

foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

foldrT :: (Monad m, Monad (t m), MonadTrans t) => (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b Source #

foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b Source #

foldrMx :: Monad m => (a -> m x -> m x) -> m x -> (m x -> m b) -> Stream m a -> m b Source #

foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b Source #

foldr1 :: Monad m => (a -> a -> a) -> Stream m a -> m (Maybe a) Source #

foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b Source #

foldlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> m b Source #

foldlS :: Monad m => (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #

foldlT :: (Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b Source #

reverse :: Monad m => Stream m a -> Stream m a Source #

reverse' :: forall m a. (MonadIO m, Storable a) => Stream m a -> Stream m a Source #

foldlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b Source #

foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b Source #

runFold :: Monad m => Fold m a b -> Stream m a -> m b Source #

parselMx' :: MonadThrow m => (s -> a -> m (Step s b)) -> m s -> (s -> m b) -> Stream m a -> m b Source #

Run a Parser over a stream.

splitParse :: MonadThrow m => Parser m a b -> Stream m a -> Stream m b Source #

Specialized Folds

tap :: Monad m => Fold m a b -> Stream m a -> Stream m a Source #

tapOffsetEvery :: Monad m => Int -> Int -> Fold m a b -> Stream m a -> Stream m a Source #

tapAsync :: MonadAsync m => Fold m a b -> Stream m a -> Stream m a Source #

tapRate :: (MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> Stream m a -> Stream m a Source #

pollCounts :: MonadAsync m => (a -> Bool) -> (Stream m Int -> Stream m Int) -> Fold m Int b -> Stream m a -> Stream m a Source #

drain :: Monad m => Stream m a -> m () Source #

Run a streaming composition, discard the results.

null :: Monad m => Stream m a -> m Bool Source #

head :: Monad m => Stream m a -> m (Maybe a) Source #

headElse :: Monad m => a -> Stream m a -> m a Source #

tail :: Monad m => Stream m a -> m (Maybe (Stream m a)) Source #

last :: Monad m => Stream m a -> m (Maybe a) Source #

elem :: (Monad m, Eq a) => a -> Stream m a -> m Bool Source #

notElem :: (Monad m, Eq a) => a -> Stream m a -> m Bool Source #

all :: Monad m => (a -> Bool) -> Stream m a -> m Bool Source #

any :: Monad m => (a -> Bool) -> Stream m a -> m Bool Source #

maximum :: (Monad m, Ord a) => Stream m a -> m (Maybe a) Source #

maximumBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> m (Maybe a) Source #

minimum :: (Monad m, Ord a) => Stream m a -> m (Maybe a) Source #

minimumBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> m (Maybe a) Source #

findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int Source #

lookup :: (Monad m, Eq a) => a -> Stream m (a, b) -> m (Maybe b) Source #

findM :: Monad m => (a -> m Bool) -> Stream m a -> m (Maybe a) Source #

find :: Monad m => (a -> Bool) -> Stream m a -> m (Maybe a) Source #

(!!) :: Monad m => Stream m a -> Int -> m (Maybe a) Source #

toSVarParallel :: MonadAsync m => State t m a -> SVar t m a -> Stream m a -> m () Source #

Fold the supplied stream to the SVar asynchronously using Parallel concurrency style.

Flattening nested streams

concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b Source #

concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b Source #

concatMapU :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

concatMapU unfold stream uses unfold to map the input stream elements to streams and then flattens the generated streams into a single output stream.

data AppendState s1 s2 Source #

Constructors

AppendFirst s1 
AppendSecond s2 

append :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

interleave :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

interleaveMin :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

interleaveSuffix :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

interleaveInfix :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

roundRobin :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

gintercalateSuffix :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c Source #

Interleave streams (full streams, not the elements) unfolded from two input streams and concatenate the result. Stop when the first stream stops. If the second stream ends before the first one, the first stream keeps running alone, without any further interleaving with the second stream.

[a1, a2, ... an]
[b1, b2 ...]
=> [streamA1, streamA2, ... streamAn]
   [streamB1, streamB2, ...]
=> [streamA1, streamB1, streamA2 ... streamAn, streamBn]
=> [a11, a12, ... a1j, b11, b12, ... b1k, a21, a22, ...]

interposeSuffix :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c Source #

gintercalate :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c Source #

Interleave streams (full streams, not the elements) unfolded from two input streams and concatenate the result. Stop when the first stream stops. If the second stream ends before the first one, the first stream keeps running alone, without any further interleaving with the second stream.

[a1, a2, ... an]
[b1, b2 ...]
=> [streamA1, streamA2, ... streamAn]
   [streamB1, streamB2, ...]
=> [streamA1, streamB1, streamA2 ... streamAn, streamBn]
=> [a11, a12, ... a1j, b11, b12, ... b1k, a21, a22, ...]

interpose :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c Source #

Grouping

groupsOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b Source #

groupsOf2 :: Monad m => Int -> m c -> Fold2 m c a b -> Stream m a -> Stream m b Source #

groupsBy :: Monad m => (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

groupsRollingBy :: Monad m => (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Splitting

splitBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

splitSuffixBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

wordsBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

splitSuffixBy' :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

splitOn :: forall m a b. (MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b Source #

splitSuffixOn :: forall m a b. (MonadIO m, Storable a, Enum a, Eq a) => Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b Source #

splitInnerBy :: Monad m => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a) Source #

Performs infix separator style splitting.

splitInnerBySuffix :: (Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a) Source #

Performs suffix separator style splitting.

Substreams

isPrefixOf :: (Eq a, Monad m) => Stream m a -> Stream m a -> m Bool Source #

isSubsequenceOf :: (Eq a, Monad m) => Stream m a -> Stream m a -> m Bool Source #

stripPrefix :: (Eq a, Monad m) => Stream m a -> Stream m a -> m (Maybe (Stream m a)) Source #

Map and Fold

mapM_ :: Monad m => (a -> m b) -> Stream m a -> m () Source #

Execute a monadic action for each element of the Stream

Conversions

Transform a stream into another type.

toList :: Monad m => Stream m a -> m [a] Source #

toListRev :: Monad m => Stream m a -> m [a] Source #

toStreamK :: Monad m => Stream m a -> Stream m a Source #

toStreamD :: (IsStream t, Monad m) => t m a -> Stream m a Source #

hoist :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a Source #

liftInner :: (Monad m, MonadTrans t, Monad (t m)) => Stream m a -> Stream (t m) a Source #

runReaderT :: Monad m => s -> Stream (ReaderT s m) a -> Stream m a Source #

evalStateT :: Monad m => s -> Stream (StateT s m) a -> Stream m a Source #

runStateT :: Monad m => s -> Stream (StateT s m) a -> Stream m (s, a) Source #

Transformation

transform :: Monad m => Pipe m a b -> Stream m a -> Stream m b Source #

By folding (scans)

scanlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b Source #

scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b Source #

scanlM :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b Source #

scanl :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b Source #

scanl1M' :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a Source #

scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a Source #

scanl1M :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a Source #

scanl1 :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a Source #

prescanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b Source #

prescanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b Source #

postscanl :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a Source #

postscanlM :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b Source #

postscanl' :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a Source #

postscanlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b Source #

postscanlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b Source #

postscanlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b Source #

scanlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b Source #

scanlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b Source #

Filtering

filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

uniq :: (Eq a, Monad m) => Stream m a -> Stream m a Source #

take :: Monad m => Int -> Stream m a -> Stream m a Source #

takeByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a Source #

takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

drop :: Monad m => Int -> Stream m a -> Stream m a Source #

dropByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a Source #

dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

Mapping

map :: Monad m => (a -> b) -> Stream m a -> Stream m b Source #

mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #

Map a monadic function over a Stream

sequence :: Monad m => Stream m (m a) -> Stream m a Source #

rollingMap :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b Source #

rollingMapM :: Monad m => (a -> a -> m b) -> Stream m a -> Stream m b Source #

Inserting

intersperseM :: Monad m => m a -> Stream m a -> Stream m a Source #

intersperse :: Monad m => a -> Stream m a -> Stream m a Source #

intersperseSuffix :: forall m a. Monad m => m a -> Stream m a -> Stream m a Source #

intersperseSuffixBySpan :: forall m a. Monad m => Int -> m a -> Stream m a -> Stream m a Source #

Intersperse after every n items.

insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a Source #

Deleting

deleteBy :: Monad m => (a -> a -> Bool) -> a -> Stream m a -> Stream m a Source #

Map and Filter

mapMaybe :: Monad m => (a -> Maybe b) -> Stream m a -> Stream m b Source #

mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b Source #

Zipping

indexed :: Monad m => Stream m a -> Stream m (Int, a) Source #

indexedR :: Monad m => Int -> Stream m a -> Stream m (Int, a) Source #

zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #

zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #

Comparisons

eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool Source #

cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering Source #

Compare two streams lexicographically

Merging

mergeBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

mergeByM :: Monad m => (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Transformation comprehensions

the :: (Eq a, Monad m) => Stream m a -> m (Maybe a) Source #

Exceptions

newFinalizedIORef :: (MonadIO m, MonadBaseControl IO m) => m a -> m (IORef (Maybe (IO ()))) Source #

Create an IORef holding a finalizer that is called automatically when the IORef is garbage collected. The IORef can be written to with a Nothing value to deactivate the finalizer.

runIORefFinalizer :: MonadIO m => IORef (Maybe (IO ())) -> m () Source #

Run the finalizer stored in an IORef and deactivate it so that it is run only once.

clearIORefFinalizer :: MonadIO m => IORef (Maybe (IO ())) -> m () Source #

Deactivate the finalizer stored in an IORef without running it.

gbracket Source #

Arguments

:: Monad m 
=> m c

before

-> (forall s. m s -> m (Either e s))

try (exception handling)

-> (c -> m d)

after, on normal stop

-> (c -> e -> Stream m b)

on exception

-> (c -> Stream m b)

stream generator

-> Stream m b 

The most general bracketing and exception combinator. All other combinators can be expressed in terms of this combinator. This can also be used for cases which are not covered by the standard combinators.

Internal

before :: Monad m => m b -> Stream m a -> Stream m a Source #

Run a side effect before the stream yields its first element.

after :: Monad m => m b -> Stream m a -> Stream m a Source #

Run a side effect whenever the stream stops normally.

afterIO :: (MonadIO m, MonadBaseControl IO m) => m b -> Stream m a -> Stream m a Source #

bracket :: MonadCatch m => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a Source #

Run the first action before the stream starts and remember its output; generate a stream using that output; then run the second action, passing it the remembered value as an argument, whenever the stream ends normally or due to an exception.

bracketIO :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a Source #

onException :: MonadCatch m => m b -> Stream m a -> Stream m a Source #

Run a side effect whenever the stream aborts due to an exception. The exception is not caught, simply rethrown.

finally :: MonadCatch m => m b -> Stream m a -> Stream m a Source #

Run a side effect whenever the stream stops normally or aborts due to an exception.

finallyIO :: (MonadAsync m, MonadCatch m) => m b -> Stream m a -> Stream m a Source #

handle :: (MonadCatch m, Exception e) => (e -> Stream m a) -> Stream m a -> Stream m a Source #

If an exception occurs while evaluating a stream, stream evaluation aborts and the specified exception handler is run with the exception as its argument.

Concurrent Application

mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; it then blocks until there is space in the buffer. The consumer consumes the stream lazily from the buffer.

Internal

newCallbackStream :: (IsStream t, MonadAsync m) => m (a -> m (), t m a) Source #

Generates a callback and a stream pair. The callback returned is used to queue values to the stream. The stream is infinite; there is currently no way for the callback to indicate that it is done.

Internal

lastN :: (Storable a, MonadIO m) => Int -> Fold m a (Array a) Source #

Take last n elements from the stream and discard the rest.