streamly-0.7.3: Beautiful Streaming, Concurrent and Reactive Composition
Copyright(c) 2017 Harendra Kumar
LicenseBSD3
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Internal.Prelude

Description

This is an Internal module consisting of released, unreleased and unimplemented APIs. For stable and released APIs please see Streamly.Prelude module. This module provides documentation only for the unreleased and unimplemented APIs. For documentation on released APIs please see Streamly.Prelude module.

Synopsis

Construction

Primitives

nil :: IsStream t => t m a Source #

An empty stream.

> toList nil
[]

Since: 0.1.0

nilM :: (IsStream t, Monad m) => m b -> t m a Source #

An empty stream producing a side effect.

> toList (nilM (print "nil"))
"nil"
[]

Internal

cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]

Since: 0.1.0

(.:) :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

Since: 0.1.1

consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]

Concurrent (do not use parallely to construct infinite streams)

Since: 0.2.0

(|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #

Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.

> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]
let delay = threadDelay 1000000 >> print 1
drain $ serially  $ delay |: delay |: delay |: nil
drain $ parallely $ delay |: delay |: delay |: nil

Concurrent (do not use parallely to construct infinite streams)

Since: 0.2.0

From Values

yield :: IsStream t => a -> t m a Source #

yield a = a `cons` nil

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

yield = pure
yield = yieldM . pure

In Zip applicative streams yield is not the same as pure because in that case pure is equivalent to repeat instead. yield and pure are equally efficient, in other cases yield may be slightly more efficient than the other equivalent definitions.

Since: 0.4.0

yieldM :: (Monad m, IsStream t) => m a -> t m a Source #

yieldM m = m `consM` nil

Create a singleton stream from a monadic action.

> toList $ yieldM getLine
hello
["hello"]

Since: 0.4.0

repeat :: (IsStream t, Monad m) => a -> t m a Source #

Generate an infinite stream by repeating a pure value.

Since: 0.4.0

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #

repeatM = fix . consM
repeatM = cycle1 . yieldM

Generate a stream by repeatedly executing a monadic action forever.

drain $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
drain $ asyncly  $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)

Concurrent, infinite (do not use with parallely)

Since: 0.2.0

replicate :: (IsStream t, Monad m) => Int -> a -> t m a Source #

replicate = take n . repeat

Generate a stream of length n by repeating a value n times.

Since: 0.6.0

replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a Source #

replicateM = take n . repeatM

Generate a stream by performing a monadic action n times. Same as:

drain $ serially $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
drain $ asyncly  $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)

Concurrent

Since: 0.1.1

Enumeration

class Enum a => Enumerable a where Source #

Types that can be enumerated as a stream. The operations in this type class are equivalent to those in the Enum type class, except that these generate a stream instead of a list. Use the functions in Streamly.Internal.Data.Stream.Enumeration module to define new instances.

Since: 0.6.0

Methods

enumerateFrom :: (IsStream t, Monad m) => a -> t m a Source #

enumerateFrom from generates a stream starting with the element from, enumerating up to maxBound when the type is Bounded or generating an infinite stream when the type is not Bounded.

> S.toList $ S.take 4 $ S.enumerateFrom (0 :: Int)
[0,1,2,3]

For Fractional types, enumeration is numerically stable. However, no overflow or underflow checks are performed.

> S.toList $ S.take 4 $ S.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]

Since: 0.6.0

enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a Source #

Generate a finite stream starting with the element from, enumerating the type up to the value to. If to is smaller than from then an empty stream is returned.

> S.toList $ S.enumerateFromTo 0 4
[0,1,2,3,4]

For Fractional types, the last element is equal to the specified to value after rounding to the nearest integral value.

> S.toList $ S.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]
> S.toList $ S.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]

Since: 0.6.0

enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a Source #

enumerateFromThen from then generates a stream whose first element is from, the second element is then and the successive elements are in increments of then - from. Enumeration can occur downwards or upwards depending on whether then comes before or after from. For Bounded types the stream ends when maxBound is reached, for unbounded types it keeps enumerating infinitely.

> S.toList $ S.take 4 $ S.enumerateFromThen 0 2
[0,2,4,6]
> S.toList $ S.take 4 $ S.enumerateFromThen 0 (-2)
[0,-2,-4,-6]

Since: 0.6.0

enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a Source #

enumerateFromThenTo from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to. Enumeration can occur downwards or upwards depending on whether then comes before or after from.

> S.toList $ S.enumerateFromThenTo 0 2 6
[0,2,4,6]
> S.toList $ S.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]

Since: 0.6.0

Instances

Instances details
Enumerable Bool Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> t m Bool Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> Bool -> t m Bool Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> Bool -> t m Bool Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> Bool -> Bool -> t m Bool Source #

Enumerable Char Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> t m Char Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> Char -> t m Char Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> Char -> t m Char Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> Char -> Char -> t m Char Source #

Enumerable Double Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> t m Double Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> Double -> t m Double Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> Double -> t m Double Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> Double -> Double -> t m Double Source #

Enumerable Float Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> t m Float Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> Float -> t m Float Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> Float -> t m Float Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> Float -> Float -> t m Float Source #

Enumerable Int Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> t m Int Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> Int -> t m Int Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> Int -> t m Int Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> Int -> Int -> t m Int Source #

Enumerable Int8 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> t m Int8 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> Int8 -> t m Int8 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> Int8 -> t m Int8 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> Int8 -> Int8 -> t m Int8 Source #

Enumerable Int16 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> t m Int16 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> Int16 -> t m Int16 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> Int16 -> t m Int16 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> Int16 -> Int16 -> t m Int16 Source #

Enumerable Int32 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> t m Int32 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> Int32 -> t m Int32 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> Int32 -> t m Int32 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> Int32 -> Int32 -> t m Int32 Source #

Enumerable Int64 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> t m Int64 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> Int64 -> t m Int64 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> Int64 -> t m Int64 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> Int64 -> Int64 -> t m Int64 Source #

Enumerable Integer Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> t m Integer Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> Integer -> t m Integer Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> Integer -> t m Integer Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> Integer -> Integer -> t m Integer Source #

Enumerable Natural Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> t m Natural Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> Natural -> t m Natural Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> Natural -> t m Natural Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> Natural -> Natural -> t m Natural Source #

Enumerable Ordering Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> t m Ordering Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> Ordering -> t m Ordering Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> Ordering -> t m Ordering Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> Ordering -> Ordering -> t m Ordering Source #

Enumerable Word Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> t m Word Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> Word -> t m Word Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> Word -> t m Word Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> Word -> Word -> t m Word Source #

Enumerable Word8 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> t m Word8 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> Word8 -> t m Word8 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> Word8 -> t m Word8 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> Word8 -> Word8 -> t m Word8 Source #

Enumerable Word16 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> t m Word16 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> Word16 -> t m Word16 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> Word16 -> t m Word16 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> Word16 -> Word16 -> t m Word16 Source #

Enumerable Word32 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> t m Word32 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> Word32 -> t m Word32 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> Word32 -> t m Word32 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> Word32 -> Word32 -> t m Word32 Source #

Enumerable Word64 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> t m Word64 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> Word64 -> t m Word64 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> Word64 -> t m Word64 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> Word64 -> Word64 -> t m Word64 Source #

Enumerable () Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> t m () Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> () -> t m () Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> () -> t m () Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> () -> () -> t m () Source #

Integral a => Enumerable (Ratio a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> t m (Ratio a) Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> Ratio a -> t m (Ratio a) Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> Ratio a -> t m (Ratio a) Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> Ratio a -> Ratio a -> t m (Ratio a) Source #

Enumerable a => Enumerable (Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> t m (Identity a) Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> Identity a -> t m (Identity a) Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> Identity a -> t m (Identity a) Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> Identity a -> Identity a -> t m (Identity a) Source #

HasResolution a => Enumerable (Fixed a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> t m (Fixed a) Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> Fixed a -> t m (Fixed a) Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> Fixed a -> t m (Fixed a) Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> Fixed a -> Fixed a -> t m (Fixed a) Source #

enumerate :: (IsStream t, Monad m, Bounded a, Enumerable a) => t m a Source #

enumerate = enumerateFrom minBound

Enumerate a Bounded type from its minBound to maxBound

Since: 0.6.0

enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a Source #

enumerateTo = enumerateFromTo minBound

Enumerate a Bounded type from its minBound to specified value.

Since: 0.6.0

From Generators

unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a Source #

unfoldr step s =
    case step s of
        Nothing -> nil
        Just (a, b) -> a `cons` unfoldr step b

Build a stream by unfolding a pure step function step starting from a seed s. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

let f b =
        if b > 3
        then Nothing
        else Just (b, b + 1)
in toList $ unfoldr f 0
[0,1,2,3]

Since: 0.1.0

unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a Source #

Build a stream by unfolding a monadic step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

let f b =
        if b > 3
        then return Nothing
        else print b >> return (Just (b, b + 1))
in drain $ unfoldrM f 0
 0
 1
 2
 3

When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step.

(asyncly $ S.unfoldrM (\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0)
    & S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()

Concurrent

Since: 0.1.0

unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b Source #

Convert an Unfold into a stream by supplying it an input seed.

>>> unfold (UF.replicateM 10) (putStrLn "hello")

Since: 0.7.0

iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a Source #

iterate f x = x `cons` iterate f x

Generate an infinite stream with x as the first element and each successive element derived by applying the function f on the previous element.

> S.toList $ S.take 5 $ S.iterate (+1) 1
[1,2,3,4,5]

Since: 0.1.2

iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a Source #

iterateM f m = m >>= a -> return a `consM` iterateM f (f a)

Generate an infinite stream with the first element generated by the action m and each successive element derived by applying the monadic function f on the previous element.

When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration.

drain $ serially $ S.take 10 $ S.iterateM
     (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0)

drain $ asyncly  $ S.take 10 $ S.iterateM
     (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0)

Concurrent

Since: 0.7.0 (signature change)

Since: 0.1.2

fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a Source #

fromIndices f = let g i = f i `cons` g (i + 1) in g 0

Generate an infinite stream, whose values are the output of a function f applied on the corresponding index. Index starts at 0.

> S.toList $ S.take 5 $ S.fromIndices id
[0,1,2,3,4]

Since: 0.6.0

fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #

fromIndicesM f = let g i = f i `consM` g (i + 1) in g 0

Generate an infinite stream, whose values are the output of a monadic function f applied on the corresponding index. Index starts at 0.

Concurrent

Since: 0.6.0

From Containers

fromList :: (Monad m, IsStream t) => [a] -> t m a Source #

fromList = foldr cons nil

Construct a stream from a list of pure values. This is more efficient than fromFoldable for serial streams.

Since: 0.4.0

fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a Source #

fromListM = foldr consM nil

Construct a stream from a list of monadic actions. This is more efficient than fromFoldableM for serial streams.

Since: 0.4.0

fromFoldable :: (IsStream t, Foldable f) => f a -> t m a Source #

fromFoldable = foldr cons nil

Construct a stream from a Foldable containing pure values:

Since: 0.2.0

fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a Source #

fromFoldableM = foldr consM nil

Construct a stream from a Foldable containing monadic actions.

drain $ serially $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1)
drain $ asyncly  $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1)

Concurrent (do not use with parallely on infinite containers)

Since: 0.3.0

fromPrimVar :: (IsStream t, MonadIO m, Prim a) => Var IO a -> t m a Source #

Construct a stream by reading a Prim Var repeatedly.

Internal

fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a Source #

Takes a callback setter function and provides it with a callback. The callback when invoked adds a value at the tail of the stream. Returns a stream of values generated by the callback.

Internal

Time related

currentTime :: (IsStream t, MonadAsync m) => Double -> t m AbsTime Source #

currentTime g returns a stream of absolute timestamps using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage.

Note: This API is not safe on 32-bit machines.

Internal

Elimination

Deconstruction

uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) Source #

Decompose a stream into its head and tail. If the stream is empty, returns Nothing. If the stream is non-empty, returns Just (a, ma), where a is the head of the stream and ma its tail.

This is a brute force primitive. Avoid using it as long as possible, use it when no other combinator can do the job. This can be used to do pretty much anything in an imperative manner, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.

Since: 0.1.0

tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #

tail = fmap (fmap snd) . uncons

Extract all but the first element of the stream, if any.

Since: 0.1.1

init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #

Extract all but the last element of the stream, if any.

Since: 0.5.0

Folding

Right Folds

foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b Source #

Right associative/lazy pull fold. foldrM build final stream constructs an output structure using the step function build. build is invoked with the next input element and the remaining (lazy) tail of the output structure. It builds a lazy output expression using the two. When the "tail structure" in the output expression is evaluated it calls build again thus lazily consuming the input stream until either the output expression built by build is free of the "tail" or the input is exhausted in which case final is used as the terminating case for the output structure. For more details see the description in the previous section.

Example, determine if any element is odd in a stream:

>>> S.foldrM (\x xs -> if odd x then return True else xs) (return False) $ S.fromList (2:4:5:undefined)
> True

Since: 0.7.0 (signature changed)

Since: 0.2.0 (signature changed)

Since: 0.1.0

foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

Right fold to a streaming monad.

foldrS S.cons S.nil === id

foldrS can be used to perform stateless stream to stream transformations like map and filter in general. It can be coupled with a scan to perform stateful transformations. However, note that the custom map and filter routines can be much more efficient than this due to better stream fusion.

>>> S.toList $ S.foldrS S.cons S.nil $ S.fromList [1..5]
> [1,2,3,4,5]

Find if any element in the stream is True:

>>> S.toList $ S.foldrS (\x xs -> if odd x then return True else xs) (return False) $ (S.fromList (2:4:5:undefined) :: SerialT IO Int)
> [True]

Map (+2) on odd elements and filter out the even elements:

>>> S.toList $ S.foldrS (\x xs -> if odd x then (x + 2) `S.cons` xs else xs) S.nil $ (S.fromList [1..5] :: SerialT IO Int)
> [3,5,7]

foldrM can also be represented in terms of foldrS, however, the former is much more efficient:

foldrM f z s = runIdentityT $ foldrS (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s

Internal

foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b Source #

Right fold to a transformer monad. This is the most general right fold function. foldrS is a special case of foldrT, however foldrS implementation can be more efficient:

foldrS = foldrT
foldrM f z s = runIdentityT $ foldrT (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s

foldrT can be used to translate streamly streams to other transformer monads e.g. to a different streaming type.

Internal

foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b Source #

Right fold, lazy for lazy monads and pure streams, and strict for strict monads.

Please avoid using this routine in strict monads like IO unless you need a strict right fold. This is provided only for use in lazy monads (e.g. Identity) or pure streams. Note that with this signature it is not possible to implement a lazy foldr when the monad m is strict. In that case it would be strict in its accumulator and therefore would necessarily consume all its input.

Since: 0.1.0

Left Folds

foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b Source #

Left associative/strict push fold. foldl' reduce initial stream invokes reduce with the accumulator and the next input in the input stream, using initial as the initial value of the current value of the accumulator. When the input is exhausted the current value of the accumulator is returned. Make sure to use a strict data structure for accumulator to not build unnecessary lazy expressions unless that's what you want. See the previous section for more details.

Since: 0.2.0

foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #

Strict left fold, for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Since: 0.5.0

foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b Source #

Like foldl' but with a monadic step function.

Since: 0.2.0

Composable Left Folds

fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #

Fold a stream using the supplied left fold.

>>> S.fold FL.sum (S.enumerateFromTo 1 100)
5050

Since: 0.7.0

parse :: MonadThrow m => Parser m a b -> SerialT m a -> m b Source #

Parse a stream using the supplied Parse.

Internal

Concurrent Folds

foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b Source #

Same as |$..

Internal

(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #

Parallel fold application operator; applies a fold function t m a -> m b to a stream t m a concurrently; The the input stream is evaluated asynchronously in an independent thread yielding elements to a buffer and the folding action runs in another thread consuming the input from the buffer.

If you read the signature as (t m a -> m b) -> (t m a -> m b) you can look at it as a transformation that converts a fold function to a buffered concurrent fold function.

The . at the end of the operator is a mnemonic for termination of the stream.

   S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()
      |$. S.repeatM (threadDelay 1000000 >> return 1)

Concurrent

Since: 0.3.0

(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #

Parallel reverse function application operator for applying a run or fold functions to a stream. Just like |$. except that the operands are reversed.

       S.repeatM (threadDelay 1000000 >> return 1)
   |&. S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()

Concurrent

Since: 0.3.0

Full Folds

drain :: Monad m => SerialT m a -> m () Source #

drain = mapM_ (\_ -> return ())

Run a stream, discarding the results. By default it interprets the stream as SerialT, to run other types of streams use the type adapting combinators for example drain . asyncly.

Since: 0.7.0

last :: Monad m => SerialT m a -> m (Maybe a) Source #

Extract the last element of the stream, if any.

last xs = xs !! (length xs - 1)

Since: 0.1.1

length :: Monad m => SerialT m a -> m Int Source #

Determine the length of the stream.

Since: 0.1.0

sum :: (Monad m, Num a) => SerialT m a -> m a Source #

Determine the sum of all elements of a stream of numbers. Returns 0 when the stream is empty. Note that this is not numerically stable for floating point numbers.

Since: 0.1.0

product :: (Monad m, Num a) => SerialT m a -> m a Source #

Determine the product of all elements of a stream of numbers. Returns 1 when the stream is empty.

Since: 0.1.1

mconcat :: (Monad m, Monoid a) => SerialT m a -> m a Source #

Fold a stream of monoid elements by appending them.

Internal

maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #

Determine the maximum element in a stream using the supplied comparison function.

Since: 0.6.0

maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #

maximum = maximumBy compare

Determine the maximum element in a stream.

Since: 0.1.0

minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #

Determine the minimum element in a stream using the supplied comparison function.

Since: 0.6.0

minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #

minimum = minimumBy compare

Determine the minimum element in a stream.

Since: 0.1.0

the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a) Source #

Ensures that all the elements of the stream are identical and then returns that unique element.

Since: 0.6.0

Lazy Folds

toList :: Monad m => SerialT m a -> m [a] Source #

toList = S.foldr (:) []

Convert a stream into a list in the underlying monad. The list can be consumed lazily in a lazy monad (e.g. Identity). In a strict monad (e.g. IO) the whole list is generated and buffered before it can be consumed.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Since: 0.1.0

toListRev :: Monad m => SerialT m a -> m [a] Source #

toListRev = S.foldl' (flip (:)) []

Convert a stream into a list in reverse order in the underlying monad.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Internal

toPure :: Monad m => SerialT m a -> m (SerialT Identity a) Source #

Convert a stream to a pure stream.

toPure = foldr cons nil

Internal

toPureRev :: Monad m => SerialT m a -> m (SerialT Identity a) Source #

Convert a stream to a pure stream in reverse order.

toPureRev = foldl' (flip cons) nil

Internal

Composable Left Folds

toStream :: Monad m => Fold m a (SerialT Identity a) Source #

A fold that buffers its input to a pure stream.

Warning! working on large streams accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Internal

toStreamRev :: Monad m => Fold m a (SerialT Identity a) Source #

Buffers the input stream to a pure stream in the reverse order of the input.

Warning! working on large streams accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Internal

Partial Folds

drainN :: Monad m => Int -> SerialT m a -> m () Source #

drainN n = drain . take n

Run maximum up to n iterations of a stream.

Since: 0.7.0

drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #

drainWhile p = drain . takeWhile p

Run a stream as long as the predicate holds true.

Since: 0.7.0

(!!) :: Monad m => SerialT m a -> Int -> m (Maybe a) Source #

Lookup the element at the given index.

Since: 0.6.0

head :: Monad m => SerialT m a -> m (Maybe a) Source #

Extract the first element of the stream, if any.

head = (!! 0)

Since: 0.1.0

headElse :: Monad m => a -> SerialT m a -> m a Source #

Extract the first element of the stream, if any, otherwise use the supplied default value. It can help avoid one branch in high performance code.

Internal

findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) Source #

Returns the first element that satisfies the given predicate.

Since: 0.6.0

find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a) Source #

Like findM but with a non-monadic predicate.

find p = findM (return . p)

Since: 0.5.0

lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b) Source #

In a stream of (key-value) pairs (a, b), return the value b of the first pair where the key equals the given value a.

lookup = snd <$> find ((==) . fst)

Since: 0.5.0

findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) Source #

Returns the first index that satisfies the given predicate.

Since: 0.5.0

elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int) Source #

Returns the first index where a given value is found in the stream.

elemIndex a = findIndex (== a)

Since: 0.5.0

null :: Monad m => SerialT m a -> m Bool Source #

Determine whether the stream is empty.

Since: 0.1.1

elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #

Determine whether an element is present in the stream.

Since: 0.1.0

notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #

Determine whether an element is not present in the stream.

Since: 0.1.0

all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #

Determine whether all elements of a stream satisfy a predicate.

Since: 0.1.0

any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #

Determine whether any of the elements of a stream satisfy a predicate.

Since: 0.1.0

and :: Monad m => SerialT m Bool -> m Bool Source #

Determines if all elements of a boolean stream are True.

Since: 0.5.0

or :: Monad m => SerialT m Bool -> m Bool Source #

Determines whether at least one element of a boolean stream is True.

Since: 0.5.0

Multi-Stream folds

eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool Source #

Compare two streams for equality using an equality function.

Since: 0.6.0

cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering Source #

Compare two streams lexicographically using a comparison function.

Since: 0.6.0

isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #

Returns True if the first stream is the same as or a prefix of the second. A stream is a prefix of itself.

> S.isPrefixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char)
True

Since: 0.6.0

isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool Source #

Returns True if the first stream is a suffix of the second. A stream is considered a suffix of itself.

> S.isSuffixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char)
True

Space: O(n), buffers entire input stream and the suffix.

Internal

Suboptimal - Help wanted.

isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a) => SerialT m a -> SerialT m a -> m Bool Source #

Returns True if the first stream is an infix of the second. A stream is considered an infix of itself.

> S.isInfixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char)
True

Space: O(n) worst case where n is the length of the infix.

Internal

Requires Storable constraint - Help wanted.

isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #

Returns True if all the elements of the first stream occur, in order, in the second stream. The elements do not have to occur consecutively. A stream is a subsequence of itself.

> S.isSubsequenceOf (S.fromList "hlo") (S.fromList "hello" :: SerialT IO Char)
True

Since: 0.6.0

stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a)) Source #

Strip prefix if present and tell whether it was stripped or not. Returns Nothing if the stream does not start with the given prefix, stripped stream otherwise. Returns Just nil when the prefix is the same as the stream.

Space: O(1)

Since: 0.6.0

stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a)) Source #

Drops the given suffix from a stream. Returns Nothing if the stream does not end with the given suffix. Returns Just nil when the suffix is the same as the stream.

It may be more efficient to convert the stream to an Array and use stripSuffix on that especially if the elements have a Storable or Prim instance.

Space: O(n), buffers the entire input stream as well as the suffix

Internal

dropPrefix :: t m a -> t m a -> t m a Source #

Drop prefix from the input stream if present.

Space: O(1)

Unimplemented - Help wanted.

dropInfix :: t m a -> t m a -> t m a Source #

Drop all matching infix from the input stream if present. Infix stream may be consumed multiple times.

Space: O(n) where n is the length of the infix.

Unimplemented - Help wanted.

dropSuffix :: t m a -> t m a -> t m a Source #

Drop suffix from the input stream if present. Suffix stream may be consumed multiple times.

Space: O(n) where n is the length of the suffix.

Unimplemented - Help wanted.

Transformation

transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b Source #

Use a Pipe to transform a stream.

Internal

Mapping

map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b Source #

map = fmap

Same as fmap.

> S.toList $ S.map (+1) $ S.fromList [1,2,3]
[2,3,4]

Since: 0.4.0

sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #

sequence = mapM id

Replace the elements of a stream of monadic actions with the outputs of those actions.

> drain $ S.sequence $ S.fromList [putStr "a", putStr "b", putStrLn "c"]
abc

drain $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1)
          & (serially . S.sequence)

drain $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1)
          & (asyncly . S.sequence)

Concurrent (do not use with parallely on infinite streams)

Since: 0.1.0

mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #

mapM f = sequence . map f

Apply a monadic function to each element of the stream and replace it with the output of the resulting action.

> drain $ S.mapM putStr $ S.fromList ["a", "b", "c"]
abc

drain $ S.replicateM 10 (return 1)
          & (serially . S.mapM (\x -> threadDelay 1000000 >> print x))

drain $ S.replicateM 10 (return 1)
          & (asyncly . S.mapM (\x -> threadDelay 1000000 >> print x))

Concurrent (do not use with parallely on infinite streams)

Since: 0.1.0

Special Maps

mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () Source #

mapM_ = drain . mapM

Apply a monadic action to each element of the stream and discard the output of the action. This is not really a pure transformation operation but a transformation followed by fold.

Since: 0.1.0

trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a Source #

Apply a monadic function to each element flowing through the stream and discard the results.

> S.drain $ S.trace print (S.enumerateFromTo 1 2)
1
2

Compare with tap.

Since: 0.7.0

tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a Source #

Tap the data flowing through a stream into a Fold. For example, you may add a tap to log the contents flowing through the stream. The fold is used only for effects, its result is discarded.

                  Fold m a b
                      |
-----stream m a ---------------stream m a-----

> S.drain $ S.tap (FL.drainBy print) (S.enumerateFromTo 1 2)
1
2

Compare with trace.

Since: 0.7.0

tapOffsetEvery :: (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a Source #

tapOffsetEvery offset n taps every nth element in the stream starting at offset. offset can be between 0 and n - 1. Offset 0 means start at the first element in the stream. If the offset is outside this range then offset mod n is used as offset.

>>> S.drain $ S.tapOffsetEvery 0 2 (FL.mapM print FL.toList) $ S.enumerateFromTo 0 10
> [0,2,4,6,8,10]

Internal

tapAsync :: (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a Source #

Redirect a copy of the stream to a supplied fold and run it concurrently in an independent thread. The fold may buffer some elements. The buffer size is determined by the prevailing maxBuffer setting.

              Stream m a -> m b
                      |
-----stream m a ---------------stream m a-----

> S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2)
1
2

Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.

Compare with tap.

Internal

tapRate :: (IsStream t, MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> t m a -> t m a Source #

Calls the supplied function with the number of elements consumed every n seconds. The given function is run in a separate thread until the end of the stream. In case there is an exception in the stream the thread is killed during the next major GC.

Note: The action is not guaranteed to run if the main thread exits.

> delay n = threadDelay (round $ n * 1000000) >> return n
> S.drain $ S.tapRate 2 (\n -> print $ show n ++ " elements processed") (delay 1 S.|: delay 0.5 S.|: delay 0.5 S.|: S.nil)
2 elements processed
1 elements processed

Note: This may not work correctly on 32-bit machines.

Internal

pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a Source #

pollCounts predicate transform fold stream counts those elements in the stream that pass the predicate. The resulting count stream is sent to another thread which transforms it using transform and then folds it using fold. The thread is automatically cleaned up if the stream stops or aborts due to exception.

For example, to print the count of elements processed every second:

> S.drain $ S.pollCounts (const True) (S.rollingMap (-) . S.delayPost 1) (FL.drainBy print)
          $ S.enumerateFrom 0

Note: This may not work correctly on 32-bit machines.

Internal

Scanning

Left scans

scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Strict left scan. Like map, scanl' too is a one to one transformation, however it adds an extra element.

> S.toList $ S.scanl' (+) 0 $ fromList [1,2,3,4]
[0,1,3,6,10]
> S.toList $ S.scanl' (flip (:)) [] $ S.fromList [1,2,3,4]
[[],[1],[2,1],[3,2,1],[4,3,2,1]]

The output of scanl' is the initial value of the accumulator followed by all the intermediate steps and the final result of foldl'.

By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.

Consider the following monolithic example, computing the sum and the product of the elements in a stream in one go using a foldl':

> S.foldl' (\(s, p) x -> (s + x, p * x)) (0,1) $ S.fromList [1,2,3,4]
(10,24)

Using scanl' we can make it modular by computing the sum in the first stage and passing it down to the next stage for computing the product:

>   S.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1)
  $ S.scanl' (\(s, _) x -> (s + x, x)) (0,1)
  $ S.fromList [1,2,3,4]
(10,24)

IMPORTANT: scanl' evaluates the accumulator to WHNF. To avoid building lazy expressions inside the accumulator, it is recommended that a strict data structure is used for accumulator.

Since: 0.2.0

scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b Source #

Like scanl' but with a monadic fold function.

Since: 0.4.0

postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Like scanl' but does not stream the initial value of the accumulator.

postscanl' f z xs = S.drop 1 $ S.scanl' f z xs

Since: 0.7.0

postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b Source #

Like postscanl' but with a monadic step function.

Since: 0.7.0

prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Like scanl' but does not stream the final value of the accumulator.

Internal

prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like postscanl' but with a monadic step function.

Internal

scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a Source #

Like scanl' but for a non-empty stream. The first element of the stream is used as the initial value of the accumulator. Does nothing if the stream is empty.

> S.toList $ S.scanl1 (+) $ fromList [1,2,3,4]
[1,3,6,10]

Since: 0.6.0

scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a Source #

Like scanl1' but with a monadic step function.

Since: 0.6.0

Scan Using Fold

scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Scan a stream using the given monadic fold.

Since: 0.7.0

postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Postscan a stream using the given monadic fold.

Since: 0.7.0

Concurrent Transformation

mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it blocks if the buffer is full until there is space in the buffer. The consumer consumes the stream lazily from the buffer.

Internal

applyAsync :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b Source #

Same as |$.

Internal

(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #

Parallel transform application operator; applies a stream transformation function t m a -> t m b to a stream t m a concurrently; the input stream is evaluated asynchronously in an independent thread yielding elements to a buffer and the transformation function runs in another thread consuming the input from the buffer. |$ is just like regular function application operator $ except that it is concurrent.

If you read the signature as (t m a -> t m b) -> (t m a -> t m b) you can look at it as a transformation that converts a transform function to a buffered concurrent transform function.

The following code prints a value every second even though each stage adds a 1 second delay.

drain $
   S.mapM (\x -> threadDelay 1000000 >> print x)
     |$ S.repeatM (threadDelay 1000000 >> return 1)

Concurrent

Since: 0.3.0

(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #

Parallel reverse function application operator for streams; just like the regular reverse function application operator & except that it is concurrent.

drain $
      S.repeatM (threadDelay 1000000 >> return 1)
   |& S.mapM (\x -> threadDelay 1000000 >> print x)

Concurrent

Since: 0.3.0

Filtering

filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

Include only those elements that pass a predicate.

Since: 0.1.0

filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as filter but with a monadic predicate.

Since: 0.4.0

Mapping Filters

mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b Source #

Map a Maybe returning function to a stream, filter out the Nothing elements, and return a stream of values extracted from Just.

Equivalent to:

mapMaybe f = S.map fromJust . S.filter isJust . S.map f

Since: 0.3.0

mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b Source #

Like mapMaybe but maps a monadic function.

Equivalent to:

mapMaybeM f = S.map fromJust . S.filter isJust . S.mapM f

Concurrent (do not use with parallely on infinite streams)

Since: 0.3.0

Deleting Elements

deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a Source #

Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.

> S.toList $ S.deleteBy (==) 3 $ S.fromList [1,3,3,5]
[1,3,5]

Since: 0.6.0

uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a Source #

Drop repeated elements that are adjacent to each other.

Since: 0.6.0

Inserting Elements

insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a Source #

insertBy cmp elem stream inserts elem before the first element in stream that is less than elem when compared using cmp.

insertBy cmp x = mergeBy cmp (yield x)
> S.toList $ S.insertBy compare 2 $ S.fromList [1,3,5]
[1,2,3,5]

Since: 0.6.0

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #

Generate a stream by inserting the result of a monadic action between consecutive elements of the given stream. Note that the monadic action is performed after the stream action before which its result is inserted.

> S.toList $ S.intersperseM (return ',') $ S.fromList "hello"
"h,e,l,l,o"

Since: 0.5.0

intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #

Generate a stream by inserting a given element between consecutive elements of the given stream.

> S.toList $ S.intersperse ',' $ S.fromList "hello"
"h,e,l,l,o"

Since: 0.7.0

intersperseSuffix :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #

Insert a monadic action after each element in the stream.

Internal

intersperseSuffixBySpan :: (IsStream t, MonadAsync m) => Int -> m a -> t m a -> t m a Source #

Like intersperseSuffix but intersperses a monadic action into the input stream after every n elements and after the last element.

> S.toList $ S.intersperseSuffixBySpan 2 (return ',') $ S.fromList "hello"
"he,ll,o,"

Internal

interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #

Intersperse a monadic action into the input stream after every n seconds.

> S.drain $ S.interjectSuffix 1 (putChar ',') $ S.mapM (\x -> threadDelay 1000000 >> putChar x) $ S.fromList "hello"
"h,e,l,l,o"

Internal

delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduces a delay of specified seconds after each element of a stream.

Internal

Indexing

indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a) Source #

indexed = S.postscanl' (\(i, _) x -> (i + 1, x)) (-1,undefined)
indexed = S.zipWith (,) (S.enumerateFrom 0)

Pair each element in a stream with its index, starting from index 0.

> S.toList $ S.indexed $ S.fromList "hello"
[(0,h),(1,e),(2,l),(3,l),(4,o)]

Since: 0.6.0

indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a) Source #

indexedR n = S.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined)
indexedR n = S.zipWith (,) (S.enumerateFromThen n (n - 1))

Pair each element in a stream with its index, starting from the given index n and counting down.

> S.toList $ S.indexedR 10 $ S.fromList "hello"
[(10,h),(9,e),(8,l),(7,l),(6,o)]

Since: 0.6.0

Reordering

reverse :: (IsStream t, Monad m) => t m a -> t m a Source #

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

Since 0.7.0 (Monad m constraint)

Since: 0.1.1

reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a Source #

Like reverse but several times faster, requires a Storable instance.

Internal

Parsing

splitParse :: (IsStream t, MonadThrow m) => Parser m a b -> t m a -> t m b Source #

Apply a Parse repeatedly on a stream and emit the parsed values in the output stream.

>>> S.toList $ S.splitParse (PR.take 2 $ PR.fromFold FL.sum) $ S.fromList [1..10]
> [3,7,11,15,19]
>>> S.toList $ S.splitParse (PR.line FL.toList) $ S.fromList "hello\nworld"
> ["hello\n","world"]

Trimming

take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Take first n elements from the stream and discard the rest.

Since: 0.1.0

takeByTime :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #

takeByTime duration yields stream elements upto specified time duration. The duration starts when the stream is evaluated for the first time, before the first element is yielded. The time duration is checked before generating each element, if the duration has expired the stream stops.

The total time taken in executing the stream is guaranteed to be at least duration, however, because the duration is checked before generating an element, the upper bound is indeterminate and depends on the time taken in generating and processing the last element.

No element is yielded if the duration is zero. At least one element is yielded if the duration is non-zero.

Internal

takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

End the stream as soon as the predicate fails on an element.

Since: 0.1.0

takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as takeWhile but with a monadic predicate.

Since: 0.4.0

drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Discard first n elements from the stream and take the rest.

Since: 0.1.0

dropByTime :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #

dropByTime duration drops stream elements until specified duration has passed. The duration begins when the stream is evaluated for the first time. The time duration is checked after generating a stream element, the element is yielded if the duration has expired otherwise it is dropped.

The time elapsed before starting to generate the first element is at most duration, however, because the duration expiry is checked after the element is generated, the lower bound is indeterminate and depends on the time taken in generating an element.

All elements are yielded if the duration is zero.

Internal

dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.

Since: 0.1.0

dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as dropWhile but with a monadic predicate.

Since: 0.4.0

Breaking

chunksOf :: (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b Source #

Group the input stream into groups of n elements each and then fold each group using the provided fold function.

> S.toList $ S.chunksOf 2 FL.sum (S.enumerateFromTo 1 10)
 [3,7,11,15,19]

This can be considered as an n-fold version of ltake where we apply ltake repeatedly on the leftover stream until the stream exhausts.

Since: 0.7.0

chunksOf2 :: (IsStream t, Monad m) => Int -> m c -> Fold2 m c a b -> t m a -> t m b Source #

Internal

arraysOf :: (IsStream t, MonadIO m, Storable a) => Int -> t m a -> t m (Array a) Source #

arraysOf n stream groups the elements in the input stream into arrays of n elements each.

Same as the following but may be more efficient:

arraysOf n = S.chunksOf n (A.writeN n)

Internal

intervalsOf :: (IsStream t, MonadAsync m) => Double -> Fold m a b -> t m a -> t m b Source #

Group the input stream into windows of n second each and then fold each group using the provided fold function.

Since: 0.7.0

Searching

findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #

Find all the indices where the element in the stream satisfies the given predicate.

Since: 0.5.0

elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int Source #

Find all the indices where the value of the element in the stream is equal to the given value.

Since: 0.5.0

Splitting

Streams can be sliced into segments in space or in time. We use the term chunk to refer to a spatial length of the stream (spatial window) and the term session to refer to a length in time (time window).

splitOn :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Split on an infixed separator element, dropping the separator. Splits the stream on separator elements determined by the supplied predicate, separator is considered as infixed between two segments, if one side of the separator is missing then it is parsed as an empty stream. The supplied Fold is applied on the split segments. With - representing non-separator elements and . as separator, splitOn splits as follows:

"--.--" => "--" "--"
"--."   => "--" ""
".--"   => ""   "--"

splitOn (== x) is an inverse of intercalate (S.yield x)

Let's use the following definition for illustration:

splitOn' p xs = S.toList $ S.splitOn p (FL.toList) (S.fromList xs)
>>> splitOn' (== '.') ""
[""]
>>> splitOn' (== '.') "."
["",""]
>>> splitOn' (== '.') ".a"
> ["","a"]
>>> splitOn' (== '.') "a."
> ["a",""]
>>> splitOn' (== '.') "a.b"
> ["a","b"]
>>> splitOn' (== '.') "a..b"
> ["a","","b"]

Since: 0.7.0

splitOnSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Like splitOn but the separator is considered as suffixed to the segments in the stream. A missing suffix at the end is allowed. A separator at the beginning is parsed as empty segment. With - representing elements and . as separator, splitOnSuffix splits as follows:

 "--.--." => "--" "--"
 "--.--"  => "--" "--"
 ".--."   => "" "--"
splitOnSuffix' p xs = S.toList $ S.splitSuffixBy p (FL.toList) (S.fromList xs)
>>> splitOnSuffix' (== '.') ""
[]
>>> splitOnSuffix' (== '.') "."
[""]
>>> splitOnSuffix' (== '.') "a"
["a"]
>>> splitOnSuffix' (== '.') ".a"
> ["","a"]
>>> splitOnSuffix' (== '.') "a."
> ["a"]
>>> splitOnSuffix' (== '.') "a.b"
> ["a","b"]
>>> splitOnSuffix' (== '.') "a.b."
> ["a","b"]
>>> splitOnSuffix' (== '.') "a..b.."
> ["a","","b",""]
lines = splitOnSuffix (== '\n')

Since: 0.7.0

splitWithSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Like splitOnSuffix but keeps the suffix attached to the resulting splits.

splitWithSuffix' p xs = S.toList $ S.splitWithSuffix p (FL.toList) (S.fromList xs)
>>> splitWithSuffix' (== '.') ""
[]
>>> splitWithSuffix' (== '.') "."
["."]
>>> splitWithSuffix' (== '.') "a"
["a"]
>>> splitWithSuffix' (== '.') ".a"
> [".","a"]
>>> splitWithSuffix' (== '.') "a."
> ["a."]
>>> splitWithSuffix' (== '.') "a.b"
> ["a.","b"]
>>> splitWithSuffix' (== '.') "a.b."
> ["a.","b."]
>>> splitWithSuffix' (== '.') "a..b.."
> ["a.",".","b.","."]

Since: 0.7.0

wordsBy :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Like splitOn after stripping leading, trailing, and repeated separators. Therefore, ".a..b." with . as the separator would be parsed as ["a","b"]. In other words, its like parsing words from whitespace separated text.

wordsBy' p xs = S.toList $ S.wordsBy p (FL.toList) (S.fromList xs)
>>> wordsBy' (== ',') ""
> []
>>> wordsBy' (== ',') ","
> []
>>> wordsBy' (== ',') ",a,,b,"
> ["a","b"]
words = wordsBy isSpace

Since: 0.7.0

splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like splitOn but the separator is a sequence of elements instead of a single element.

For illustration, let's define a function that operates on pure lists:

splitOnSeq' pat xs = S.toList $ S.splitOnSeq (A.fromList pat) (FL.toList) (S.fromList xs)
>>> splitOnSeq' "" "hello"
> ["h","e","l","l","o"]
>>> splitOnSeq' "hello" ""
> [""]
>>> splitOnSeq' "hello" "hello"
> ["",""]
>>> splitOnSeq' "x" "hello"
> ["hello"]
>>> splitOnSeq' "h" "hello"
> ["","ello"]
>>> splitOnSeq' "o" "hello"
> ["hell",""]
>>> splitOnSeq' "e" "hello"
> ["h","llo"]
>>> splitOnSeq' "l" "hello"
> ["he","","o"]
>>> splitOnSeq' "ll" "hello"
> ["he","o"]

splitOnSeq is an inverse of intercalate. The following law always holds:

intercalate . splitOn == id

The following law holds when the separator is non-empty and contains none of the elements present in the input lists:

splitOn . intercalate == id

Internal

splitOnSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like splitSuffixBy but the separator is a sequence of elements, instead of a predicate for a single element.

splitSuffixOn_ pat xs = S.toList $ S.splitSuffixOn (A.fromList pat) (FL.toList) (S.fromList xs)
>>> splitSuffixOn_ "." ""
[""]
>>> splitSuffixOn_ "." "."
[""]
>>> splitSuffixOn_ "." "a"
["a"]
>>> splitSuffixOn_ "." ".a"
> ["","a"]
>>> splitSuffixOn_ "." "a."
> ["a"]
>>> splitSuffixOn_ "." "a.b"
> ["a","b"]
>>> splitSuffixOn_ "." "a.b."
> ["a","b"]
>>> splitSuffixOn_ "." "a..b.."
> ["a","","b",""]
lines = splitSuffixOn "\n"

Internal

splitBySeq :: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like splitOnSeq but splits the separator as well, as an infix token.

splitOn'_ pat xs = S.toList $ S.splitOn' (A.fromList pat) (FL.toList) (S.fromList xs)
>>> splitOn'_ "" "hello"
> ["h","","e","","l","","l","","o"]
>>> splitOn'_ "hello" ""
> [""]
>>> splitOn'_ "hello" "hello"
> ["","hello",""]
>>> splitOn'_ "x" "hello"
> ["hello"]
>>> splitOn'_ "h" "hello"
> ["","h","ello"]
>>> splitOn'_ "o" "hello"
> ["hell","o",""]
>>> splitOn'_ "e" "hello"
> ["h","e","llo"]
>>> splitOn'_ "l" "hello"
> ["he","l","","l","o"]
>>> splitOn'_ "ll" "hello"
> ["he","ll","o"]

Internal

splitWithSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like splitSuffixOn but keeps the suffix intact in the splits.

splitSuffixOn'_ pat xs = S.toList $ FL.splitSuffixOn' (A.fromList pat) (FL.toList) (S.fromList xs)
>>> splitSuffixOn'_ "." ""
[""]
>>> splitSuffixOn'_ "." "."
["."]
>>> splitSuffixOn'_ "." "a"
["a"]
>>> splitSuffixOn'_ "." ".a"
> [".","a"]
>>> splitSuffixOn'_ "." "a."
> ["a."]
>>> splitSuffixOn'_ "." "a.b"
> ["a.","b"]
>>> splitSuffixOn'_ "." "a.b."
> ["a.","b."]
>>> splitSuffixOn'_ "." "a..b.."
> ["a.",".","b.","."]

Internal

splitInnerBy :: (IsStream t, Monad m) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a) Source #

splitInnerBy splitter joiner stream splits the inner containers f a of an input stream t m (f a) using the splitter function. Container elements f a are collected until a split occurs, then all the elements before the split are joined using the joiner function.

For example, if we have a stream of Array Word8, we may want to split the stream into arrays representing lines separated by 'n' byte such that the resulting stream after a split would be one array for each line.

CAUTION! This is not a true streaming function as the container size after the split and merge may not be bounded.

Internal

splitInnerBySuffix :: (IsStream t, Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a) Source #

Like splitInnerBy but splits assuming the separator joins the segment in a suffix style.

Internal

Grouping

groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b Source #

groups = groupsBy (==)
groups = groupsByRolling (==)

Groups contiguous spans of equal elements together in individual groups.

>>> S.toList $ S.groups FL.toList $ S.fromList [1,1,2,2]
> [[1,1],[2,2]]

Since: 0.7.0

groupsBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #

groupsBy cmp f $ S.fromList [a,b,c,...] assigns the element a to the first group, if a `cmp` b is True then b is also assigned to the same group. If a `cmp` c is True then c is also assigned to the same group and so on. When the comparison fails a new group is started. Each group is folded using the fold f and the result of the fold is emitted in the output stream.

>>> S.toList $ S.groupsBy (>) FL.toList $ S.fromList [1,3,7,0,2,5]
> [[1,3,7],[0,2,5]]

Since: 0.7.0

groupsByRolling :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Unlike groupsBy this function performs a rolling comparison of two successive elements in the input stream. groupsByRolling cmp f $ S.fromList [a,b,c,...] assigns the element a to the first group, if a `cmp` b is True then b is also assigned to the same group. If b `cmp` c is True then c is also assigned to the same group and so on. When the comparison fails a new group is started. Each group is folded using the fold f.

>>> S.toList $ S.groupsByRolling (\a b -> a + 1 == b) FL.toList $ S.fromList [1,2,3,7,8,9]
> [[1,2,3],[7,8,9]]

Since: 0.7.0

Group map

rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b Source #

Like rollingMap but with an effectful map function.

Internal

rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b Source #

Apply a function on every two successive elements of a stream. If the stream consists of a single element the output is an empty stream.

Internal

Windowed Classification

Split the stream into windows or chunks in space or time. Each window can be associated with a key, all events associated with a particular key in the window can be folded to a single result. The stream is split into windows of specified size, the window can be terminated early if the closing flag is specified in the input stream.

The term "chunk" is used for a space window and the term "session" is used for a time window.

Tumbling Windows

A new window starts after the previous window is finished.

classifySessionsBy Source #

Arguments

:: (IsStream t, MonadAsync m, Ord k) 
=> Double

timer tick in seconds

-> Double

session timeout in seconds

-> Bool

reset the timeout when an event is received

-> (Int -> m Bool)

predicate to eject sessions based on session count

-> Fold m a (Either b b)

Fold to be applied to session events

-> t m (k, a, AbsTime)

session key, data, timestamp

-> t m (k, b)

session key, fold result

classifySessionsBy tick timeout idle pred f stream groups timestamped events in an input event stream into sessions based on a session key. Each element in the stream is an event consisting of a triple (session key, sesssion data, timestamp). session key is a key that uniquely identifies the session. All the events belonging to a session are folded using the fold f until the fold returns a Left result or a timeout has occurred. The session key and the result of the fold are emitted in the output stream when the session is purged.

When idle is False, timeout is the maximum lifetime of a session in seconds, measured from the timestamp of the first event in that session. When idle is True then the timeout is an idle timeout, it is reset after every event received in the session.

timestamp in an event characterizes the time when the input event was generated, this is an absolute time measured from some Epoch. The notion of current time is maintained by a monotonic event time clock using the timestamps seen in the input stream. The latest timestamp seen till now is used as the base for the current time. When no new events are seen, a timer is started with a tick duration specified by tick. This timer is used to detect session timeouts in the absence of new events.

The predicate pred is invoked with the current session count, if it returns True a session is ejected from the session cache before inserting a new session. This could be useful to alert or eject sessions when the number of sessions becomes too high.

Internal

classifySessionsOf Source #

Arguments

:: (IsStream t, MonadAsync m, Ord k) 
=> Double

time window size

-> (Int -> m Bool)

predicate to eject sessions on session count

-> Fold m a (Either b b)

Fold to be applied to session events

-> t m (k, a, AbsTime)

session key, data, timestamp

-> t m (k, b) 

Split the stream into fixed size time windows of specified interval in seconds. Within each such window, fold the elements in sessions identified by the session keys. The fold result is emitted in the output stream if the fold returns a Left result or if the time window ends.

Session timestamp in the input stream is an absolute time from some epoch, characterizing the time when the input element was generated. To detect session window end, a monotonic event time clock is maintained synced with the timestamps with a clock resolution of 1 second.

If the ejection predicate returns True, the session with the longest lifetime is ejected before inserting a new session.

classifySessionsOf interval pred = classifySessionsBy 1 interval False pred

Internal

Keep Alive Windows

The window size is extended if an event arrives within the specified window size. This can represent sessions with idle or inactive timeout.

classifyKeepAliveSessions Source #

Arguments

:: (IsStream t, MonadAsync m, Ord k) 
=> Double

session inactive timeout

-> (Int -> m Bool)

predicate to eject sessions on session count

-> Fold m a (Either b b)

Fold to be applied to session payload data

-> t m (k, a, AbsTime)

session key, data, timestamp

-> t m (k, b) 

Like classifySessionsOf but the session is kept alive if an event is received within the session window. The session times out and gets closed only if no event is received within the specified session window size.

If the ejection predicate returns True, the session that was idle for the longest time is ejected before inserting a new session.

classifyKeepAliveSessions timeout pred = classifySessionsBy 1 timeout True pred

Internal

Sliding Window Buffers

Combining Streams

Appending

append :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Append the outputs of two streams, yielding all the elements from the first stream and then yielding all the elements from the second stream.

IMPORTANT NOTE: This could be 100x faster than serial/<> for appending a few (say 100) streams because it can fuse via stream fusion. However, it does not scale for a large number of streams (say 1000s) and becomes qudartically slow. Therefore use this for custom appending of a few streams but use concatMap or 'concatMapWith serial' for appending n streams or infinite containers of streams.

Internal

Interleaving

interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. If any of the streams finishes early the other stream continues alone until it too finishes.

>>> :set -XOverloadedStrings
>>> interleave "ab" ",,,," :: SerialT Identity Char
fromList "a,b,,,"
>>> interleave "abcd" ",," :: SerialT Identity Char
fromList "a,b,cd"

interleave is dual to interleaveMin, it can be called interleaveMax.

Do not use at scale in concatMapWith.

Internal

interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. The output stops as soon as any of the two streams finishes, discarding the remaining part of the other stream. The last element of the resulting stream would be from the longer stream.

>>> :set -XOverloadedStrings
>>> interleaveMin "ab" ",,,," :: SerialT Identity Char
fromList "a,b,"
>>> interleaveMin "abcd" ",," :: SerialT Identity Char
fromList "a,b,c"

interleaveMin is dual to interleave.

Do not use at scale in concatMapWith.

Internal

interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. As soon as the first stream finishes, the output stops, discarding the remaining part of the second stream. In this case, the last element in the resulting stream would be from the second stream. If the second stream finishes early then the first stream still continues to yield elements until it finishes.

>>> :set -XOverloadedStrings
>>> interleaveSuffix "abc" ",,,," :: SerialT Identity Char
fromList "a,b,c,"
>>> interleaveSuffix "abc" "," :: SerialT Identity Char
fromList "a,bc"

interleaveSuffix is a dual of interleaveInfix.

Do not use at scale in concatMapWith.

Internal

interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream and ending at the first stream. If the second stream is longer than the first, elements from the second stream are infixed with elements from the first stream. If the first stream is longer then it continues yielding elements even after the second stream has finished.

>>> :set -XOverloadedStrings
>>> interleaveInfix "abc" ",,,," :: SerialT Identity Char
fromList "a,b,c"
>>> interleaveInfix "abc" "," :: SerialT Identity Char
fromList "a,bc"

interleaveInfix is a dual of interleaveSuffix.

Do not use at scale in concatMapWith.

Internal

wSerialFst :: IsStream t => t m a -> t m a -> t m a Source #

Like wSerial but stops interleaving as soon as the first stream stops.

Since: 0.7.0

wSerialMin :: IsStream t => t m a -> t m a -> t m a Source #

Like wSerial but stops interleaving as soon as any of the two streams stops.

Since: 0.7.0

Scheduling

roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Schedule the execution of two streams in a fair round-robin manner, executing each stream once, alternately. Execution of a stream may not necessarily result in an output, a stream may chose to Skip producing an element until later giving the other stream a chance to run. Therefore, this combinator fairly interleaves the execution of two streams rather than fairly interleaving the output of the two streams. This can be useful in co-operative multitasking without using explicit threads. This can be used as an alternative to async.

Do not use at scale in concatMapWith.

Internal

Parallel

parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like parallel but stops the output as soon as the first stream stops.

Internal

parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like parallel but stops the output as soon as any of the two streams stops.

Internal

Merging

mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.

If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.

> S.toList $ S.mergeBy compare (S.fromList [1,3,5]) (S.fromList [2,4,6,8])
[1,2,3,4,5,6,8]

Since: 0.6.0

mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like mergeBy but with a monadic comparison function.

Merge two streams randomly:

> randomly _ _ = randomIO >>= x -> return $ if x then LT else GT
> S.toList $ S.mergeByM randomly (S.fromList [1,1,1,1]) (S.fromList [2,2,2,2])
[2,1,2,2,2,1,1,1]

Merge two streams in a proportion of 2:1:

proportionately m n = do
 ref <- newIORef $ cycle $ concat [replicate m LT, replicate n GT]
 return $ \_ _ -> do
     r <- readIORef ref
     writeIORef ref $ tail r
     return $ head r

main = do
 f <- proportionately 2 1
 xs <- S.toList $ S.mergeByM f (S.fromList [1,1,1,1,1,1]) (S.fromList [2,2,2])
 print xs
[1,1,2,1,1,2,1,1,2]

Since: 0.6.0

mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like mergeBy but merges concurrently (i.e. both the elements being merged are generated concurrently).

Since: 0.6.0

mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like mergeByM but merges concurrently (i.e. both the elements being merged are generated concurrently).

Since: 0.6.0

Zipping

zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Zip two streams serially using a pure zipping function.

> S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6])
[5,7,9]

Since: 0.1.0

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like zipWith but using a monadic zipping function.

Since: 0.4.0

zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Like zipWith but zips concurrently i.e. both the streams being zipped are generated concurrently.

Since: 0.1.0

zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like zipWithM but zips concurrently i.e. both the streams being zipped are generated concurrently.

Since: 0.4.0

Folding Containers of Streams

foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

A variant of fold that allows you to fold a Foldable container of streams using the specified stream sum operation.

foldWith async $ map return [1..3]

Equivalent to:

foldWith f = S.foldMapWith f id

Since: 0.1.0 (Streamly)

foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

A variant of foldMap that allows you to map a monadic streaming action on a Foldable container and then fold it using the specified stream merge operation.

foldMapWith async return [1..3]

Equivalent to:

foldMapWith f g xs = S.concatMapWith f g (S.fromFoldable xs)

Since: 0.1.0 (Streamly)

forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Like foldMapWith but with the last two arguments reversed i.e. the monadic streaming function is the last argument.

Equivalent to:

forEachWith = flip S.foldMapWith

Since: 0.1.0 (Streamly)

Folding Streams of Streams

concat :: (IsStream t, Monad m) => t m (t m a) -> t m a Source #

Flatten a stream of streams to a single stream.

concat = concatMap id

Internal

concatM :: (IsStream t, Monad m) => m (t m a) -> t m a Source #

Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.

Compare with concat and sequence.

Internal

concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

concatMap = concatMapWith serial
concatMap f = concatMapM (return . f)

Since: 0.6.0

concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #

Map a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike concatMap, it can produce an effect at the beginning of each iteration of the inner loop.

Since: 0.6.0

concatMapWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m b) -> t m a -> t m b Source #

concatMapWith merge map stream is a two dimensional looping combinator. The first argument specifies a merge or concat function that is used to merge the streams generated by applying the second argument i.e. the map function to each element of the input stream. The concat function could be serial, parallel, async, ahead or any other zip or merge function and the second argument could be any stream generation function using a seed.

Compare foldMapWith

Since: 0.7.0

Flattening Using Unfolds

concatUnfold :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like concatMap but uses an Unfold for stream generation. Unlike concatMap this can fuse the Unfold code with the inner loop and therefore provide many times better performance.

Since: 0.7.0

concatUnfoldInterleave :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like concatUnfold but interleaves the streams in the same way as interleave behaves instead of appending them.

Internal

concatUnfoldRoundrobin :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like concatUnfold but executes the streams in the same way as roundrobin.

Internal

Feedback Loops

concatMapIterateWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m a) -> t m a -> t m a Source #

Like iterateM but using a stream generator function.

Internal

concatMapTreeWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m (Either a b)) -> t m (Either a b) -> t m (Either a b) Source #

Traverse a forest with recursive tree structures whose non-leaf nodes are of type a and leaf nodes are of type b, flattening all the trees into streams and combining the streams into a single stream consisting of both leaf and non-leaf nodes.

concatMapTreeWith is a generalization of concatMap, using a recursive feedback loop to append the non-leaf nodes back to the input stream enabling recursive traversal. concatMap flattens a single level nesting whereas concatMapTreeWith flattens a recursively nested structure.

Traversing a directory tree recursively is a canonical use case of concatMapTreeWith.

concatMapTreeWith combine f xs = concatMapIterateWith combine g xs
     where
     g (Left tree)  = f tree
     g (Right leaf) = nil

Internal

concatMapLoopWith Source #

Arguments

:: (IsStream t, MonadAsync m) 
=> (forall x. t m x -> t m x -> t m x) 
-> (a -> t m (Either b c)) 
-> (b -> t m a)

feedback function to feed b back into input

-> t m a 
-> t m c 

Flatten a stream with a feedback loop back into the input.

For example, exceptions generated by the output stream can be fed back to the input to take any corrective action. The corrective action may be to retry the action or do nothing or log the errors. For the retry case we need a feedback loop.

Internal

concatMapTreeYieldLeavesWith :: (IsStream t, MonadAsync m) => (forall x. t m x -> t m x -> t m x) -> (a -> t m (Either a b)) -> t m a -> t m b Source #

Concat a stream of trees, generating only leaves.

Compare with concatMapTreeWith. While the latter returns all nodes in the tree, this one returns only the leaves.

Traversing a directory tree recursively and yielding on the files is a canonical use case of concatMapTreeYieldLeavesWith.

concatMapTreeYieldLeavesWith combine f = concatMapLoopWith combine f yield

Internal

mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a Source #

Iterate a lazy function f of the shape `m a -> t m a` until it gets fully defined i.e. becomes independent of its argument action, then return the resulting value of the function (`t m a`).

It can be used to construct a stream that uses a cyclic definition. For example:

import Streamly.Internal.Prelude as S
import System.IO.Unsafe (unsafeInterleaveIO)

main = do
    S.mapM_ print $ S.mfix $ x -> do
      a <- S.fromList [1,2]
      b <- S.fromListM [return 3, unsafeInterleaveIO (fmap fst x)]
      return (a, b)

Note that the function f must be lazy in its argument, that's why we use unsafeInterleaveIO because IO monad is strict.

Internal

Inserting Streams in Streams

gintercalate :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #

interleaveInfix followed by unfold and concat.

Internal

gintercalateSuffix :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #

interleaveSuffix followed by unfold and concat.

Internal

intercalate :: (IsStream t, Monad m) => b -> Unfold m b c -> t m b -> t m c Source #

intersperse followed by unfold and concat.

unwords = intercalate " " UF.fromList
>>> intercalate " " UF.fromList ["abc", "def", "ghi"]
> "abc def ghi"

Internal

intercalateSuffix :: (IsStream t, Monad m) => b -> Unfold m b c -> t m b -> t m c Source #

intersperseSuffix followed by unfold and concat.

unlines = intercalateSuffix "\n" UF.fromList
>>> intercalate "\n" UF.fromList ["abc", "def", "ghi"]
> "abc\ndef\nghi\n"

Internal

interpose :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #

Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.

unwords = S.interpose ' '

Internal

interposeSuffix :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #

Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.

unlines = S.interposeSuffix '\n'

Internal

Exceptions

before :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Run a side effect before the stream yields its first element.

Since: 0.7.0

after :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Run a side effect whenever the stream stops normally.

Prefer afterIO over this as the after action in this combinator is not executed if the unfold is partially evaluated lazily and then garbage collected.

Since: 0.7.0

afterIO :: (IsStream t, MonadIO m, MonadBaseControl IO m) => m b -> t m a -> t m a Source #

Run a side effect whenever the stream stops normally or is garbage collected after a partial lazy evaluation.

Internal

bracket :: (IsStream t, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a Source #

Run the first action before the stream starts and remember its output, generate a stream using the output, run the second action using the remembered value as an argument whenever the stream ends normally or due to an exception.

Prefer bracketIO over this as the after action in this combinator is not executed if the unfold is partially evaluated lazily and then garbage collected.

Since: 0.7.0

bracketIO :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a Source #

Run the first action before the stream starts and remember its output, generate a stream using the output, run the second action using the remembered value as an argument whenever the stream ends normally, due to an exception or if it is garbage collected after a partial lazy evaluation.

Internal

onException :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a Source #

Run a side effect whenever the stream aborts due to an exception.

Since: 0.7.0

finally :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a Source #

Run a side effect whenever the stream stops normally or aborts due to an exception.

Prefer finallyIO over this as the after action in this combinator is not executed if the unfold is partially evaluated lazily and then garbage collected.

Since: 0.7.0

finallyIO :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> t m a -> t m a Source #

Run a side effect whenever the stream stops normally, aborts due to an exception or if it is garbage collected after a partial lazy evaluation.

Internal

handle :: (IsStream t, MonadCatch m, Exception e) => (e -> t m a) -> t m a -> t m a Source #

When evaluating a stream if an exception occurs, stream evaluation aborts and the specified exception handler is run with the exception as argument.

Since: 0.7.0

Generalize Inner Monad

hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> SerialT m a -> SerialT n a Source #

Transform the inner monad of a stream using a natural transformation.

Internal

generally :: (IsStream t, Monad m) => t Identity a -> t m a Source #

Generalize the inner monad of the stream from Identity to any monad.

Internal

Transform Inner Monad

liftInner :: (Monad m, IsStream t, MonadTrans tr, Monad (tr m)) => t m a -> t (tr m) a Source #

Lift the inner monad of a stream using a monad transformer.

Internal

usingReaderT :: (Monad m, IsStream t) => r -> (t (ReaderT r m) a -> t (ReaderT r m) a) -> t m a -> t m a Source #

Run a stream transformation using a given environment.

Internal

runReaderT :: (IsStream t, Monad m) => s -> t (ReaderT s m) a -> t m a Source #

Evaluate the inner monad of a stream as ReaderT.

Internal

evalStateT :: Monad m => s -> SerialT (StateT s m) a -> SerialT m a Source #

Evaluate the inner monad of a stream as StateT.

This is supported only for SerialT as concurrent state updation may not be safe.

Internal

usingStateT :: Monad m => s -> (SerialT (StateT s m) a -> SerialT (StateT s m) a) -> SerialT m a -> SerialT m a Source #

Run a stateful (StateT) stream transformation using a given state.

This is supported only for SerialT as concurrent state updation may not be safe.

Internal

runStateT :: Monad m => s -> SerialT (StateT s m) a -> SerialT m (s, a) Source #

Evaluate the inner monad of a stream as StateT and emit the resulting state and value pair after each step.

This is supported only for SerialT as concurrent state updation may not be safe.

Internal

Diagnostics

inspectMode :: IsStream t => t m a -> t m a Source #

Print debug information about an SVar when the stream ends

Internal

Deprecated

once :: (Monad m, IsStream t) => m a -> t m a Source #

Deprecated: Please use yieldM instead.

Same as yieldM

Since: 0.2.0

each :: (IsStream t, Foldable f) => f a -> t m a Source #

Deprecated: Please use fromFoldable instead.

Same as fromFoldable.

Since: 0.1.0

scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b Source #

Deprecated: Please use scanl followed by map instead.

Strict left scan with an extraction function. Like scanl', but applies a user supplied extraction function (the third argument) at each step. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since: 0.7.0 (Monad m constraint)

Since 0.2.0

foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b Source #

Deprecated: Please use foldl' followed by fmap instead.

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since: 0.2.0

foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b Source #

Deprecated: Please use foldlM' followed by fmap instead.

Like foldx, but with a monadic step function.

Since: 0.2.0

foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #

Deprecated: Use foldrM instead.

Lazy right fold for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Since: 0.5.0

runStream :: Monad m => SerialT m a -> m () Source #

Deprecated: Please use "drain" instead

Run a stream, discarding the results. By default it interprets the stream as SerialT, to run other types of streams use the type adapting combinators for example runStream . asyncly.

Since: 0.2.0

runN :: Monad m => Int -> SerialT m a -> m () Source #

Deprecated: Please use "drainN" instead

runN n = runStream . take n

Run maximum up to n iterations of a stream.

Since: 0.6.0

runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #

Deprecated: Please use "drainWhile" instead

runWhile p = runStream . takeWhile p

Run a stream as long as the predicate holds true.

Since: 0.6.0

fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String Source #

Deprecated: Please use Streamly.FileSystem.Handle module (see the changelog)

Read lines from an IO Handle into a stream of Strings.

Since: 0.1.0

toHandle :: MonadIO m => Handle -> SerialT m String -> m () Source #

Deprecated: Please use Streamly.FileSystem.Handle module (see the changelog)

toHandle h = S.mapM_ $ hPutStrLn h

Write a stream of Strings to an IO Handle.

Since: 0.1.0