streamly-core-0.2.2: Streaming, parsers, arrays, serialization and more
Copyright: (c) 2017 Composewell Technologies
License: BSD3
Maintainer: streamly@composewell.com
Stability: released
Portability: GHC
Safe Haskell: Safe-Inferred
Language: Haskell2010

Streamly.Data.Stream

Description

Streams represented as state machines, that fuse together when composed statically, eliminating function calls or intermediate constructor allocations - generating tight, efficient loops. Suitable for high performance looping operations.

If you need to call these operations recursively in a loop (i.e. composed dynamically) then it is recommended to use the continuation passing style (CPS) stream operations from the Streamly.Data.StreamK module. Stream and StreamK types are interconvertible. See more details in the documentation below regarding Stream vs StreamK.

Please refer to Streamly.Internal.Data.Stream for more functions that have not yet been released.

Check out the https://github.com/composewell/streamly-examples repository for many more real-world examples of stream programming.

Setup

To execute the code examples provided in this module in ghci, please run the following commands first.

>>> :m
>>> import Control.Concurrent (threadDelay)
>>> import Control.Monad (void)
>>> import Control.Monad.IO.Class (MonadIO (liftIO))
>>> import Control.Monad.Trans.Class (lift)
>>> import Control.Monad.Trans.Identity (runIdentityT)
>>> import Data.Either (fromLeft, fromRight, isLeft, isRight, either)
>>> import Data.Maybe (fromJust, isJust)
>>> import Data.Function (fix, (&))
>>> import Data.Functor.Identity (runIdentity)
>>> import Data.IORef
>>> import Data.Semigroup (cycle1)
>>> import GHC.Exts (Ptr (Ptr))
>>> import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
>>> hSetBuffering stdout LineBuffering
>>> effect n = print n >> return n
>>> import Streamly.Data.Stream (Stream)
>>> import qualified Streamly.Data.Array as Array
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.StreamK as StreamK
>>> import qualified Streamly.Data.Unfold as Unfold
>>> import qualified Streamly.Data.Parser as Parser
>>> import qualified Streamly.FileSystem.Dir as Dir

For APIs that have not been released yet.

>>> import qualified Streamly.Internal.Data.Fold as Fold
>>> import qualified Streamly.Internal.Data.Parser as Parser
>>> import qualified Streamly.Internal.Data.Stream as Stream
>>> import qualified Streamly.Internal.Data.Unfold as Unfold
>>> import qualified Streamly.Internal.FileSystem.Dir as Dir

Overview

Streamly is a framework for modular, data-flow-based programming and declarative concurrency. The stream fusion framework in streamly allows high-performance combinatorial programming even when using byte-level streams. The Streamly API is similar to that of Haskell lists.

Console Echo Example

In the following example, repeatM generates an infinite stream of String by repeatedly performing the getLine IO action. mapM then applies putStrLn to each element in the stream, converting it to a stream of (). Finally, drain folds the stream to IO, discarding the () values and thus producing only effects.

>>> import Data.Function ((&))
>>> :{
echo =
 Stream.repeatM getLine       -- Stream IO String
     & Stream.mapM putStrLn   -- Stream IO ()
     & Stream.fold Fold.drain -- IO ()
:}

This is a console echo program. It is an example of a declarative loop written using streaming combinators. Compare it with an imperative while loop.

Hopefully, this gives you an idea of how we can program declaratively by representing loops using streams. In this module, you can find all the Data.List-like functions and many more powerful combinators to perform common programming tasks.

Stream Fusion

The fused Stream type in this module employs stream fusion for C-like performance when looping over data. It represents the stream as a state machine using an explicit state and a step function working on the state. A typical stream operation consumes elements from the previous state machine in a stream pipeline, transforms the elements and yields new values for the next stage to consume. Each stream operation is modular and represents a single task; it has no knowledge of the previous or next operation on the elements.

A typical stream pipeline consists of a stream producer, several stream transformation operations and a stream consumer. All these operations taken together form a closed loop processing the stream elements. Elements are transferred between stages using a boxed data constructor. However, all the stages of the pipeline are fused together by GHC, eliminating the boxing of intermediate constructors, and thus forming a tight, C-like loop without any boxed data being used in the loop.

Stream fusion works effectively when:

  • the stream pipeline is composed statically (known at compile time)
  • all the operations forming the loop are inlined
  • the loop is not recursively defined; recursion breaks inlining

If these conditions cannot be met, the CPS style stream type StreamK may turn out to be a better choice than the fused stream type Stream.
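To make the state machine idea concrete, here is a minimal sketch of a fused stream type written against base only. It is a simplified model for illustration, not the library's actual definition: the names Step, Stream, enumFromToS, mapS, foldlS and sumOfSquares are all assumptions introduced here. Note that the producer and the transformer are non-recursive step functions; the only loop lives in the consumer, which is what lets a compiler inline the stages into one another.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
module Main (main) where

-- One step of the state machine: emit a value, skip, or stop.
data Step s a = Yield a s | Skip s | Stop

-- A stream is a step function plus an initial state.
-- (Hypothetical simplified model of a fused stream.)
data Stream m a = forall s. Stream (s -> m (Step s a)) s

-- Producer: count from lo to hi using the counter as the state.
enumFromToS :: Monad m => Int -> Int -> Stream m Int
enumFromToS lo hi = Stream step lo
  where
    step i
        | i > hi = return Stop
        | otherwise = return (Yield i (i + 1))

-- Transformer: wraps the upstream step function; no recursion here.
mapS :: Monad m => (a -> b) -> Stream m a -> Stream m b
mapS f (Stream step s0) = Stream step' s0
  where
    step' s = do
        r <- step s
        return $ case r of
            Yield a s' -> Yield (f a) s'
            Skip s' -> Skip s'
            Stop -> Stop

-- Consumer: the only recursive loop in the whole pipeline.
foldlS :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
foldlS f z (Stream step s0) = go z s0
  where
    go acc s = do
        r <- step s
        case r of
            Yield a s' -> let acc' = f acc a in acc' `seq` go acc' s'
            Skip s' -> go acc s'
            Stop -> return acc

-- A statically composed pipeline: producer, transformer, consumer.
sumOfSquares :: Int -> IO Int
sumOfSquares n = foldlS (+) 0 (mapS (\x -> x * x) (enumFromToS 1 n))

main :: IO ()
main = sumOfSquares 10 >>= print
```

Because every stage is known at compile time and none of the step functions is recursive, this pipeline satisfies the three conditions listed above.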

Stream vs StreamK

The fused stream model avoids constructor allocations and function call overheads. However, the stream is represented as a state machine, and to generate stream elements it has to navigate the decision tree of the state machine. Moreover, the state machine is cranked for each element in the stream. This performs extremely well when the number of states is limited. The state machine starts getting expensive as the number of states increases. For example, generating a stream from a list requires a single state and is very efficient, even if it has millions of elements. However, using cons to construct a million-element stream would be a disaster.

A typical worst-case scenario for the fused stream model is a large number of cons or append operations. A few static cons or append operations are very fast, and much faster than a CPS style stream, because CPS involves a function call for each element whereas a fused stream involves a few conditional branches in the state machine. However, constructing a large stream using cons introduces as many states in the state machine as the number of elements. If we compose cons as a balanced binary tree it will take n * log n time to navigate the tree, and n * n if it is a right associative composition.

Operations like cons or append are typically called recursively to construct a lazy infinite stream. For such use cases the CPS style StreamK should be used. CPS streams do not have a state machine that needs to be cranked for each element; past state has no effect on future element processing. However, CPS incurs a function call overhead for each element processed, and this overhead can be large compared to a fused state machine, even one with many states. Nevertheless, because of its linear performance characteristics, after a certain threshold of stream compositions the CPS stream performs much better than the quadratic fused stream operations.

As a general guideline, you need to use StreamK when you have to use cons, append or other operations having quadratic complexity at a large scale. Typically, in such cases you need to compose the stream recursively, by calling an operation in a loop. The decision to compose the stream is taken at run time rather than statically at compile time.

Typically you would compose a StreamK of chunks of data so that the StreamK overhead is not high, and then process the chunks using Stream by using statically fused stream pipeline operations on the chunks.

Stream and StreamK types can be interconverted. See Streamly.Data.StreamK module for conversion operations.
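As an illustration of the guideline above, the following sketch builds a stream recursively with StreamK.cons, where the number of cons operations is decided at run time, and then converts it to a fused Stream for consumption. The helper names countdown and total are assumptions made for this example; the library calls used are StreamK.cons, StreamK.nil, StreamK.toStream, Stream.fold and Fold.sum.

```haskell
module Main (main) where

import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import Streamly.Data.StreamK (StreamK)
import qualified Streamly.Data.StreamK as StreamK

-- Recursive (dynamic) composition: one cons per element.
-- This is the use case where the CPS StreamK type is recommended.
countdown :: Int -> StreamK IO Int
countdown n
    | n <= 0 = StreamK.nil
    | otherwise = n `StreamK.cons` countdown (n - 1)

-- Convert to a fused Stream and consume it with a fold.
total :: Int -> IO Int
total n = Stream.fold Fold.sum (StreamK.toStream (countdown n))

main :: IO ()
main = total 100000 >>= print
```

Writing countdown with Stream.cons instead would type check, but it would add one state per element and fall into the quadratic behavior described earlier in this section.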

The Stream Type

data Stream m a

A stream consists of a step function that generates the next step given a current state, and the current state.

Instances

All instances are defined in Streamly.Internal.Data.Stream.Type.

  • (Foldable m, Monad m) => Foldable (Stream m)
  • Monad m => Functor (Stream m)
  • a ~ Char => IsString (Stream Identity a)
  • IsList (Stream Identity a)
  • Read a => Read (Stream Identity a)
  • Show a => Show (Stream Identity a)
  • Eq a => Eq (Stream Identity a)
  • Ord a => Ord (Stream Identity a)
  • type Item (Stream Identity a) = a

Construction

Functions ending in the general shape b -> Stream m a.

Useful Idioms:

>>> fromIndices f = fmap f $ Stream.enumerateFrom 0
>>> fromIndicesM f = Stream.mapM f $ Stream.enumerateFrom 0
>>> fromListM = Stream.sequence . Stream.fromList
>>> fromFoldable = StreamK.toStream . StreamK.fromFoldable
>>> fromFoldableM = Stream.sequence . fromFoldable

Primitives

A fused Stream is typically not constructed using these primitives; streams are usually generated by converting containers such as lists, or by using the generation functions provided in this module. The cons primitive in this module is occasionally useful for fusing a small number of elements. On the other hand, it is common to construct a StreamK stream using the StreamK.cons primitive.

nil :: Applicative m => Stream m a

A stream that terminates without producing any output or side effect.

>>> Stream.toList Stream.nil
[]

nilM :: Applicative m => m b -> Stream m a

A stream that terminates without producing any output, but produces a side effect.

>>> Stream.fold Fold.toList (Stream.nilM (print "nil"))
"nil"
[]

Pre-release

cons :: Applicative m => a -> Stream m a -> Stream m a infixr 5

WARNING! O(n^2) time complexity wrt number of elements. Use the O(n) complexity StreamK.cons unless you want to statically fuse just a few elements.

Fuse a pure value at the head of an existing stream:

>>> s = 1 `Stream.cons` Stream.fromList [2,3]
>>> Stream.toList s
[1,2,3]

Definition:

>>> cons x xs = return x `Stream.consM` xs

consM :: Applicative m => m a -> Stream m a -> Stream m a infixr 5

Like cons but fuses an effect instead of a pure value.
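A small sketch of consM, mirroring the cons example above. The name greetThenCount is an assumption for this example. The action at the head is performed when the stream is evaluated, so the message is printed before the list is returned.

```haskell
module Main (main) where

import qualified Streamly.Data.Stream as Stream

-- Prepend an effectful head to a small, statically known stream.
-- The action runs when the stream is evaluated, not when it is built.
greetThenCount :: IO [Int]
greetThenCount =
    Stream.toList $
        (putStrLn "producing head" >> return 1)
            `Stream.consM` Stream.fromList [2, 3]

main :: IO ()
main = greetThenCount >>= print
```

As with cons, this is meant for fusing a few elements only; the same quadratic cost applies when it is used at scale.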

Unfolding

unfoldrM is the most general way of generating a stream efficiently. All other generation operations can be expressed using it.

unfoldr :: Monad m => (s -> Maybe (a, s)) -> s -> Stream m a

Build a stream by unfolding a pure step function step starting from a seed s. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 2
        then Nothing
        else Just (b, b + 1)
in Stream.toList $ Stream.unfoldr f 0
:}
[0,1,2]

unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a

Build a stream by unfolding a monadic step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 2
        then return Nothing
        else return (Just (b, b + 1))
in Stream.toList $ Stream.unfoldrM f 0
:}
[0,1,2]

From Values

Generate a monadic stream from a seed value or values.

fromPure :: Applicative m => a -> Stream m a

Create a singleton stream from a pure value.

>>> fromPure a = a `Stream.cons` Stream.nil
>>> fromPure = pure
>>> fromPure = Stream.fromEffect . pure

fromEffect :: Applicative m => m a -> Stream m a

Create a singleton stream from a monadic action.

>>> fromEffect m = m `Stream.consM` Stream.nil
>>> fromEffect = Stream.sequence . Stream.fromPure
>>> Stream.fold Fold.drain $ Stream.fromEffect (putStrLn "hello")
hello

repeat :: Monad m => a -> Stream m a

Generate an infinite stream by repeating a pure value.

>>> repeat x = Stream.repeatM (pure x)

repeatM :: Monad m => m a -> Stream m a

>>> repeatM = Stream.sequence . Stream.repeat

Generate a stream by repeatedly executing a monadic action forever.

>>> :{
repeatAction =
       Stream.repeatM (threadDelay 1000000 >> print 1)
     & Stream.take 10
     & Stream.fold Fold.drain
:}

replicate :: Monad m => Int -> a -> Stream m a

>>> replicate n = Stream.take n . Stream.repeat
>>> replicate n x = Stream.replicateM n (pure x)

Generate a stream of length n by repeating a value n times.

replicateM :: Monad m => Int -> m a -> Stream m a

>>> replicateM n = Stream.sequence . Stream.replicate n

Generate a stream by performing a monadic action n times.
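The following sketch shows that replicateM performs the action once per element, in order. The names ticks and tick are assumptions for this example; the counter is an ordinary IORef from base.

```haskell
module Main (main) where

import Data.IORef (atomicModifyIORef', newIORef)
import qualified Streamly.Data.Stream as Stream

-- Run the same action n times; each run sees the effect of the previous one.
ticks :: Int -> IO [Int]
ticks n = do
    ref <- newIORef (0 :: Int)
    -- The action returns the current counter value and increments the counter.
    let tick = atomicModifyIORef' ref (\i -> (i + 1, i))
    Stream.toList (Stream.replicateM n tick)

main :: IO ()
main = ticks 3 >>= print
```

With replicate the element would be the same pure value n times; with replicateM each element is the result of a fresh run of the action.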

Enumeration

We can use the Enum type class to enumerate a type producing a list and then convert it to a stream:

fromList $ enumFromThen from then

However, this is not particularly efficient. The Enumerable type class provides corresponding functions that generate a stream instead of a list, efficiently.

class Enum a => Enumerable a where

Types that can be enumerated as a stream. The operations in this type class are equivalent to those in the Enum type class, except that these generate a stream instead of a list. Use the functions in Streamly.Internal.Data.Stream.Enumeration module to define new instances.

Methods

enumerateFrom :: Monad m => a -> Stream m a

enumerateFrom from generates a stream starting with the element from, enumerating up to maxBound when the type is Bounded or generating an infinite stream when the type is not Bounded.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)
[0,1,2,3]

For Fractional types, enumeration is numerically stable. However, no overflow or underflow checks are performed.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]

enumerateFromTo :: Monad m => a -> a -> Stream m a

Generate a finite stream starting with the element from, enumerating the type up to the value to. If to is smaller than from then an empty stream is returned.

>>> Stream.toList $ Stream.enumerateFromTo 0 4
[0,1,2,3,4]

For Fractional types, the last element is equal to the specified to value after rounding to the nearest integral value.

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]
>>> Stream.toList $ Stream.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]

enumerateFromThen :: Monad m => a -> a -> Stream m a

enumerateFromThen from then generates a stream whose first element is from, whose second element is then, and whose successive elements are in increments of then - from. Enumeration can occur downwards or upwards depending on whether then comes before or after from. For Bounded types the stream ends when maxBound is reached; for unbounded types it keeps enumerating infinitely.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2
[0,2,4,6]
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)
[0,-2,-4,-6]

enumerateFromThenTo :: Monad m => a -> a -> a -> Stream m a

enumerateFromThenTo from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to. Enumeration can occur downwards or upwards depending on whether then comes before or after from.

>>> Stream.toList $ Stream.enumerateFromThenTo 0 2 6
[0,2,4,6]
>>> Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]

Instances

All instances are defined in Streamly.Internal.Data.Stream.Generate. Each instance provides the four class methods enumerateFrom, enumerateFromTo, enumerateFromThen and enumerateFromThenTo at the instance type.

  • Enumerable Int16
  • Enumerable Int32
  • Enumerable Int64
  • Enumerable Int8
  • Enumerable Word16
  • Enumerable Word32
  • Enumerable Word64
  • Enumerable Ordering
  • Enumerable Word8
  • Enumerable Integer
  • Enumerable Natural
  • Enumerable ()
  • Enumerable Bool
  • Enumerable Char
  • Enumerable Double
  • Enumerable Float
  • Enumerable Int
  • Enumerable Word
  • Enumerable a => Enumerable (Identity a)
  • Integral a => Enumerable (Ratio a)
  • HasResolution a => Enumerable (Fixed a)

enumerate :: (Monad m, Bounded a, Enumerable a) => Stream m a

enumerate = enumerateFrom minBound

Enumerate a Bounded type from its minBound to its maxBound.

enumerateTo :: (Monad m, Bounded a, Enumerable a) => a -> Stream m a

>>> enumerateTo = Stream.enumerateFromTo minBound

Enumerate a Bounded type from its minBound to the specified value.
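A short sketch of both functions on small Bounded types, using the Enumerable instances for Bool and Ordering. The names allBools and upToEQ are assumptions for this example. The result type annotation is what selects the type being enumerated.

```haskell
module Main (main) where

import qualified Streamly.Data.Stream as Stream

-- Every value of Bool, from minBound to maxBound.
allBools :: IO [Bool]
allBools = Stream.toList Stream.enumerate

-- Values of Ordering from minBound (LT) up to and including EQ.
upToEQ :: IO [Ordering]
upToEQ = Stream.toList (Stream.enumerateTo EQ)

main :: IO ()
main = do
    allBools >>= print
    upToEQ >>= print
```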

Iteration

iterate :: Monad m => (a -> a) -> a -> Stream m a

Generate an infinite stream with x as the first element and each successive element derived by applying the function f on the previous element.

>>> Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1
[1,2,3,4,5]

iterateM :: Monad m => (a -> m a) -> m a -> Stream m a

Generate an infinite stream with the first element generated by the action m and each successive element derived by applying the monadic function f on the previous element.

>>> :{
Stream.iterateM (\x -> print x >> return (x + 1)) (return 0)
    & Stream.take 3
    & Stream.toList
:}
0
1
[0,1,2]

From Containers

Convert an input structure, container or source into a stream. All of these can be expressed in terms of primitives.

fromList :: Applicative m => [a] -> Stream m a

Construct a stream from a list of pure values.
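A minimal sketch of fromList as the producer of a statically composed pipeline. The name doubledOver4 is an assumption for this example; the pipeline uses fmap from the Functor instance together with Stream.filter and Stream.toList.

```haskell
module Main (main) where

import Data.Function ((&))
import qualified Streamly.Data.Stream as Stream

-- Producer, two transformations and a consumer, composed statically.
doubledOver4 :: [Int] -> IO [Int]
doubledOver4 xs =
    Stream.fromList xs
        & fmap (* 2)
        & Stream.filter (> 4)
        & Stream.toList

main :: IO ()
main = doubledOver4 [1 .. 5] >>= print
```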

From Unfolds

Most of the above stream generation operations can also be expressed using the corresponding unfolds in the Streamly.Data.Unfold module.

unfold :: Applicative m => Unfold m a b -> a -> Stream m b

Convert an Unfold into a stream by supplying it an input seed.

>>> s = Stream.unfold Unfold.replicateM (3, putStrLn "hello")
>>> Stream.fold Fold.drain s
hello
hello
hello

Elimination

Functions ending in the general shape Stream m a -> m b or Stream m a -> m (b, Stream m a).

Primitives

uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))

Decompose a stream into its head and tail. If the stream is empty, returns Nothing. If the stream is non-empty, returns Just (a, ma), where a is the head of the stream and ma its tail.

Properties:

>>> Nothing <- Stream.uncons Stream.nil
>>> Just ("a", t) <- Stream.uncons (Stream.cons "a" Stream.nil)

This can be used to consume the stream in an imperative manner one element at a time, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.

All the folds in this module can be expressed in terms of uncons. However, this is generally less efficient than using specific folds because it takes apart the stream one element at a time and therefore does not take advantage of stream fusion.

foldBreak is a more general way of consuming a stream piecemeal.

>>> :{
uncons xs = do
    r <- Stream.foldBreak Fold.one xs
    return $ case r of
        (Nothing, _) -> Nothing
        (Just h, t) -> Just (h, t)
:}

Strict Left Folds

fold :: Monad m => Fold m a b -> Stream m a -> m b

Fold a stream using the supplied left Fold and reducing the resulting expression strictly at each step. The behavior is similar to foldl'. A Fold can terminate early without consuming the full stream. See the documentation of individual Folds for termination behavior.

Definitions:

>>> fold f = fmap fst . Stream.foldBreak f
>>> fold f = Stream.parse (Parser.fromFold f)

Example:

>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050

foldBreak :: Monad m => Fold m a b -> Stream m a -> m (b, Stream m a)

Like fold but also returns the remaining stream. The resulting stream would be nil if the stream finished before the fold.
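A sketch of foldBreak with a terminating fold. The name splitFirstTwo is an assumption for this example; Fold.take 2 Fold.toList stops after two elements, and the remaining stream is then consumed separately.

```haskell
module Main (main) where

import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream

-- Consume two elements with a terminating fold, then drain the rest.
splitFirstTwo :: IO ([Int], [Int])
splitFirstTwo = do
    (firstTwo, rest) <-
        Stream.foldBreak (Fold.take 2 Fold.toList) (Stream.fromList [1 .. 5])
    remaining <- Stream.toList rest
    return (firstTwo, remaining)

main :: IO ()
main = splitFirstTwo >>= print
```

The returned stream can be passed to another foldBreak, which is how a stream can be consumed piecemeal by a sequence of folds.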

Parsing

parse :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b)

Parse a stream using the supplied Parser.

Parsers (see Streamly.Internal.Data.Parser) are more powerful folds that add backtracking and error functionality to terminating folds. Unlike folds, parsers may not always produce a valid output; they may result in an error. For example:

>>> Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nil
Left (ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0")

Note: parse p is not the same as head . parseMany p on an empty stream.
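
For contrast, a sketch of a parse that succeeds, assuming Parser is in scope qualified as in the example above. The result is wrapped in Right and the input left after the parser finishes is not returned:

>>> Stream.parse (Parser.takeEQ 2 Fold.toList) (Stream.fromList [1,2,3])
Right [1,2]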

Lazy Right Folds

Consuming a stream to build a right associated expression, suitable for lazy evaluation. Evaluation of the input happens when the output of the fold is evaluated, the fold output is a lazy thunk.

This is suitable for stream transformation operations, for example, operations like mapping a function over the stream.

foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b Source #

Right associative/lazy pull fold. foldrM build final stream constructs an output structure using the step function build. build is invoked with the next input element and the remaining (lazy) tail of the output structure. It builds a lazy output expression using the two. When the "tail structure" in the output expression is evaluated it calls build again thus lazily consuming the input stream until either the output expression built by build is free of the "tail" or the input is exhausted in which case final is used as the terminating case for the output structure. For more details see the description in the previous section.

Example, determine if any element is odd in a stream:

>>> s = Stream.fromList (2:4:5:undefined)
>>> step x xs = if odd x then return True else xs
>>> Stream.foldrM step (return False) s
True

foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b Source #

Right fold, lazy for lazy monads and pure streams, and strict for strict monads.

Please avoid using this routine in strict monads like IO unless you need a strict right fold. This is provided only for use in lazy monads (e.g. Identity) or pure streams. Note that with this signature it is not possible to implement a lazy foldr when the monad m is strict. In that case it would be strict in its accumulator and therefore would necessarily consume all its input.

>>> foldr f z = Stream.foldrM (\a b -> f a <$> b) (return z)

Note: This is similar to Fold.foldr' (the right fold via left fold), but could be more efficient.
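
An illustrative sketch of the lazy case: in the Identity monad the right fold can produce a prefix of its output from an infinite stream. Using the Prelude take on the result is only for demonstration:

>>> take 3 $ runIdentity $ Stream.foldr (:) [] (Stream.fromList [1..])
[1,2,3]

In IO the same fold would have to consume the whole input before returning, so it would not terminate on an infinite stream.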

Specific Folds

Usually you can use the folds in Streamly.Data.Fold. However, some folds that may be commonly used or may have an edge in performance in some cases are provided here.

Useful idioms:

>>> foldlM' f a = Stream.fold (Fold.foldlM' f a)
>>> foldl1 f = Stream.fold (Fold.foldl1' f)
>>> foldl' f a = Stream.fold (Fold.foldl' f a)
>>> drain = Stream.fold Fold.drain
>>> mapM_ f = Stream.fold (Fold.drainMapM f)
>>> length = Stream.fold Fold.length
>>> head = Stream.fold Fold.one

toList :: Monad m => Stream m a -> m [a] Source #

Definitions:

>>> toList = Stream.foldr (:) []
>>> toList = Stream.fold Fold.toList

Convert a stream into a list in the underlying monad. The list can be consumed lazily in a lazy monad (e.g. Identity). In a strict monad (e.g. IO) the whole list is generated and buffered before it can be consumed.

Warning! Working on large lists accumulated as buffers in memory could be very inefficient; consider using Streamly.Data.Array instead.

Note that this could be a bit more efficient compared to Stream.fold Fold.toList, and it can fuse with pure list consumers.

Mapping

Stateless one-to-one transformations. Use fmap for mapping a pure function on a stream.

sequence :: Monad m => Stream m (m a) -> Stream m a Source #

>>> sequence = Stream.mapM id

Replace the elements of a stream of monadic actions with the outputs of those actions.

>>> s = Stream.fromList [putStr "a", putStr "b", putStrLn "c"]
>>> Stream.fold Fold.drain $ Stream.sequence s
abc

mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #

>>> mapM f = Stream.sequence . fmap f

Apply a monadic function to each element of the stream and replace it with the output of the resulting action.

>>> s = Stream.fromList ["a", "b", "c"]
>>> Stream.fold Fold.drain $ Stream.mapM putStr s
abc

trace :: Monad m => (a -> m b) -> Stream m a -> Stream m a Source #

Apply a monadic function to each element flowing through the stream and discard the results.

>>> s = Stream.enumerateFromTo 1 2
>>> Stream.fold Fold.drain $ Stream.trace print s
1
2

Compare with tap.

tap :: Monad m => Fold m a b -> Stream m a -> Stream m a Source #

Tap the data flowing through a stream into a Fold. For example, you may add a tap to log the contents flowing through the stream. The fold is used only for effects, its result is discarded.

                  Fold m a b
                      |
-----stream m a ---------------stream m a-----

>>> s = Stream.enumerateFromTo 1 2
>>> Stream.fold Fold.drain $ Stream.tap (Fold.drainMapM print) s
1
2

Compare with trace.

delay :: MonadIO m => Double -> Stream m a -> Stream m a Source #

Introduce a delay of specified seconds between elements of the stream.

Definition:

>>> sleep n = liftIO $ threadDelay $ round $ n * 1000000
>>> delay = Stream.intersperseM_ . sleep

Example:

>>> input = Stream.enumerateFromTo 1 3
>>> Stream.fold (Fold.drainMapM print) $ Stream.delay 1 input
1
2
3

Scanning

Stateful one-to-one transformations.

Scanning By Fold

Useful idioms:

>>> scanl' f z = Stream.scan (Fold.foldl' f z)
>>> scanlM' f z = Stream.scan (Fold.foldlM' f z)
>>> postscanl' f z = Stream.postscan (Fold.foldl' f z)
>>> postscanlM' f z = Stream.postscan (Fold.foldlM' f z)
>>> scanl1' f = Stream.catMaybes . Stream.scan (Fold.foldl1' f)
>>> scanl1M' f = Stream.catMaybes . Stream.scan (Fold.foldlM1' f)

scan :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Strict left scan. Scan a stream using the given monadic fold.

>>> s = Stream.fromList [1..10]
>>> Stream.fold Fold.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum s
[0,1,3,6]

See also: usingStateT

postscan :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Postscan a stream using the given monadic fold.

The following example extracts the input stream up to a point where the running average of elements is no more than 10:

>>> import Data.Maybe (fromJust)
>>> let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>> s = Stream.enumerateFromTo 1.0 100.0
>>> :{
 Stream.fold Fold.toList
  $ fmap (fromJust . fst)
  $ Stream.takeWhile (\(_,x) -> x <= 10)
  $ Stream.postscan (Fold.tee Fold.latest avg) s
:}
[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]
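
The difference from scan is that postscan does not emit the initial value of the fold's accumulator. A minimal side-by-side comparison:

>>> s = Stream.fromList [1,2,3]
>>> Stream.fold Fold.toList $ Stream.scan Fold.sum s
[0,1,3,6]
>>> Stream.fold Fold.toList $ Stream.postscan Fold.sum s
[1,3,6]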

Specific scans

indexed :: Monad m => Stream m a -> Stream m (Int, a) Source #

>>> f = Fold.foldl' (\(i, _) x -> (i + 1, x)) (-1,undefined)
>>> indexed = Stream.postscan f
>>> indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)
>>> indexedR n = fmap (\(i, a) -> (n - i, a)) . indexed

Pair each element in a stream with its index, starting from index 0.

>>> Stream.fold Fold.toList $ Stream.indexed $ Stream.fromList "hello"
[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]

Insertion

Add elements to the stream.

insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a Source #

insertBy cmp elem stream inserts elem before the first element in stream that is greater than or equal to elem when compared using cmp.

>>> insertBy cmp x = Stream.mergeBy cmp (Stream.fromPure x)
>>> input = Stream.fromList [1,3,5]
>>> Stream.fold Fold.toList $ Stream.insertBy compare 2 input
[1,2,3,5]

intersperseM :: Monad m => m a -> Stream m a -> Stream m a Source #

Insert an effect and its output before consuming an element of a stream except the first one.

>>> input = Stream.fromList "hello"
>>> Stream.fold Fold.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') input
h.,e.,l.,l.,o"h,e,l,l,o"

Be careful about the order of effects. In the above example we used trace after the intersperse. If we use it before the intersperse, the output would be he.l.l.o."h,e,l,l,o".

>>> Stream.fold Fold.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar input
he.l.l.o."h,e,l,l,o"

intersperseM_ :: Monad m => m b -> Stream m a -> Stream m a Source #

Insert a side effect before consuming an element of a stream except the first one.

>>> input = Stream.fromList "hello"
>>> Stream.fold Fold.drain $ Stream.trace putChar $ Stream.intersperseM_ (putChar '.') input
h.e.l.l.o

Pre-release

intersperse :: Monad m => a -> Stream m a -> Stream m a Source #

Insert a pure value between successive elements of a stream.

>>> input = Stream.fromList "hello"
>>> Stream.fold Fold.toList $ Stream.intersperse ',' input
"h,e,l,l,o"

Filtering

Remove elements from the stream.

Stateless Filters

mapMaybeM is the most general stateless filtering operation. All other filtering operations can be expressed using it.

mapMaybe :: Monad m => (a -> Maybe b) -> Stream m a -> Stream m b Source #

Map a Maybe returning function to a stream, filter out the Nothing elements, and return a stream of values extracted from Just.

Equivalent to:

>>> mapMaybe f = Stream.catMaybes . fmap f
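
For example, a sketch that keeps only the even elements and scales them in the same pass (the helper f is illustrative):

>>> f x = if even x then Just (x * 10) else Nothing
>>> Stream.fold Fold.toList $ Stream.mapMaybe f (Stream.fromList [1..5])
[20,40]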

mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b Source #

Like mapMaybe but maps a monadic function.

Equivalent to:

>>> mapMaybeM f = Stream.catMaybes . Stream.mapM f
>>> mapM f = Stream.mapMaybeM (\x -> Just <$> f x)

filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

Include only those elements that pass a predicate.

>>> filter p = Stream.filterM (return . p)
>>> filter p = Stream.mapMaybe (\x -> if p x then Just x else Nothing)
>>> filter p = Stream.scanMaybe (Fold.filtering p)

filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

Same as filter but with a monadic predicate.

>>> f p x = p x >>= \r -> return $ if r then Just x else Nothing
>>> filterM p = Stream.mapMaybeM (f p)

catMaybes :: Monad m => Stream m (Maybe a) -> Stream m a Source #

In a stream of Maybes, discard Nothings and unwrap Justs.

>>> catMaybes = Stream.mapMaybe id
>>> catMaybes = fmap fromJust . Stream.filter isJust

Pre-release

catLefts :: Monad m => Stream m (Either a b) -> Stream m a Source #

Discard Rights and unwrap Lefts in an Either stream.

>>> catLefts = fmap (fromLeft undefined) . Stream.filter isLeft

Pre-release

catRights :: Monad m => Stream m (Either a b) -> Stream m b Source #

Discard Lefts and unwrap Rights in an Either stream.

>>> catRights = fmap (fromRight undefined) . Stream.filter isRight

Pre-release

catEithers :: Monad m => Stream m (Either a a) -> Stream m a Source #

Remove the Either wrapper and flatten both lefts as well as rights in the output stream.

>>> catEithers = fmap (either id id)

Pre-release

Stateful Filters

scanMaybe is the most general stateful filtering operation. The filtering folds (folds returning a Maybe type) in Streamly.Internal.Data.Fold can be used along with scanMaybe to perform stateful filtering operations in general.

Useful idioms:

>>> deleteBy cmp x = Stream.scanMaybe (Fold.deleteBy cmp x)
>>> findIndices p = Stream.scanMaybe (Fold.findIndices p)
>>> elemIndices a = findIndices (== a)
>>> uniq = Stream.scanMaybe (Fold.uniqBy (==))

scanMaybe :: Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b Source #

Use a filtering fold on a stream.

>>> scanMaybe f = Stream.catMaybes . Stream.postscan f
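
For instance, a sketch of collapsing consecutive duplicates with the uniq idiom listed above. This assumes Fold.uniqBy is in scope as in that idiom; in this release it may be exported only from Streamly.Internal.Data.Fold:

>>> Stream.fold Fold.toList $ Stream.scanMaybe (Fold.uniqBy (==)) (Stream.fromList [1,1,2,2,1])
[1,2,1]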

take :: Applicative m => Int -> Stream m a -> Stream m a Source #

Take first n elements from the stream and discard the rest.

takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

End the stream as soon as the predicate fails on an element.

takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

Same as takeWhile but with a monadic predicate.

drop :: Monad m => Int -> Stream m a -> Stream m a Source #

Discard first n elements from the stream and take the rest.

dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #

Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.

dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #

Same as dropWhile but with a monadic predicate.
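
A short sketch contrasting the counting and predicate based variants on the same input:

>>> s = Stream.fromList [1..10]
>>> Stream.fold Fold.toList $ Stream.take 3 s
[1,2,3]
>>> Stream.fold Fold.toList $ Stream.takeWhile (< 4) s
[1,2,3]
>>> Stream.fold Fold.toList $ Stream.drop 7 s
[8,9,10]
>>> Stream.fold Fold.toList $ Stream.dropWhile (< 8) s
[8,9,10]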

Combining Two Streams

Note that these operations are suitable for statically fusing a few streams; they have a quadratic O(n^2) time complexity with respect to the number of streams. If you want to compose many streams dynamically using binary combining operations, see the corresponding operations in Streamly.Data.StreamK.

When fusing more than two streams it is more efficient if the binary operations are composed as a balanced tree rather than a right associative or left associative one e.g.:

>>> s1 = Stream.fromList [1,2] `Stream.append` Stream.fromList [3,4]
>>> s2 = Stream.fromList [4,5] `Stream.append` Stream.fromList [6,7]
>>> s = s1 `Stream.append` s2

Appending

append :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for statically fusing a small number of streams. Use the O(n) complexity StreamK.append otherwise.

Fuses two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

>>> s1 = Stream.fromList [1,2]
>>> s2 = Stream.fromList [3,4]
>>> Stream.fold Fold.toList $ s1 `Stream.append` s2
[1,2,3,4]

Interleaving

interleave :: Monad m => Stream m a -> Stream m a -> Stream m a Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for statically fusing a small number of streams. Use the O(n) complexity StreamK.interleave otherwise.

Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.
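
A small sketch with streams of unequal length, showing that the leftover of the longer stream is appended once the shorter one stops:

>>> s1 = Stream.fromList [1,3]
>>> s2 = Stream.fromList [2,4,6,8]
>>> Stream.fold Fold.toList $ Stream.interleave s1 s2
[1,2,3,4,6,8]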

Merging

mergeBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for statically fusing a small number of streams. Use the O(n) complexity StreamK.mergeBy otherwise.

Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.

If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.

>>> s1 = Stream.fromList [1,3,5]
>>> s2 = Stream.fromList [2,4,6,8]
>>> Stream.fold Fold.toList $ Stream.mergeBy compare s1 s2
[1,2,3,4,5,6,8]

mergeByM :: Monad m => (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #

Like mergeBy but with a monadic comparison function.

Example, to merge two streams randomly:

> randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT
> Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2])
[2,1,2,2,2,1,1,1]

Example, merge two streams in a proportion of 2:1:

>>> :{
do
 let s1 = Stream.fromList [1,1,1,1,1,1]
     s2 = Stream.fromList [2,2,2]
 let proportionately m n = do
      ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT]
      return $ \_ _ -> do
         r <- readIORef ref
         writeIORef ref $ Prelude.tail r
         return $ Prelude.head r
 f <- proportionately 2 1
 xs <- Stream.fold Fold.toList $ Stream.mergeByM f s1 s2
 print xs
:}
[1,1,2,1,1,2,1,1,2]

Zipping

zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #

WARNING! O(n^2) time complexity wrt number of streams. Suitable for statically fusing a small number of streams. Use the O(n) complexity StreamK.zipWith otherwise.

Stream a is evaluated first, followed by stream b, the resulting elements a and b are then zipped using the supplied zip function and the result c is yielded to the consumer.

If stream a or stream b ends, the zipped stream ends. If stream b ends first, the element a from previous evaluation of stream a is discarded.

>>> s1 = Stream.fromList [1,2,3]
>>> s2 = Stream.fromList [4,5,6]
>>> Stream.fold Fold.toList $ Stream.zipWith (+) s1 s2
[5,7,9]
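
When the inputs differ in length the result is as long as the shorter one. A minimal sketch:

>>> Stream.fold Fold.toList $ Stream.zipWith (,) (Stream.fromList [1,2,3]) (Stream.fromList "ab")
[(1,'a'),(2,'b')]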

zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #

Like zipWith but using a monadic zipping function.

Cross Product

crossWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #

Definition:

>>> crossWith f m1 m2 = fmap f m1 `Stream.crossApply` m2

Note that the second stream is evaluated multiple times.
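
A sketch of the resulting order: the first stream acts as the outer loop and the second stream is traversed once for each of its elements:

>>> Stream.fold Fold.toList $ Stream.crossWith (,) (Stream.fromList [1,2]) (Stream.fromList "ab")
[(1,'a'),(1,'b'),(2,'a'),(2,'b')]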

Unfold Each

unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #

unfoldMany unfold stream uses unfold to map the input stream elements to streams and then flattens the generated streams into a single output stream.

Like concatMap but uses an Unfold for stream generation. Unlike concatMap this can fuse the Unfold code with the inner loop and therefore provide many times better performance.
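
For example, a sketch that flattens a stream of lists, assuming Unfold refers to Streamly.Data.Unfold imported qualified:

>>> Stream.fold Fold.toList $ Stream.unfoldMany Unfold.fromList (Stream.fromList [[1,2],[3],[4,5]])
[1,2,3,4,5]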

intercalate :: Monad m => Unfold m b c -> b -> Stream m b -> Stream m c Source #

intersperse followed by unfold and concat.

>>> intercalate u a = Stream.unfoldMany u . Stream.intersperse a
>>> intersperse = Stream.intercalate Unfold.identity
>>> unwords = Stream.intercalate Unfold.fromList " "
>>> input = Stream.fromList ["abc", "def", "ghi"]
>>> Stream.fold Fold.toList $ Stream.intercalate Unfold.fromList " " input
"abc def ghi"

intercalateSuffix :: Monad m => Unfold m b c -> b -> Stream m b -> Stream m c Source #

intersperseMSuffix followed by unfold and concat.

>>> intercalateSuffix u a = Stream.unfoldMany u . Stream.intersperseMSuffix a
>>> intersperseMSuffix = Stream.intercalateSuffix Unfold.identity
>>> unlines = Stream.intercalateSuffix Unfold.fromList "\n"
>>> input = Stream.fromList ["abc", "def", "ghi"]
>>> Stream.fold Fold.toList $ Stream.intercalateSuffix Unfold.fromList "\n" input
"abc\ndef\nghi\n"

Stream of streams

Stream operations like map and filter represent loop processing in imperative programming terms. Similarly, the imperative concept of nested loops is represented by streams of streams. The concatMap operation represents nested looping. A concatMap operation loops over the input stream, generates another stream for each element of the input stream, and then loops over that inner stream as well, producing effects and generating a single output stream.

One dimension loops are just a special case of nested loops. For example, concatMap can degenerate to a simple map operation:

map f m = Stream.concatMap (\x -> Stream.fromPure (f x)) m

Similarly, concatMap can perform filtering by mapping an element to a nil stream:

filter p m = Stream.concatMap (\x -> if p x then Stream.fromPure x else Stream.nil) m

concatEffect :: Monad m => m (Stream m a) -> Stream m a Source #

Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.

>>> concatEffect = Stream.concat . Stream.fromEffect
>>> concatEffect eff = Stream.concatMapM (\() -> eff) (Stream.fromPure ())

See also: concat, sequence
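
A minimal sketch in which the effect that produces the stream runs when the resulting stream is evaluated (the printed message is illustrative):

>>> s = Stream.concatEffect (putStrLn "setup" >> return (Stream.fromList [1,2,3]))
>>> Stream.fold Fold.toList s
setup
[1,2,3]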

concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

>>> concatMap f = Stream.concatMapM (return . f)
>>> concatMap f = Stream.concat . fmap f
>>> concatMap f = Stream.unfoldMany (Unfold.lmap f Unfold.fromStream)

See unfoldMany for a fusible alternative.
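
For example, a sketch that expands each element into a two element inner stream:

>>> f x = Stream.fromList [x, x * 10]
>>> Stream.fold Fold.toList $ Stream.concatMap f (Stream.fromList [1,2,3])
[1,10,2,20,3,30]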

concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b Source #

Map a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike concatMap, it can produce an effect at the beginning of each iteration of the inner loop.

See unfoldMany for a fusible alternative.

Repeated Fold

Useful idioms:

>>> splitWithSuffix p f = Stream.foldMany (Fold.takeEndBy p f)
>>> splitOnSuffix p f = Stream.foldMany (Fold.takeEndBy_ p f)
>>> groupsBy eq f = Stream.parseMany (Parser.groupBy eq f)
>>> groupsByRolling eq f = Stream.parseMany (Parser.groupByRolling eq f)
>>> groupsOf n f = Stream.foldMany (Fold.take n f)

foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #

Apply a Fold repeatedly on a stream and emit the results in the output stream.

Definition:

>>> foldMany f = Stream.parseMany (Parser.fromFold f)

Example, empty stream:

>>> f = Fold.take 2 Fold.sum
>>> fmany = Stream.fold Fold.toList . Stream.foldMany f
>>> fmany $ Stream.fromList []
[]

Example, last fold empty:

>>> fmany $ Stream.fromList [1..4]
[3,7]

Example, last fold non-empty:

>>> fmany $ Stream.fromList [1..5]
[3,7,5]

Note that using a closed fold e.g. Fold.take 0, would result in an infinite stream on a non-empty input stream.

groupsOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b Source #

Group the input stream into groups of n elements each and then fold each group using the provided fold function.

>>> groupsOf n f = Stream.foldMany (Fold.take n f)
>>> Stream.toList $ Stream.groupsOf 2 Fold.sum (Stream.enumerateFromTo 1 10)
[3,7,11,15,19]

This can be considered as an n-fold version of take where we apply take repeatedly on the leftover stream until the stream exhausts.

parseMany :: Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b) Source #

Apply a Parser repeatedly on a stream and emit the parsed values in the output stream.

Example:

>>> s = Stream.fromList [1..10]
>>> parser = Parser.takeBetween 0 2 Fold.sum
>>> Stream.fold Fold.toList $ Stream.parseMany parser s
[Right 3,Right 7,Right 11,Right 15,Right 19]

This is the streaming equivalent of the many parse combinator.

Known Issues: When the parser fails there is no way to get the remaining stream.

Splitting

splitOn :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Split on an infixed separator element, dropping the separator. The supplied Fold is applied on the split segments. Splits the stream on separator elements determined by the supplied predicate, separator is considered as infixed between two segments:

>>> splitOn' p xs = Stream.fold Fold.toList $ Stream.splitOn p Fold.toList (Stream.fromList xs)
>>> splitOn' (== '.') "a.b"
["a","b"]

An empty stream is folded to the default value of the fold:

>>> splitOn' (== '.') ""
[""]

If one or both sides of the separator are missing then the empty segment on that side is folded to the default output of the fold:

>>> splitOn' (== '.') "."
["",""]
>>> splitOn' (== '.') ".a"
["","a"]
>>> splitOn' (== '.') "a."
["a",""]
>>> splitOn' (== '.') "a..b"
["a","","b"]

splitOn is an inverse of intercalating single element:

Stream.intercalate Unfold.fromList "." . Stream.splitOn (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOn (== '.') Fold.toList . Stream.intercalate Unfold.fromList "." === id

wordsBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #

Split the stream after stripping leading, trailing, and repeated separators as per the fold supplied. Therefore, ".a..b." with . as the separator would be parsed as ["a","b"]. In other words, it's like parsing words from whitespace separated text.
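
The case described above can be checked directly:

>>> Stream.fold Fold.toList $ Stream.wordsBy (== '.') Fold.toList (Stream.fromList ".a..b.")
["a","b"]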

Buffered Operations

Operations that require buffering of the stream. Reverse is essentially a left fold followed by an unfold.

reverse :: Monad m => Stream m a -> Stream m a Source #

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

Definition:

>>> reverse m = Stream.concatEffect $ Stream.fold Fold.toListRev m >>= return . Stream.fromList
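
Example:

>>> Stream.fold Fold.toList $ Stream.reverse (Stream.fromList [1,2,3])
[3,2,1]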

Multi-Stream folds

Operations that consume multiple streams at the same time.

eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool Source #

Compare two streams for equality.

cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering Source #

Compare two streams lexicographically.
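
A short sketch of both comparisons, with the stream type annotated as in the isPrefixOf example:

>>> Stream.eqBy (==) (Stream.fromList "abc") (Stream.fromList "abc" :: Stream IO Char)
True
>>> Stream.cmpBy compare (Stream.fromList "abc") (Stream.fromList "abd" :: Stream IO Char)
LT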

isPrefixOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool Source #

Returns True if the first stream is the same as or a prefix of the second. A stream is a prefix of itself.

>>> Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: Stream IO Char)
True

isSubsequenceOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool Source #

Returns True if all the elements of the first stream occur, in order, in the second stream. The elements do not have to occur consecutively. A stream is a subsequence of itself.

>>> Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: Stream IO Char)
True

stripPrefix :: (Monad m, Eq a) => Stream m a -> Stream m a -> m (Maybe (Stream m a)) Source #

stripPrefix prefix input strips the prefix stream from the input stream if it is a prefix of input. Returns Nothing if the input does not start with the given prefix, stripped input otherwise. Returns Just nil when the prefix is the same as the input stream.

Space: O(1)
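
A minimal sketch of the successful case. The pattern match on Just is only safe here because the prefix is known to match:

>>> Just rest <- Stream.stripPrefix (Stream.fromList "he") (Stream.fromList "hello")
>>> Stream.fold Fold.toList rest
"llo"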

Exceptions

Note that the stream exception handling routines catch and handle exceptions only in the stream generation steps and not in the consumer of the stream. For example, if we are folding or parsing a stream - any exceptions in the fold or parse steps won't be observed by the stream exception handlers. Exceptions in the fold or parse steps can be handled using the fold or parse exception handling routines. You can wrap the stream elimination function in the monad exception handler to observe exceptions in the stream as well as the consumer.

Most of these combinators inhibit stream fusion, therefore, when possible, they should be called in an outer loop to mitigate the cost. For example, instead of calling them on a stream of chars call them on a stream of arrays before flattening it to a stream of chars.

onException :: MonadCatch m => m b -> Stream m a -> Stream m a Source #

Run the action m b if the stream evaluation is aborted due to an exception. The exception is not caught, simply rethrown.

Observes exceptions only in the stream generation, and not in stream consumers.

Inhibits stream fusion

handle :: (MonadCatch m, Exception e) => (e -> m (Stream m a)) -> Stream m a -> Stream m a Source #

When evaluating a stream if an exception occurs, stream evaluation aborts and the specified exception handler is run with the exception as argument. The exception is caught and handled unless the handler decides to rethrow it. Note that exception handling is not applied to the stream returned by the exception handler.

Observes exceptions only in the stream generation, and not in stream consumers.

Inhibits stream fusion
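
A sketch of recovering from a failure in stream generation, assuming Control.Exception from base for ErrorCall and throwIO. The names s and handler are illustrative. Elements yielded before the exception are kept, and the stream returned by the handler replaces the rest:

>>> import Control.Exception (ErrorCall(..), throwIO)
>>> s = Stream.fromList [1,2] `Stream.append` Stream.fromEffect (throwIO (ErrorCall "boom"))
>>> handler (ErrorCall _) = return (Stream.fromList [-1])
>>> Stream.fold Fold.toList $ Stream.handle handler s
[1,2,-1]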

Resource Management

bracket is the most general resource management operation; all other operations can be expressed using it. These functions have an IO suffix because the allocation and cleanup functions are IO actions. For generalized allocation and cleanup functions, see the functions without the IO suffix in the "streamly" package.

Note that these operations bracket the stream generation only; they do not cover the stream consumer. This means that if an exception occurs in the consumer of the stream (e.g. in a fold or parse step), the exception won't be observed by the stream resource handlers. In that case the resource cleanup handler runs when the stream is garbage collected.

Monad level resource management can always be used around the stream elimination functions, such a function can observe exceptions in both the stream and its consumer.

before :: Monad m => m b -> Stream m a -> Stream m a Source #

Run the action m b before the stream yields its first element.

Same as the following but more efficient due to fusion:

>>> before action xs = Stream.nilM action <> xs
>>> before action xs = Stream.concatMap (const xs) (Stream.fromEffect action)

afterIO :: MonadIO m => IO b -> Stream m a -> Stream m a Source #

Run the action IO b whenever the stream is evaluated to completion, or if it is garbage collected after a partial lazy evaluation.

The semantics of the action IO b are similar to the semantics of cleanup action in bracketIO.

See also afterUnsafe

finallyIO :: (MonadIO m, MonadCatch m) => IO b -> Stream m a -> Stream m a Source #

Run the action IO b whenever the stream stops normally, aborts due to an exception, or is garbage collected after a partial lazy evaluation.

The semantics of running the action IO b are similar to the cleanup action semantics described in bracketIO.

>>> finallyIO release = Stream.bracketIO (return ()) (const release)

See also finallyUnsafe

Inhibits stream fusion

bracketIO :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a Source #

Run the alloc action IO b with async exceptions disabled but keeping blocking operations interruptible (see mask). Use the output b of the IO action as input to the function b -> Stream m a to generate an output stream.

b is usually a resource under the IO monad, e.g. a file handle, that requires a cleanup after use. The cleanup action b -> IO c runs whenever the stream (1) ends normally, (2) aborts due to a sync or async exception, or (3) gets garbage collected after a partial lazy evaluation. The exception is not caught, it is rethrown.

bracketIO only guarantees that the cleanup action runs, and it runs with async exceptions enabled. The action must ensure that it can successfully clean up the resource in the face of sync or async exceptions.

When the stream ends normally or on a sync exception, the cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context, so cleanup may be delayed until the GC gets to run. An example where GC based cleanup happens is when a stream is being folded but the fold terminates without draining the entire stream, or when the consumer of the stream encounters an exception.

Observes exceptions only in the stream generation, and not in stream consumers.

See also: bracketUnsafe

Inhibits stream fusion
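
A minimal sketch of the normal termination path, using putStrLn in place of a real resource (the messages and the value 3 are illustrative). The cleanup runs when the stream ends, before the fold returns its result:

>>> :{
Stream.fold Fold.toList
    $ Stream.bracketIO
        (putStrLn "acquire" >> return 3)
        (\_ -> putStrLn "release")
        (\n -> Stream.enumerateFromTo 1 n)
:}
acquire
release
[1,2,3]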

bracketIO3 :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> IO d) -> (b -> IO e) -> (b -> Stream m a) -> Stream m a Source #

Like bracketIO but can use 3 separate cleanup actions depending on the mode of termination:

  1. When the stream stops normally
  2. When the stream is garbage collected
  3. When the stream encounters an exception

bracketIO3 before onStop onGC onException action runs action using the result of before. If the stream stops normally, onStop is executed; if the stream is abandoned, onGC is executed; if the stream encounters an exception, onException is executed.

The exception is not caught, it is rethrown.

Inhibits stream fusion

Pre-release

Transforming Inner Monad

morphInner :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a Source #

Transform the inner monad of a stream using a natural transformation.

Example, generalize the inner monad from Identity to any other:

>>> generalizeInner = Stream.morphInner (return . runIdentity)

Also known as hoist.

liftInner :: (Monad m, MonadTrans t, Monad (t m)) => Stream m a -> Stream (t m) a Source #

Lift the inner monad m of Stream m a to t m where t is a monad transformer.

runReaderT :: Monad m => m s -> Stream (ReaderT s m) a -> Stream m a Source #

Evaluate the inner monad of a stream as ReaderT.

runStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m (s, a) Source #

Evaluate the inner monad of a stream as StateT and emit the resulting state and value pair after each step.

Stream of Arrays

chunksOf :: forall m a. (MonadIO m, Unbox a) => Int -> Stream m a -> Stream m (Array a) Source #

chunksOf n stream groups the elements in the input stream into arrays of n elements each.

Same as the following but may be more efficient:

>>> chunksOf n = Stream.foldMany (Array.writeN n)
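
For example, a sketch that converts each chunk back to a list for display, assuming Array.toList from Streamly.Data.Array. The last chunk may hold fewer than n elements:

>>> Stream.fold Fold.toList $ fmap Array.toList $ Stream.chunksOf 2 (Stream.fromList [1..5 :: Int])
[[1,2],[3,4],[5]]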

Pre-release