Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | released |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Fast, composable stream producers with ability to terminate, supporting stream fusion.
Please refer to Streamly.Internal.Data.Stream for more functions that have not yet been released.
For continuation passing style (CPS) stream type, please refer to the Streamly.Data.StreamK module.
Checkout the https://github.com/composewell/streamly-examples repository for many more real world examples of stream programming.
Synopsis
- data Stream m a
- nil :: Applicative m => Stream m a
- nilM :: Applicative m => m b -> Stream m a
- cons :: Applicative m => a -> Stream m a -> Stream m a
- consM :: Applicative m => m a -> Stream m a -> Stream m a
- unfoldr :: Monad m => (s -> Maybe (a, s)) -> s -> Stream m a
- unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a
- fromPure :: Applicative m => a -> Stream m a
- fromEffect :: Applicative m => m a -> Stream m a
- repeat :: Monad m => a -> Stream m a
- repeatM :: Monad m => m a -> Stream m a
- replicate :: Monad m => Int -> a -> Stream m a
- replicateM :: Monad m => Int -> m a -> Stream m a
- class Enum a => Enumerable a where
- enumerateFrom :: Monad m => a -> Stream m a
- enumerateFromTo :: Monad m => a -> a -> Stream m a
- enumerateFromThen :: Monad m => a -> a -> Stream m a
- enumerateFromThenTo :: Monad m => a -> a -> a -> Stream m a
- enumerate :: (Monad m, Bounded a, Enumerable a) => Stream m a
- enumerateTo :: (Monad m, Bounded a, Enumerable a) => a -> Stream m a
- iterate :: Monad m => (a -> a) -> a -> Stream m a
- iterateM :: Monad m => (a -> m a) -> m a -> Stream m a
- fromList :: Applicative m => [a] -> Stream m a
- unfold :: Applicative m => Unfold m a b -> a -> Stream m b
- uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
- fold :: Monad m => Fold m a b -> Stream m a -> m b
- foldBreak :: Monad m => Fold m a b -> Stream m a -> m (b, Stream m a)
- parse :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b)
- foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b
- foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b
- toList :: Monad m => Stream m a -> m [a]
- sequence :: Monad m => Stream m (m a) -> Stream m a
- mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
- trace :: Monad m => (a -> m b) -> Stream m a -> Stream m a
- tap :: Monad m => Fold m a b -> Stream m a -> Stream m a
- delay :: MonadIO m => Double -> Stream m a -> Stream m a
- scan :: Monad m => Fold m a b -> Stream m a -> Stream m b
- postscan :: Monad m => Fold m a b -> Stream m a -> Stream m b
- indexed :: Monad m => Stream m a -> Stream m (Int, a)
- insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a
- intersperseM :: Monad m => m a -> Stream m a -> Stream m a
- intersperseM_ :: Monad m => m b -> Stream m a -> Stream m a
- intersperse :: Monad m => a -> Stream m a -> Stream m a
- mapMaybe :: Monad m => (a -> Maybe b) -> Stream m a -> Stream m b
- mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b
- filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- catMaybes :: Monad m => Stream m (Maybe a) -> Stream m a
- catLefts :: Monad m => Stream m (Either a b) -> Stream m a
- catRights :: Monad m => Stream m (Either a b) -> Stream m b
- catEithers :: Monad m => Stream m (Either a a) -> Stream m a
- scanMaybe :: Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b
- take :: Applicative m => Int -> Stream m a -> Stream m a
- takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- drop :: Monad m => Int -> Stream m a -> Stream m a
- dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- append :: Monad m => Stream m a -> Stream m a -> Stream m a
- interleave :: Monad m => Stream m a -> Stream m a -> Stream m a
- mergeBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
- mergeByM :: Monad m => (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
- zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
- zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
- crossWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
- unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b
- intercalate :: Monad m => Unfold m b c -> b -> Stream m b -> Stream m c
- intercalateSuffix :: Monad m => Unfold m b c -> b -> Stream m b -> Stream m c
- concatEffect :: Monad m => m (Stream m a) -> Stream m a
- concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b
- concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b
- foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b
- parseMany :: Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b)
- chunksOf :: forall m a. (MonadIO m, Unbox a) => Int -> Stream m a -> Stream m (Array a)
- reverse :: Monad m => Stream m a -> Stream m a
- eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
- cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
- isPrefixOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool
- isSubsequenceOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool
- stripPrefix :: (Monad m, Eq a) => Stream m a -> Stream m a -> m (Maybe (Stream m a))
- onException :: MonadCatch m => m b -> Stream m a -> Stream m a
- handle :: (MonadCatch m, Exception e) => (e -> Stream m a) -> Stream m a -> Stream m a
- before :: Monad m => m b -> Stream m a -> Stream m a
- afterIO :: MonadIO m => IO b -> Stream m a -> Stream m a
- finallyIO :: (MonadIO m, MonadCatch m) => IO b -> Stream m a -> Stream m a
- bracketIO :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
- bracketIO3 :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> IO d) -> (b -> IO e) -> (b -> Stream m a) -> Stream m a
- morphInner :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a
- liftInner :: (Monad m, MonadTrans t, Monad (t m)) => Stream m a -> Stream (t m) a
- runReaderT :: Monad m => m s -> Stream (ReaderT s m) a -> Stream m a
- runStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m (s, a)
Setup
To execute the code examples provided in this module in ghci, please run the following commands first.
>>>
:m
>>>
import Control.Concurrent (threadDelay)
>>>
import Control.Monad (void)
>>>
import Control.Monad.IO.Class (MonadIO (liftIO))
>>>
import Control.Monad.Trans.Class (lift)
>>>
import Control.Monad.Trans.Identity (runIdentityT)
>>>
import Data.Either (fromLeft, fromRight, isLeft, isRight, either)
>>>
import Data.Maybe (fromJust, isJust)
>>>
import Data.Function (fix, (&))
>>>
import Data.Functor.Identity (runIdentity)
>>>
import Data.IORef
>>>
import Data.Semigroup (cycle1)
>>>
import GHC.Exts (Ptr (Ptr))
>>>
import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
>>>
hSetBuffering stdout LineBuffering
>>>
effect n = print n >> return n
>>>
import Streamly.Data.Stream (Stream)
>>>
import qualified Streamly.Data.Array as Array
>>>
import qualified Streamly.Data.Fold as Fold
>>>
import qualified Streamly.Data.Stream as Stream
>>>
import qualified Streamly.Data.Unfold as Unfold
>>>
import qualified Streamly.Data.Parser as Parser
>>>
import qualified Streamly.FileSystem.Dir as Dir
For APIs that have not been released yet.
>>>
import qualified Streamly.Internal.Data.Fold as Fold
>>>
import qualified Streamly.Internal.Data.Fold.Window as Window
>>>
import qualified Streamly.Internal.Data.Parser as Parser
>>>
import qualified Streamly.Internal.Data.Stream as Stream
>>>
import qualified Streamly.Internal.Data.Unfold as Unfold
>>>
import qualified Streamly.Internal.FileSystem.Dir as Dir
Overview
Streamly is a framework for modular data flow based programming and declarative concurrency. Powerful stream fusion framework in streamly allows high performance combinatorial programming even when using byte level streams. Streamly API is similar to Haskell lists.
Console Echo Example
In the following example, repeatM
generates an infinite stream of String
by repeatedly performing the getLine
IO action. mapM
then applies
putStrLn
on each element in the stream converting it to stream of ()
.
Finally, drain
folds the stream to IO discarding the () values, thus
producing only effects.
>>>
import Data.Function ((&))
>>>
:{
echo = Stream.repeatM getLine -- Stream IO String & Stream.mapM putStrLn -- Stream IO () & Stream.fold Fold.drain -- IO () :}
This is a console echo program. It is an example of a declarative loop
written using streaming combinators. Compare it with an imperative while
loop.
Hopefully, this gives you an idea how we can program declaratively by representing loops using streams. In this module, you can find all Data.List like functions and many more powerful combinators to perform common programming tasks.
Stream Fusion
The fused Stream
type employs stream fusion for C-like performance when
looping over data. It represents a stream source or transformation by
defining a state machine with explicit state, and a step function working on
the state. A typical stream operation consumes elements from the previous
state machine in the pipeline, transforms them and yields new values for the
next stage to consume. The stream operations are modular and represent a
single task, they have no knowledge of previous or next operation on the
elements.
A typical stream pipeline consists of a stream producer, several stream transformation operations and a stream consumer. All these operations taken together form a closed loop processing the stream elements. Elements are transferred between stages using a boxed data constructor. However, all the stages of the pipeline are fused together by GHC, eliminating the intermediate constructors, and thus forming a tight C like loop without any boxed data being used in the loop.
Stream fusion works effectively when:
- the stream pipeline is composed statically (known at compile time)
- all the operations forming the loop are inlined
- the loop is not recursively defined, recursion breaks inlining
If these conditions cannot be met, the CPS style stream type StreamK
may
turn out to be a better choice than the fused stream type Stream
.
Stream vs StreamK
The fused stream model avoids constructor allocations or function call
overheads. However, the stream is represented as a state machine and to
generate elements it has to navigate the decision tree of the state machine.
Moreover, the state machine is cranked for each element in the stream. This
performs extremely well when the number of states are limited. The state
machine starts getting expensive as the number of states increase. For
example, generating a million element stream from a list requires a single
state and is very efficient. However, using fused cons
to generate a
million element stream would be a disaster.
A typical worst case scenario for fused stream model is a large number of
cons
or append
operations. A few static cons
or append
operations
are very fast and much faster than a CPS style stream. However, if we
construct a large stream using cons
it introduces as many states in the
state machine as the number of elements. If we compose the cons
as a
binary tree it will take n * log n
time to navigate the tree, and n * n
if it is a right associative composition.
For quadratic cases of fused stream, after a certain threshold the CPS
stream would perform much better and exhibit linear performance behavior.
Operations like cons
or append
; are typically recursively called to
construct a lazy infinite stream. For such use cases the CPS style StreamK
type is provided. CPS streams do not have a state machine that needs to be
cranked for each element, past state has no effect on the future element
processing. However, it incurs a function call overhead for each operation
for each element, which could be very large overhead compared to fused state
machines even if it has many states and cranks it for each element. But in
some cases scales tip in favor of the CPS stream. In those cases even though
CPS has a large constant overhead, it has a linear performance rather than
quadratic.
As a general guideline, if you have to use cons
or append
or operations
of similar nature, at a large scale, then StreamK
should be used. When you
need to compose the stream dynamically or recursively, then StreamK
should
be used. Typically you would use a dynamically generated StreamK
with
chunks of data which can then be processed by statically fused stream
pipeline operations.
Stream
and StreamK
types can be interconverted. See
Streamly.Data.StreamK module for conversion operations.
Useful Idioms
>>>
fromListM = Stream.sequence . Stream.fromList
>>>
fromIndices f = fmap f $ Stream.enumerateFrom 0
The Stream Type
A stream consists of a step function that generates the next step given a current state, and the current state.
Instances
Construction
Functions ending in the general shape b -> Stream m a
.
See also: Streamly.Internal.Data.Stream.Generate for
Pre-release
functions.
Primitives
Primitives to construct a stream from pure values or monadic actions. All other stream construction and generation combinators described later can be expressed in terms of these primitives. However, the special versions provided in this module can be much more efficient in most cases. Users can create custom combinators using these primitives.
nil :: Applicative m => Stream m a Source #
A stream that terminates without producing any output or side effect.
>>>
Stream.fold Fold.toList Stream.nil
[]
nilM :: Applicative m => m b -> Stream m a Source #
A stream that terminates without producing any output, but produces a side effect.
>>>
Stream.fold Fold.toList (Stream.nilM (print "nil"))
"nil" []
Pre-release
cons :: Applicative m => a -> Stream m a -> Stream m a Source #
Fuse a pure value at the head of an existing stream::
>>>
s = 1 `Stream.cons` Stream.fromList [2,3]
>>>
Stream.fold Fold.toList s
[1,2,3]
This function should not be used to dynamically construct a stream. If a stream is constructed by successive use of this function it would take O(n^2) time to consume the stream.
This function should only be used to statically fuse an element with a stream. Do not use this recursively or where it cannot be inlined.
See Streamly.Data.StreamK for a cons
that can be used to
construct a stream recursively.
Definition:
>>>
cons x xs = return x `Stream.consM` xs
consM :: Applicative m => m a -> Stream m a -> Stream m a Source #
Like cons
but fuses an effect instead of a pure value.
Unfolding
unfoldrM
is the most general way of generating a stream efficiently.
All other generation operations can be expressed using it.
unfoldr :: Monad m => (s -> Maybe (a, s)) -> s -> Stream m a Source #
>>>
:{
unfoldr step s = case step s of Nothing -> Stream.nil Just (a, b) -> a `Stream.cons` unfoldr step b :}
Build a stream by unfolding a pure step function step
starting from a
seed s
. The step function returns the next element in the stream and the
next seed value. When it is done it returns Nothing
and the stream ends.
For example,
>>>
:{
let f b = if b > 2 then Nothing else Just (b, b + 1) in Stream.fold Fold.toList $ Stream.unfoldr f 0 :} [0,1,2]
unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a Source #
Build a stream by unfolding a monadic step function starting from a
seed. The step function returns the next element in the stream and the next
seed value. When it is done it returns Nothing
and the stream ends. For
example,
>>>
:{
let f b = if b > 2 then return Nothing else return (Just (b, b + 1)) in Stream.fold Fold.toList $ Stream.unfoldrM f 0 :} [0,1,2]
From Values
Generate a monadic stream from a seed value or values.
fromPure :: Applicative m => a -> Stream m a Source #
Create a singleton stream from a pure value.
>>>
fromPure a = a `Stream.cons` Stream.nil
>>>
fromPure = pure
>>>
fromPure = Stream.fromEffect . pure
fromEffect :: Applicative m => m a -> Stream m a Source #
Create a singleton stream from a monadic action.
>>>
fromEffect m = m `Stream.consM` Stream.nil
>>>
fromEffect = Stream.sequence . Stream.fromPure
>>>
Stream.fold Fold.drain $ Stream.fromEffect (putStrLn "hello")
hello
repeat :: Monad m => a -> Stream m a Source #
Generate an infinite stream by repeating a pure value.
>>>
repeat x = Stream.repeatM (pure x)
repeatM :: Monad m => m a -> Stream m a Source #
>>>
repeatM = Stream.sequence . Stream.repeat
>>>
repeatM = fix . Stream.consM
>>>
repeatM = cycle1 . Stream.fromEffect
Generate a stream by repeatedly executing a monadic action forever.
>>>
:{
repeatAction = Stream.repeatM (threadDelay 1000000 >> print 1) & Stream.take 10 & Stream.fold Fold.drain :}
replicate :: Monad m => Int -> a -> Stream m a Source #
>>>
replicate n = Stream.take n . Stream.repeat
>>>
replicate n x = Stream.replicateM n (pure x)
Generate a stream of length n
by repeating a value n
times.
replicateM :: Monad m => Int -> m a -> Stream m a Source #
>>>
replicateM n = Stream.sequence . Stream.replicate n
Generate a stream by performing a monadic action n
times.
Enumeration
We can use the Enum
type class to enumerate a type producing a list
and then convert it to a stream:
fromList
$enumFromThen
from then
However, this is not particularly efficient.
The Enumerable
type class provides corresponding functions that
generate a stream instead of a list, efficiently.
class Enum a => Enumerable a where Source #
Types that can be enumerated as a stream. The operations in this type
class are equivalent to those in the Enum
type class, except that these
generate a stream instead of a list. Use the functions in
Streamly.Internal.Data.Stream.Enumeration module to define new instances.
enumerateFrom :: Monad m => a -> Stream m a Source #
enumerateFrom from
generates a stream starting with the element
from
, enumerating up to maxBound
when the type is Bounded
or
generating an infinite stream when the type is not Bounded
.
>>>
Stream.fold Fold.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)
[0,1,2,3]
For Fractional
types, enumeration is numerically stable. However, no
overflow or underflow checks are performed.
>>>
Stream.fold Fold.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]
enumerateFromTo :: Monad m => a -> a -> Stream m a Source #
Generate a finite stream starting with the element from
, enumerating
the type up to the value to
. If to
is smaller than from
then an
empty stream is returned.
>>>
Stream.fold Fold.toList $ Stream.enumerateFromTo 0 4
[0,1,2,3,4]
For Fractional
types, the last element is equal to the specified to
value after rounding to the nearest integral value.
>>>
Stream.fold Fold.toList $ Stream.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]
>>>
Stream.fold Fold.toList $ Stream.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]
enumerateFromThen :: Monad m => a -> a -> Stream m a Source #
enumerateFromThen from then
generates a stream whose first element
is from
, the second element is then
and the successive elements are
in increments of then - from
. Enumeration can occur downwards or
upwards depending on whether then
comes before or after from
. For
Bounded
types the stream ends when maxBound
is reached, for
unbounded types it keeps enumerating infinitely.
>>>
Stream.fold Fold.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2
[0,2,4,6]
>>>
Stream.fold Fold.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)
[0,-2,-4,-6]
enumerateFromThenTo :: Monad m => a -> a -> a -> Stream m a Source #
enumerateFromThenTo from then to
generates a finite stream whose
first element is from
, the second element is then
and the successive
elements are in increments of then - from
up to to
. Enumeration can
occur downwards or upwards depending on whether then
comes before or
after from
.
>>>
Stream.fold Fold.toList $ Stream.enumerateFromThenTo 0 2 6
[0,2,4,6]
>>>
Stream.fold Fold.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]
Instances
enumerateTo :: (Monad m, Bounded a, Enumerable a) => a -> Stream m a Source #
Iteration
iterate :: Monad m => (a -> a) -> a -> Stream m a Source #
>>>
iterate f x = x `Stream.cons` iterate f x
Generate an infinite stream with x
as the first element and each
successive element derived by applying the function f
on the previous
element.
>>>
Stream.fold Fold.toList $ Stream.take 5 $ Stream.iterate (+1) 1
[1,2,3,4,5]
iterateM :: Monad m => (a -> m a) -> m a -> Stream m a Source #
>>>
iterateM f m = m >>= \a -> return a `Stream.consM` iterateM f (f a)
Generate an infinite stream with the first element generated by the action
m
and each successive element derived by applying the monadic function
f
on the previous element.
>>>
:{
Stream.iterateM (\x -> print x >> return (x + 1)) (return 0) & Stream.take 3 & Stream.fold Fold.toList :} 0 1 [0,1,2]
From Containers
Convert an input structure, container or source into a stream. All of these can be expressed in terms of primitives.
fromList :: Applicative m => [a] -> Stream m a Source #
Construct a stream from a list of pure values.
From Unfolds
Most of the above stream generation operations can also be expressed using the corresponding unfolds in the Streamly.Data.Unfold module.
unfold :: Applicative m => Unfold m a b -> a -> Stream m b Source #
Convert an Unfold
into a stream by supplying it an input seed.
>>>
s = Stream.unfold Unfold.replicateM (3, putStrLn "hello")
>>>
Stream.fold Fold.drain s
hello hello hello
Elimination
Functions ending in the general shape Stream m a -> m b
or Stream m
a -> m (b, Stream m a)
See also: Streamly.Internal.Data.Stream.Eliminate for Pre-release
functions.
Primitives
uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a)) Source #
Decompose a stream into its head and tail. If the stream is empty, returns
Nothing
. If the stream is non-empty, returns Just (a, ma)
, where a
is
the head of the stream and ma
its tail.
Properties:
>>>
Nothing <- Stream.uncons Stream.nil
>>>
Just ("a", t) <- Stream.uncons (Stream.cons "a" Stream.nil)
This can be used to consume the stream in an imperative manner one element at a time, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.
All the folds in this module can be expressed in terms of uncons
, however,
this is generally less efficient than specific folds because it takes apart
the stream one element at a time, therefore, does not take adavantage of
stream fusion.
foldBreak
is a more general way of consuming a stream piecemeal.
>>>
:{
uncons xs = do r <- Stream.foldBreak Fold.one xs return $ case r of (Nothing, _) -> Nothing (Just h, t) -> Just (h, t) :}
Strict Left Folds
fold :: Monad m => Fold m a b -> Stream m a -> m b Source #
Fold a stream using the supplied left Fold
and reducing the resulting
expression strictly at each step. The behavior is similar to foldl'
. A
Fold
can terminate early without consuming the full stream. See the
documentation of individual Fold
s for termination behavior.
Definitions:
>>>
fold f = fmap fst . Stream.foldBreak f
>>>
fold f = Stream.parse (Parser.fromFold f)
Example:
>>>
Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050
Parsing
parse :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b) Source #
Parse a stream using the supplied Parser
.
Parsers (See Streamly.Internal.Data.Parser) are more powerful folds that add backtracking and error functionality to terminating folds. Unlike folds, parsers may not always result in a valid output, they may result in an error. For example:
>>>
Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nil
Left (ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0")
Note: parse p
is not the same as head . parseMany p
on an empty stream.
Lazy Right Folds
Consuming a stream to build a right associated expression, suitable for lazy evaluation. Evaluation of the input happens when the output of the fold is evaluated, the fold output is a lazy thunk.
This is suitable for stream transformation operations, for example, operations like mapping a function over the stream.
foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b Source #
Right associative/lazy pull fold. foldrM build final stream
constructs
an output structure using the step function build
. build
is invoked with
the next input element and the remaining (lazy) tail of the output
structure. It builds a lazy output expression using the two. When the "tail
structure" in the output expression is evaluated it calls build
again thus
lazily consuming the input stream
until either the output expression built
by build
is free of the "tail" or the input is exhausted in which case
final
is used as the terminating case for the output structure. For more
details see the description in the previous section.
Example, determine if any element is odd
in a stream:
>>>
s = Stream.fromList (2:4:5:undefined)
>>>
step x xs = if odd x then return True else xs
>>>
Stream.foldrM step (return False) s
True
foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b Source #
Right fold, lazy for lazy monads and pure streams, and strict for strict monads.
Please avoid using this routine in strict monads like IO unless you need a
strict right fold. This is provided only for use in lazy monads (e.g.
Identity) or pure streams. Note that with this signature it is not possible
to implement a lazy foldr when the monad m
is strict. In that case it
would be strict in its accumulator and therefore would necessarily consume
all its input.
>>>
foldr f z = Stream.foldrM (\a b -> f a <$> b) (return z)
Note: This is similar to Fold.foldr' (the right fold via left fold), but could be more efficient.
Specific Folds
Usually you can use the folds in Streamly.Data.Fold. However, some folds that may be commonly used or may have an edge in performance in some cases are provided here. , drain
toList :: Monad m => Stream m a -> m [a] Source #
Definitions:
>>>
toList = Stream.foldr (:) []
>>>
toList = Stream.fold Fold.toList
Convert a stream into a list in the underlying monad. The list can be
consumed lazily in a lazy monad (e.g. Identity
). In a strict monad (e.g.
IO) the whole list is generated and buffered before it can be consumed.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Data.Array instead.
Note that this could a bit more efficient compared to Stream.fold
Fold.toList
, and it can fuse with pure list consumers.
Mapping
Stateless one-to-one transformations. Use fmap
for mapping a pure
function on a stream.
sequence :: Monad m => Stream m (m a) -> Stream m a Source #
>>>
sequence = Stream.mapM id
Replace the elements of a stream of monadic actions with the outputs of those actions.
>>>
s = Stream.fromList [putStr "a", putStr "b", putStrLn "c"]
>>>
Stream.fold Fold.drain $ Stream.sequence s
abc
mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #
>>>
mapM f = Stream.sequence . fmap f
Apply a monadic function to each element of the stream and replace it with the output of the resulting action.
>>>
s = Stream.fromList ["a", "b", "c"]
>>>
Stream.fold Fold.drain $ Stream.mapM putStr s
abc
trace :: Monad m => (a -> m b) -> Stream m a -> Stream m a Source #
Apply a monadic function to each element flowing through the stream and discard the results.
>>>
s = Stream.enumerateFromTo 1 2
>>>
Stream.fold Fold.drain $ Stream.trace print s
1 2
Compare with tap
.
tap :: Monad m => Fold m a b -> Stream m a -> Stream m a Source #
Tap the data flowing through a stream into a Fold
. For example, you may
add a tap to log the contents flowing through the stream. The fold is used
only for effects, its result is discarded.
Fold m a b | -----stream m a ---------------stream m a-----
>>>
s = Stream.enumerateFromTo 1 2
>>>
Stream.fold Fold.drain $ Stream.tap (Fold.drainMapM print) s
1 2
Compare with trace
.
delay :: MonadIO m => Double -> Stream m a -> Stream m a Source #
Introduce a delay of specified seconds between elements of the stream.
Definition:
>>>
sleep n = liftIO $ threadDelay $ round $ n * 1000000
>>>
delay = Stream.intersperseM_ . sleep
Example:
>>>
input = Stream.enumerateFromTo 1 3
>>>
Stream.fold (Fold.drainMapM print) $ Stream.delay 1 input
1 2 3
Scanning
Stateful one-to-one transformations.
See also: Streamly.Internal.Data.Stream.Transform for
Pre-release
functions.
Scanning By Fold
scan :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Strict left scan. Scan a stream using the given monadic fold.
>>>
s = Stream.fromList [1..10]
>>>
Stream.fold Fold.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum s
[0,1,3,6]
See also: usingStateT
postscan :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Postscan a stream using the given monadic fold.
The following example extracts the input stream up to a point where the running average of elements is no more than 10:
>>>
import Data.Maybe (fromJust)
>>>
let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>>
s = Stream.enumerateFromTo 1.0 100.0
>>>
:{
Stream.fold Fold.toList $ fmap (fromJust . fst) $ Stream.takeWhile (\(_,x) -> x <= 10) $ Stream.postscan (Fold.tee Fold.latest avg) s :} [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]
Specific scans
indexed :: Monad m => Stream m a -> Stream m (Int, a) Source #
>>>
f = Fold.foldl' (\(i, _) x -> (i + 1, x)) (-1,undefined)
>>>
indexed = Stream.postscan f
>>>
indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)
>>>
indexedR n = fmap (\(i, a) -> (n - i, a)) . indexed
Pair each element in a stream with its index, starting from index 0.
>>>
Stream.fold Fold.toList $ Stream.indexed $ Stream.fromList "hello"
[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]
Insertion
Add elements to the stream.
insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a Source #
insertBy cmp elem stream
inserts elem
before the first element in
stream
that is less than elem
when compared using cmp
.
>>>
insertBy cmp x = Stream.mergeBy cmp (Stream.fromPure x)
>>>
input = Stream.fromList [1,3,5]
>>>
Stream.fold Fold.toList $ Stream.insertBy compare 2 input
[1,2,3,5]
intersperseM :: Monad m => m a -> Stream m a -> Stream m a Source #
Insert an effect and its output before consuming an element of a stream except the first one.
>>>
input = Stream.fromList "hello"
>>>
Stream.fold Fold.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') input
h.,e.,l.,l.,o"h,e,l,l,o"
Be careful about the order of effects. In the above example we used trace after the intersperse, if we use it before the intersperse the output would be he.l.l.o."h,e,l,l,o".
>>>
Stream.fold Fold.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar input
he.l.l.o."h,e,l,l,o"
intersperseM_ :: Monad m => m b -> Stream m a -> Stream m a Source #
Insert a side effect before consuming an element of a stream except the first one.
>>>
input = Stream.fromList "hello"
>>>
Stream.fold Fold.drain $ Stream.trace putChar $ Stream.intersperseM_ (putChar '.') input
h.e.l.l.o
Pre-release
intersperse :: Monad m => a -> Stream m a -> Stream m a Source #
Insert a pure value between successive elements of a stream.
>>>
input = Stream.fromList "hello"
>>>
Stream.fold Fold.toList $ Stream.intersperse ',' input
"h,e,l,l,o"
Filtering
Remove elements from the stream.
Stateless Filters
mapMaybeM
is the most general stateless filtering operation. All
other filtering operations can be expressed using it.
mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b Source #
Like mapMaybe
but maps a monadic function.
Equivalent to:
>>>
mapMaybeM f = Stream.catMaybes . Stream.mapM f
>>>
mapM f = Stream.mapMaybeM (\x -> Just <$> f x)
filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #
Include only those elements that pass a predicate.
>>>
filter p = Stream.filterM (return . p)
>>>
filter p = Stream.mapMaybe (\x -> if p x then Just x else Nothing)
>>>
filter p = Stream.scanMaybe (Fold.filtering p)
filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #
Same as filter
but with a monadic predicate.
>>>
f p x = p x >>= \r -> return $ if r then Just x else Nothing
>>>
filterM p = Stream.mapMaybeM (f p)
catEithers :: Monad m => Stream m (Either a a) -> Stream m a Source #
Remove the either wrapper and flatten both lefts and as well as rights in the output stream.
>>>
catEithers = fmap (either id id)
Pre-release
Stateful Filters
scanMaybe
is the most general stateful filtering operation. The
filtering folds (folds returning a Maybe
type) in
Streamly.Internal.Data.Fold can be used along with scanMaybe
to
perform stateful filtering operations in general.
scanMaybe :: Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b Source #
Use a filtering fold on a stream.
>>>
scanMaybe f = Stream.catMaybes . Stream.postscan f
take :: Applicative m => Int -> Stream m a -> Stream m a Source #
Take first n
elements from the stream and discard the rest.
takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #
End the stream as soon as the predicate fails on an element.
takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #
Same as takeWhile
but with a monadic predicate.
drop :: Monad m => Int -> Stream m a -> Stream m a Source #
Discard first n
elements from the stream and take the rest.
dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #
Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.
dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #
Same as dropWhile
but with a monadic predicate.
Combining Two Streams
Appending
append :: Monad m => Stream m a -> Stream m a -> Stream m a Source #
Fuses two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.
>>>
s1 = Stream.fromList [1,2]
>>>
s2 = Stream.fromList [3,4]
>>>
Stream.fold Fold.toList $ s1 `Stream.append` s2
[1,2,3,4]
This function should not be used to dynamically construct a stream. If a stream is constructed by successive use of this function it would take quadratic time complexity to consume the stream.
This function should only be used to statically fuse a stream with another stream. Do not use this recursively or where it cannot be inlined.
See Streamly.Data.StreamK for an append
that can be used to
construct a stream recursively.
Interleaving
When interleaving more than two streams you may want to interleave them pairwise creating a balanced binary merge tree.
interleave :: Monad m => Stream m a -> Stream m a -> Stream m a Source #
Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.
When joining many streams in a left associative manner earlier streams will
get exponential priority than the ones joining later. Because of exponential
weighting it can be used with concatMapWith
even on a large number of
streams.
Merging
When merging more than two streams you may want to merging them pairwise creating a balanced binary merge tree.
Merging of n
streams can be performed by combining the streams pair
wise using mergeMapWith
to give O(n * log n) time complexity. If used
with concatMapWith
it will have O(n^2) performance.
mergeBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #
Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.
If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.
>>>
s1 = Stream.fromList [1,3,5]
>>>
s2 = Stream.fromList [2,4,6,8]
>>>
Stream.fold Fold.toList $ Stream.mergeBy compare s1 s2
[1,2,3,4,5,6,8]
mergeByM :: Monad m => (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #
Like mergeBy
but with a monadic comparison function.
Merge two streams randomly:
> randomly _ _ = randomIO >>= x -> return $ if x then LT else GT > Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2]) [2,1,2,2,2,1,1,1]
Merge two streams in a proportion of 2:1:
>>>
:{
do let s1 = Stream.fromList [1,1,1,1,1,1] s2 = Stream.fromList [2,2,2] let proportionately m n = do ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT] return $ \_ _ -> do r <- readIORef ref writeIORef ref $ Prelude.tail r return $ Prelude.head r f <- proportionately 2 1 xs <- Stream.fold Fold.toList $ Stream.mergeByM f s1 s2 print xs :} [1,1,2,1,1,2,1,1,2]
Zipping
When zipping more than two streams you may want to zip them pairwise creating a balanced binary tree.
Zipping of n
streams can be performed by combining the streams pair
wise using mergeMapWith
with O(n * log n) time complexity. If used
with concatMapWith
it will have O(n^2) performance.
zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #
Stream a
is evaluated first, followed by stream b
, the resulting
elements a
and b
are then zipped using the supplied zip function and the
result c
is yielded to the consumer.
If stream a
or stream b
ends, the zipped stream ends. If stream b
ends
first, the element a
from previous evaluation of stream a
is discarded.
>>>
s1 = Stream.fromList [1,2,3]
>>>
s2 = Stream.fromList [4,5,6]
>>>
Stream.fold Fold.toList $ Stream.zipWith (+) s1 s2
[5,7,9]
zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #
Like zipWith
but using a monadic zipping function.
Cross Product
crossWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #
Definition:
>>>
crossWith f m1 m2 = fmap f m1 `Stream.crossApply` m2
Note that the second stream is evaluated multiple times.
Unfold Each
unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #
unfoldMany unfold stream
uses unfold
to map the input stream elements
to streams and then flattens the generated streams into a single output
stream.
Like concatMap
but uses an Unfold
for stream generation. Unlike
concatMap
this can fuse the Unfold
code with the inner loop and
therefore provide many times better performance.
intercalate :: Monad m => Unfold m b c -> b -> Stream m b -> Stream m c Source #
intersperse
followed by unfold and concat.
>>>
intercalate u a = Stream.unfoldMany u . Stream.intersperse a
>>>
intersperse = Stream.intercalate Unfold.identity
>>>
unwords = Stream.intercalate Unfold.fromList " "
>>>
input = Stream.fromList ["abc", "def", "ghi"]
>>>
Stream.fold Fold.toList $ Stream.intercalate Unfold.fromList " " input
"abc def ghi"
intercalateSuffix :: Monad m => Unfold m b c -> b -> Stream m b -> Stream m c Source #
intersperseMSuffix
followed by unfold and concat.
>>>
intercalateSuffix u a = Stream.unfoldMany u . Stream.intersperseMSuffix a
>>>
intersperseMSuffix = Stream.intercalateSuffix Unfold.identity
>>>
unlines = Stream.intercalateSuffix Unfold.fromList "\n"
>>>
input = Stream.fromList ["abc", "def", "ghi"]
>>>
Stream.fold Fold.toList $ Stream.intercalateSuffix Unfold.fromList "\n" input
"abc\ndef\nghi\n"
Stream of streams
Stream operations like map and filter represent loop processing in
imperative programming terms. Similarly, the imperative concept of
nested loops are represented by streams of streams. The concatMap
operation represents nested looping.
A concatMap
operation loops over the input stream and then for each
element of the input stream generates another stream and then loops over
that inner stream as well producing effects and generating a single
output stream.
One dimension loops are just a special case of nested loops. For
example, concatMap
can degenerate to a simple map operation:
map f m = S.concatMap (\x -> S.fromPure (f x)) m
Similarly, concatMap
can perform filtering by mapping an element to a
nil
stream:
filter p m = S.concatMap (\x -> if p x then S.fromPure x else S.nil) m
concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b Source #
Map a stream producing function on each element of the stream and then flatten the results into a single stream.
>>>
concatMap f = Stream.concatMapM (return . f)
>>>
concatMap f = Stream.concat . fmap f
>>>
concatMap f = Stream.unfoldMany (Unfold.lmap f Unfold.fromStream)
See unfoldMany
for a fusible alternative.
concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b Source #
Map a stream producing monadic function on each element of the stream
and then flatten the results into a single stream. Since the stream
generation function is monadic, unlike concatMap
, it can produce an
effect at the beginning of each iteration of the inner loop.
See unfoldMany
for a fusible alternative.
Repeated Fold
foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Apply a Fold
repeatedly on a stream and emit the results in the output
stream.
Definition:
>>>
foldMany f = Stream.parseMany (Parser.fromFold f)
Example, empty stream:
>>>
f = Fold.take 2 Fold.sum
>>>
fmany = Stream.fold Fold.toList . Stream.foldMany f
>>>
fmany $ Stream.fromList []
[]
Example, last fold empty:
>>>
fmany $ Stream.fromList [1..4]
[3,7]
Example, last fold non-empty:
>>>
fmany $ Stream.fromList [1..5]
[3,7,5]
Note that using a closed fold e.g. Fold.take 0
, would result in an
infinite stream on a non-empty input stream.
parseMany :: Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b) Source #
Apply a Parser
repeatedly on a stream and emit the parsed values in the
output stream.
Example:
>>>
s = Stream.fromList [1..10]
>>>
parser = Parser.takeBetween 0 2 Fold.sum
>>>
Stream.fold Fold.toList $ Stream.parseMany parser s
[Right 3,Right 7,Right 11,Right 15,Right 19]
This is the streaming equivalent of the many
parse
combinator.
Known Issues: When the parser fails there is no way to get the remaining stream.
chunksOf :: forall m a. (MonadIO m, Unbox a) => Int -> Stream m a -> Stream m (Array a) Source #
chunksOf n stream
groups the elements in the input stream into arrays of
n
elements each.
Same as the following but may be more efficient:
>>>
chunksOf n = Stream.foldMany (Array.writeN n)
Pre-release
Buffered Operations
Operations that require buffering of the stream. Reverse is essentially a left fold followed by an unfold.
reverse :: Monad m => Stream m a -> Stream m a Source #
Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.
Definition:
>>>
reverse m = Stream.concatEffect $ Stream.fold Fold.toListRev m >>= return . Stream.fromList
Multi-Stream folds
Operations that consume multiple streams at the same time.
eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool Source #
Compare two streams for equality
cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering Source #
Compare two streams lexicographically.
isPrefixOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool Source #
Returns True
if the first stream is the same as or a prefix of the
second. A stream is a prefix of itself.
>>>
Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: Stream IO Char)
True
isSubsequenceOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool Source #
Returns True
if all the elements of the first stream occur, in order, in
the second stream. The elements do not have to occur consecutively. A stream
is a subsequence of itself.
>>>
Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: Stream IO Char)
True
stripPrefix :: (Monad m, Eq a) => Stream m a -> Stream m a -> m (Maybe (Stream m a)) Source #
stripPrefix prefix input
strips the prefix
stream from the input
stream if it is a prefix of input. Returns Nothing
if the input does not
start with the given prefix, stripped input otherwise. Returns Just nil
when the prefix is the same as the input stream.
Space: O(1)
Exceptions
Most of these combinators inhibit stream fusion, therefore, when possible, they should be called in an outer loop to mitigate the cost. For example, instead of calling them on a stream of chars call them on a stream of arrays before flattening it to a stream of chars.
See also: Streamly.Internal.Data.Stream.Exception for
Pre-release
functions.
onException :: MonadCatch m => m b -> Stream m a -> Stream m a Source #
Run the action m b
if the stream evaluation is aborted due to an
exception. The exception is not caught, simply rethrown.
Inhibits stream fusion
handle :: (MonadCatch m, Exception e) => (e -> Stream m a) -> Stream m a -> Stream m a Source #
When evaluating a stream if an exception occurs, stream evaluation aborts and the specified exception handler is run with the exception as argument.
Inhibits stream fusion
Resource Management
bracket
is the most general resource management operation, all other
operations can be expressed using it. These functions have IO suffix
because the allocation and cleanup functions are IO actions. For
generalized allocation and cleanup functions see the functions without
the IO suffix in the "streamly" package.
before :: Monad m => m b -> Stream m a -> Stream m a Source #
Run the action m b
before the stream yields its first element.
Same as the following but more efficient due to fusion:
>>>
before action xs = Stream.nilM action <> xs
>>>
before action xs = Stream.concatMap (const xs) (Stream.fromEffect action)
afterIO :: MonadIO m => IO b -> Stream m a -> Stream m a Source #
Run the action IO b
whenever the stream is evaluated to completion, or
if it is garbage collected after a partial lazy evaluation.
The semantics of the action IO b
are similar to the semantics of cleanup
action in bracketIO
.
See also afterUnsafe
finallyIO :: (MonadIO m, MonadCatch m) => IO b -> Stream m a -> Stream m a Source #
Run the action IO b
whenever the stream stream stops normally, aborts
due to an exception or if it is garbage collected after a partial lazy
evaluation.
The semantics of running the action IO b
are similar to the cleanup action
semantics described in bracketIO
.
>>>
finallyIO release = Stream.bracketIO (return ()) (const release)
See also finallyUnsafe
Inhibits stream fusion
bracketIO :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a Source #
Run the alloc action IO b
with async exceptions disabled but keeping
blocking operations interruptible (see mask
). Use the
output b
as input to b -> Stream m a
to generate an output stream.
b
is usually a resource under the IO monad, e.g. a file handle, that
requires a cleanup after use. The cleanup action b -> IO c
, runs whenever
the stream ends normally, due to a sync or async exception or if it gets
garbage collected after a partial lazy evaluation.
bracketIO
only guarantees that the cleanup action runs, and it runs with
async exceptions enabled. The action must ensure that it can successfully
cleanup the resource in the face of sync or async exceptions.
When the stream ends normally or on a sync exception, cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context, therefore, cleanup may be delayed until the GC gets to run.
See also: bracketUnsafe
Inhibits stream fusion
bracketIO3 :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> IO d) -> (b -> IO e) -> (b -> Stream m a) -> Stream m a Source #
Like bracketIO
but can use 3 separate cleanup actions depending on the
mode of termination:
- When the stream stops normally
- When the stream is garbage collected
- When the stream encounters an exception
bracketIO3 before onStop onGC onException action
runs action
using the
result of before
. If the stream stops, onStop
action is executed, if the
stream is abandoned onGC
is executed, if the stream encounters an
exception onException
is executed.
Inhibits stream fusion
Pre-release
Transforming Inner Monad
morphInner :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a Source #
Transform the inner monad of a stream using a natural transformation.
Example, generalize the inner monad from Identity to any other:
>>>
generalizeInner = Stream.morphInner (return . runIdentity)
Also known as hoist.
liftInner :: (Monad m, MonadTrans t, Monad (t m)) => Stream m a -> Stream (t m) a Source #
Lift the inner monad m
of Stream m a
to t m
where t
is a monad
transformer.