Copyright | (c) 2018 Composewell Technologies (c) Roman Leshchinskiy 2008-2010 |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Synopsis
- data Step s a
- data Stream m a where
- data CrossStream m a
- unCross :: CrossStream m a -> Stream m a
- mkCross :: Stream m a -> CrossStream m a
- fromStreamK :: Applicative m => StreamK m a -> Stream m a
- toStreamK :: Monad m => Stream m a -> StreamK m a
- unfold :: Applicative m => Unfold m a b -> a -> Stream m b
- nilM :: Applicative m => m b -> Stream m a
- consM :: Applicative m => m a -> Stream m a -> Stream m a
- fromPure :: Applicative m => a -> Stream m a
- fromEffect :: Applicative m => m a -> Stream m a
- fromList :: Applicative m => [a] -> Stream m a
- uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
- fold :: Monad m => Fold m a b -> Stream m a -> m b
- foldBreak :: Monad m => Fold m a b -> Stream m a -> m (b, Stream m a)
- foldAddLazy :: Monad m => Fold m a b -> Stream m a -> Fold m a b
- foldAdd :: Monad m => Fold m a b -> Stream m a -> m (Fold m a b)
- foldEither :: Monad m => Fold m a b -> Stream m a -> m (Either (Fold m a b) (b, Stream m a))
- foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
- foldlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> m b
- foldlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b
- foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
- foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b
- foldrMx :: Monad m => (a -> m x -> m x) -> m x -> (m x -> m b) -> Stream m a -> m b
- foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b
- foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b
- drain :: Monad m => Stream m a -> m ()
- toList :: Monad m => Stream m a -> m [a]
- map :: Monad m => (a -> b) -> Stream m a -> Stream m b
- mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
- take :: Applicative m => Int -> Stream m a -> Stream m a
- takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- takeEndBy :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- takeEndByM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
- zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
- crossApply :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b
- crossApplyFst :: Functor f => Stream f a -> Stream f b -> Stream f a
- crossApplySnd :: Functor f => Stream f a -> Stream f b -> Stream f b
- crossWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
- cross :: Monad m => Stream m a -> Stream m b -> Stream m (a, b)
- data ConcatMapUState o i
- = ConcatMapUOuter o
- | ConcatMapUInner o i
- unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b
- concatEffect :: Monad m => m (Stream m a) -> Stream m a
- concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b
- concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b
- concat :: Monad m => Stream m (Stream m a) -> Stream m a
- unfoldIterateDfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a
- unfoldIterateBfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a
- unfoldIterateBfsRev :: Monad m => Unfold m a a -> Stream m a -> Stream m a
- concatIterateScan :: Monad m => (b -> a -> m b) -> (b -> m (Maybe (b, Stream m a))) -> b -> Stream m a
- concatIterateDfs :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a
- concatIterateBfs :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a
- concatIterateBfsRev :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a
- data FoldMany s fs b a
- = FoldManyStart s
- | FoldManyFirst fs s
- | FoldManyLoop s fs
- | FoldManyYield b (FoldMany s fs b a)
- | FoldManyDone
- data FoldManyPost s fs b a
- = FoldManyPostStart s
- | FoldManyPostLoop s fs
- | FoldManyPostYield b (FoldManyPost s fs b a)
- | FoldManyPostDone
- foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b
- foldManyPost :: Monad m => Fold m a b -> Stream m a -> Stream m b
- groupsOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b
- refoldMany :: Monad m => Refold m x a b -> m x -> Stream m a -> Stream m b
- reduceIterateBfs :: Monad m => (a -> a -> m a) -> Stream m a -> m (Maybe a)
- foldIterateBfs :: Fold m a (Either a a) -> Stream m a -> m (Maybe a)
- eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
- cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
The stream type
A stream consists of a step function that generates the next step given a current state, and the current state.
Instances
CrossStream type wrapper
data CrossStream m a Source #
A newtype wrapper for the Stream
type with a cross product style monad
instance.
A Monad
bind behaves like a for
loop:
>>>
:{
Stream.fold Fold.toList $ Stream.unCross $ do
    x <- Stream.mkCross $ Stream.fromList [1,2]
    -- Perform the following actions for each x in the stream
    return x
:}
[1,2]
Nested monad binds behave like nested for
loops:
>>>
:{
Stream.fold Fold.toList $ Stream.unCross $ do
    x <- Stream.mkCross $ Stream.fromList [1,2]
    y <- Stream.mkCross $ Stream.fromList [3,4]
    -- Perform the following actions for each x, for each y
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]
Instances
unCross :: CrossStream m a -> Stream m a Source #
mkCross :: Stream m a -> CrossStream m a Source #
Conversion to StreamK
fromStreamK :: Applicative m => StreamK m a -> Stream m a Source #
Convert a CPS encoded StreamK to direct style step encoded StreamD
toStreamK :: Monad m => Stream m a -> StreamK m a Source #
Convert a direct style step encoded StreamD to a CPS encoded StreamK
From Unfold
unfold :: Applicative m => Unfold m a b -> a -> Stream m b Source #
Convert an Unfold
into a stream by supplying it an input seed.
>>>
s = Stream.unfold Unfold.replicateM (3, putStrLn "hello")
>>>
Stream.fold Fold.drain s
hello
hello
hello
Construction
Primitives
nilM :: Applicative m => m b -> Stream m a Source #
A stream that terminates without producing any output, but produces a side effect.
>>>
Stream.fold Fold.toList (Stream.nilM (print "nil"))
"nil" []
Pre-release
consM :: Applicative m => m a -> Stream m a -> Stream m a Source #
Like cons
but fuses an effect instead of a pure value.
From Values
fromPure :: Applicative m => a -> Stream m a Source #
Create a singleton stream from a pure value.
>>>
fromPure a = a `Stream.cons` Stream.nil
>>>
fromPure = pure
>>>
fromPure = Stream.fromEffect . pure
fromEffect :: Applicative m => m a -> Stream m a Source #
Create a singleton stream from a monadic action.
>>>
fromEffect m = m `Stream.consM` Stream.nil
>>>
fromEffect = Stream.sequence . Stream.fromPure
>>>
Stream.fold Fold.drain $ Stream.fromEffect (putStrLn "hello")
hello
From Containers
fromList :: Applicative m => [a] -> Stream m a Source #
Construct a stream from a list of pure values.
Elimination
Primitives
uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a)) Source #
Decompose a stream into its head and tail. If the stream is empty, returns
Nothing
. If the stream is non-empty, returns Just (a, ma)
, where a
is
the head of the stream and ma
its tail.
Properties:
>>>
Nothing <- Stream.uncons Stream.nil
>>>
Just ("a", t) <- Stream.uncons (Stream.cons "a" Stream.nil)
This can be used to consume the stream in an imperative manner one element at a time, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.
All the folds in this module can be expressed in terms of uncons
, however,
this is generally less efficient than specific folds because it takes apart
the stream one element at a time, therefore, does not take advantage of
stream fusion.
foldBreak
is a more general way of consuming a stream piecemeal.
>>>
:{
uncons xs = do
    r <- Stream.foldBreak Fold.one xs
    return $ case r of
        (Nothing, _) -> Nothing
        (Just h, t) -> Just (h, t)
:}
Strict Left Folds
fold :: Monad m => Fold m a b -> Stream m a -> m b Source #
Fold a stream using the supplied left Fold
and reducing the resulting
expression strictly at each step. The behavior is similar to foldl'
. A
Fold
can terminate early without consuming the full stream. See the
documentation of individual Fold
s for termination behavior.
Definitions:
>>>
fold f = fmap fst . Stream.foldBreak f
>>>
fold f = Stream.parse (Parser.fromFold f)
Example:
>>>
Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050
foldAddLazy :: Monad m => Fold m a b -> Stream m a -> Fold m a b Source #
Append a stream to a fold lazily to build an accumulator incrementally.
Example, to continue folding a list of streams on the same sum fold:
>>>
streams = [Stream.fromList [1..5], Stream.fromList [6..10]]
>>>
f = Prelude.foldl Stream.foldAddLazy Fold.sum streams
>>>
Stream.fold f Stream.nil
55
foldAdd :: Monad m => Fold m a b -> Stream m a -> m (Fold m a b) Source #
>>>
foldAdd = flip Fold.addStream
foldEither :: Monad m => Fold m a b -> Stream m a -> m (Either (Fold m a b) (b, Stream m a)) Source #
Fold resulting in either breaking the stream or continuation of the fold. Instead of supplying the input stream in one go we can run the fold multiple times, each time supplying the next segment of the input stream. If the fold has not yet finished, it returns a fold that can be run again; otherwise, it returns the fold result and the residual stream.
Internal
Lazy Right Folds
foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b Source #
Right associative/lazy pull fold. foldrM build final stream
constructs
an output structure using the step function build
. build
is invoked with
the next input element and the remaining (lazy) tail of the output
structure. It builds a lazy output expression using the two. When the "tail
structure" in the output expression is evaluated it calls build
again thus
lazily consuming the input stream
until either the output expression built
by build
is free of the "tail" or the input is exhausted in which case
final
is used as the terminating case for the output structure. For more
details see the description in the previous section.
Example, determine if any element is odd
in a stream:
>>>
s = Stream.fromList (2:4:5:undefined)
>>>
step x xs = if odd x then return True else xs
>>>
Stream.foldrM step (return False) s
True
foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b Source #
Right fold, lazy for lazy monads and pure streams, and strict for strict monads.
Please avoid using this routine in strict monads like IO unless you need a
strict right fold. This is provided only for use in lazy monads (e.g.
Identity) or pure streams. Note that with this signature it is not possible
to implement a lazy foldr when the monad m
is strict. In that case it
would be strict in its accumulator and therefore would necessarily consume
all its input.
>>>
foldr f z = Stream.foldrM (\a b -> f a <$> b) (return z)
Note: This is similar to Fold.foldr' (the right fold via left fold), but could be more efficient.
foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #
Specific Folds
drain :: Monad m => Stream m a -> m () Source #
Definitions:
>>>
drain = Stream.fold Fold.drain
>>>
drain = Stream.foldrM (\_ xs -> xs) (return ())
Run a stream, discarding the results.
toList :: Monad m => Stream m a -> m [a] Source #
Definitions:
>>>
toList = Stream.foldr (:) []
>>>
toList = Stream.fold Fold.toList
Convert a stream into a list in the underlying monad. The list can be
consumed lazily in a lazy monad (e.g. Identity
). In a strict monad (e.g.
IO) the whole list is generated and buffered before it can be consumed.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Data.Array instead.
Note that this could be a bit more efficient compared to Stream.fold
Fold.toList
, and it can fuse with pure list consumers.
Mapping
mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #
>>>
mapM f = Stream.sequence . fmap f
Apply a monadic function to each element of the stream and replace it with the output of the resulting action.
>>>
s = Stream.fromList ["a", "b", "c"]
>>>
Stream.fold Fold.drain $ Stream.mapM putStr s
abc
Stateful Filters
take :: Applicative m => Int -> Stream m a -> Stream m a Source #
Take first n
elements from the stream and discard the rest.
takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #
End the stream as soon as the predicate fails on an element.
takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #
Same as takeWhile
but with a monadic predicate.
Combining Two Streams
Zipping
zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #
Like zipWith
but using a monadic zipping function.
zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #
Stream a
is evaluated first, followed by stream b
, the resulting
elements a
and b
are then zipped using the supplied zip function and the
result c
is yielded to the consumer.
If stream a
or stream b
ends, the zipped stream ends. If stream b
ends
first, the element a
from previous evaluation of stream a
is discarded.
>>>
s1 = Stream.fromList [1,2,3]
>>>
s2 = Stream.fromList [4,5,6]
>>>
Stream.fold Fold.toList $ Stream.zipWith (+) s1 s2
[5,7,9]
Cross Product
crossApply :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b Source #
Apply a stream of functions to a stream of values and flatten the results.
Note that the second stream is evaluated multiple times.
>>>
crossApply = Stream.crossWith id
crossWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #
Definition:
>>>
crossWith f m1 m2 = fmap f m1 `Stream.crossApply` m2
Note that the second stream is evaluated multiple times.
cross :: Monad m => Stream m a -> Stream m b -> Stream m (a, b) Source #
Given a Stream m a
and Stream m b
generate a stream with all possible
combinations of the tuple (a, b)
.
Definition:
>>>
cross = Stream.crossWith (,)
The second stream is evaluated multiple times. If that is not desired it can
be cached in an Array
and then generated from the array before
calling this function. Caching may also improve performance if the stream is
expensive to evaluate.
See Streamly.Internal.Data.Unfold.cross
for a much faster fused
alternative.
Time: O(m x n)
Pre-release
Unfold Many
data ConcatMapUState o i Source #
unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #
unfoldMany unfold stream
uses unfold
to map the input stream elements
to streams and then flattens the generated streams into a single output
stream.
Like concatMap
but uses an Unfold
for stream generation. Unlike
concatMap
this can fuse the Unfold
code with the inner loop and
therefore provide many times better performance.
Concat
concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b Source #
Map a stream producing function on each element of the stream and then flatten the results into a single stream.
>>>
concatMap f = Stream.concatMapM (return . f)
>>>
concatMap f = Stream.concat . fmap f
>>>
concatMap f = Stream.unfoldMany (Unfold.lmap f Unfold.fromStream)
See unfoldMany
for a fusible alternative.
concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b Source #
Map a stream producing monadic function on each element of the stream
and then flatten the results into a single stream. Since the stream
generation function is monadic, unlike concatMap
, it can produce an
effect at the beginning of each iteration of the inner loop.
See unfoldMany
for a fusible alternative.
concat :: Monad m => Stream m (Stream m a) -> Stream m a Source #
Flatten a stream of streams to a single stream.
>>>
concat = Stream.concatMap id
Pre-release
Unfold Iterate
unfoldIterateDfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a Source #
Same as concatIterateDfs
but more efficient due to stream fusion.
Example, list a directory tree using DFS:
>>>
f = Unfold.either Dir.eitherReaderPaths Unfold.nil
>>>
input = Stream.fromPure (Left ".")
>>>
ls = Stream.unfoldIterateDfs f input
Pre-release
unfoldIterateBfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a Source #
Like unfoldIterateDfs
but uses breadth first style traversal.
Pre-release
unfoldIterateBfsRev :: Monad m => Unfold m a a -> Stream m a -> Stream m a Source #
Like unfoldIterateBfs
but processes the children in reverse order,
therefore, may be slightly faster.
Pre-release
Concat Iterate
concatIterateScan :: Monad m => (b -> a -> m b) -> (b -> m (Maybe (b, Stream m a))) -> b -> Stream m a Source #
Generate a stream from an initial state, scan and concat the stream, generate a stream again from the final state of the previous scan and repeat the process.
concatIterateDfs :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a Source #
Traverse the stream in depth first style (DFS). Map each element in the input stream to a stream and flatten, recursively map the resulting elements as well to a stream and flatten until no more streams are generated.
Example, list a directory tree using DFS:
>>>
f = either (Just . Dir.readEitherPaths) (const Nothing)
>>>
input = Stream.fromPure (Left ".")
>>>
ls = Stream.concatIterateDfs f input
This is equivalent to using concatIterateWith StreamK.append
.
Pre-release
concatIterateBfs :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a Source #
Similar to concatIterateDfs
except that it traverses the stream in
breadth first style (BFS). First, all the elements in the input stream are
emitted, and then their traversals are emitted.
Example, list a directory tree using BFS:
>>>
f = either (Just . Dir.readEitherPaths) (const Nothing)
>>>
input = Stream.fromPure (Left ".")
>>>
ls = Stream.concatIterateBfs f input
Pre-release
concatIterateBfsRev :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a Source #
Same as concatIterateBfs
except that the traversal of the last
element on a level is emitted first and then going backwards up to the first
element (reversed ordering). This may be slightly faster than
concatIterateBfs
.
Fold Many
data FoldMany s fs b a Source #
FoldManyStart s | |
FoldManyFirst fs s | |
FoldManyLoop s fs | |
FoldManyYield b (FoldMany s fs b a) | |
FoldManyDone |
data FoldManyPost s fs b a Source #
FoldManyPostStart s | |
FoldManyPostLoop s fs | |
FoldManyPostYield b (FoldManyPost s fs b a) | |
FoldManyPostDone |
foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Apply a Fold
repeatedly on a stream and emit the results in the output
stream.
Definition:
>>>
foldMany f = Stream.parseMany (Parser.fromFold f)
Example, empty stream:
>>>
f = Fold.take 2 Fold.sum
>>>
fmany = Stream.fold Fold.toList . Stream.foldMany f
>>>
fmany $ Stream.fromList []
[]
Example, last fold empty:
>>>
fmany $ Stream.fromList [1..4]
[3,7]
Example, last fold non-empty:
>>>
fmany $ Stream.fromList [1..5]
[3,7,5]
Note that using a closed fold e.g. Fold.take 0
, would result in an
infinite stream on a non-empty input stream.
foldManyPost :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Like foldMany
but evaluates the fold even if the fold did not receive
any input, therefore, always results in a non-empty output even on an empty
stream (default result of the fold).
Example, empty stream:
>>>
f = Fold.take 2 Fold.sum
>>>
fmany = Stream.fold Fold.toList . Stream.foldManyPost f
>>>
fmany $ Stream.fromList []
[0]
Example, last fold empty:
>>>
fmany $ Stream.fromList [1..4]
[3,7,0]
Example, last fold non-empty:
>>>
fmany $ Stream.fromList [1..5]
[3,7,5]
Note that using a closed fold e.g. Fold.take 0
, would result in an
infinite stream without consuming the input.
Pre-release
Fold Iterate
reduceIterateBfs :: Monad m => (a -> a -> m a) -> Stream m a -> m (Maybe a) Source #
Binary BFS style reduce, folds a level entirely using the supplied fold function, collecting the outputs as next level of the tree, then repeats the same process on the next level. The last elements of a previously folded level are folded first.
foldIterateBfs :: Fold m a (Either a a) -> Stream m a -> m (Maybe a) Source #
N-Ary BFS style iterative fold, if the input stream finished before the fold then it returns Left otherwise Right. If the fold returns Left we terminate.
Unimplemented