Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Most of the combinators in this module can be implemented as unfolds. Some of them however can only be expressed in terms StreamK e.g. cons/consM, fromFoldable, mfix. We can possibly remove those from this module which can be expressed as unfolds. Unless we want to use rewrite rules to rewrite them as StreamK when StreamK is used, avoiding conversion to StreamD. Will that help? Are there any other reasons to keep these and not use unfolds?
Synopsis
- nil :: IsStream t => t m a
- nilM :: (IsStream t, Monad m) => m b -> t m a
- cons :: IsStream t => a -> t m a -> t m a
- (.:) :: IsStream t => a -> t m a -> t m a
- consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- (|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
- unfold0 :: (IsStream t, Monad m) => Unfold m Void b -> t m b
- unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a
- unfoldrM :: forall t m b a. (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
- fromPure :: IsStream t => a -> t m a
- fromEffect :: (Monad m, IsStream t) => m a -> t m a
- repeat :: (IsStream t, Monad m) => a -> t m a
- repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
- replicate :: (IsStream t, Monad m) => Int -> a -> t m a
- replicateM :: forall t m a. (IsStream t, MonadAsync m) => Int -> m a -> t m a
- class Enum a => Enumerable a where
- enumerateFrom :: (IsStream t, Monad m) => a -> t m a
- enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a
- enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a
- enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a
- enumerate :: (IsStream t, Monad m, Bounded a, Enumerable a) => t m a
- enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a
- times :: (IsStream t, MonadAsync m) => t m (AbsTime, RelTime64)
- absTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m AbsTime
- absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime
- relTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m RelTime64
- relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64
- durations :: Double -> t m RelTime64
- ticks :: Rate -> t m ()
- timeout :: AbsTime -> t m ()
- fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a
- fromIndicesM :: forall t m a. (IsStream t, MonadAsync m) => (Int -> m a) -> t m a
- iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a
- iterateM :: forall t m a. (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a
- mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a
- fromList :: (Monad m, IsStream t) => [a] -> t m a
- fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a
- fromFoldable :: (IsStream t, Foldable f) => f a -> t m a
- fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a
- fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a
- fromPrimIORef :: (IsStream t, MonadIO m, Prim a) => IORef a -> t m a
- once :: (Monad m, IsStream t) => m a -> t m a
- yield :: IsStream t => a -> t m a
- yieldM :: (Monad m, IsStream t) => m a -> t m a
- each :: (IsStream t, Foldable f) => f a -> t m a
- fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String
- currentTime :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime
Primitives
cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #
Construct a stream by adding a pure value at the head of an existing
stream. For serial streams this is the same as (return a) `consM` r
but
more efficient. For concurrent streams this is not concurrent whereas
consM
is concurrent. For example:
> toList $ 1 `cons` 2 `cons` 3 `cons` nil [1,2,3]
Since: 0.1.0
consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #
Constructs a stream by adding a monadic action at the head of an existing stream. For example:
> toList $ getLine `consM` getLine `consM` nil hello world ["hello","world"]
Concurrent (do not use fromParallel
to construct infinite streams)
Since: 0.2.0
(|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #
Operator equivalent of consM
. We can read it as "parallel colon
"
to remember that |
comes before :
.
> toList $ getLine |: getLine |: nil hello world ["hello","world"]
let delay = threadDelay 1000000 >> print 1 drain $ fromSerial $ delay |: delay |: delay |: nil drain $ fromParallel $ delay |: delay |: delay |: nil
Concurrent (do not use fromParallel
to construct infinite streams)
Since: 0.2.0
From Unfold
unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b Source #
Convert an Unfold
into a stream by supplying it an input seed.
>>>
Stream.drain $ Stream.unfold (Unfold.replicateM 3) (putStrLn "hello")
hello hello hello
Since: 0.7.0
unfold0 :: (IsStream t, Monad m) => Unfold m Void b -> t m b Source #
Convert an Unfold
with a closed input end into a stream.
Pre-release
Unfolding
unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a Source #
>>>
:{
unfoldr step s = case step s of Nothing -> Stream.nil Just (a, b) -> a `Stream.cons` unfoldr step b :}
Build a stream by unfolding a pure step function step
starting from a
seed s
. The step function returns the next element in the stream and the
next seed value. When it is done it returns Nothing
and the stream ends.
For example,
>>>
:{
let f b = if b > 2 then Nothing else Just (b, b + 1) in Stream.toList $ Stream.unfoldr f 0 :} [0,1,2]
Since: 0.1.0
unfoldrM :: forall t m b a. (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a Source #
Build a stream by unfolding a monadic step function starting from a
seed. The step function returns the next element in the stream and the next
seed value. When it is done it returns Nothing
and the stream ends. For
example,
>>>
:{
let f b = if b > 2 then return Nothing else return (Just (b, b + 1)) in Stream.toList $ Stream.unfoldrM f 0 :} [0,1,2]
When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step.
>>>
:{
let f b = if b > 2 then return Nothing else threadDelay 1000000 >> return (Just (b, b + 1)) in Stream.toList $ Stream.delay 1 $ Stream.fromAsync $ Stream.unfoldrM f 0 :} [0,1,2]
Concurrent
Since: 0.1.0
From Values
fromPure :: IsStream t => a -> t m a Source #
fromPure a = a `cons` nil
Create a singleton stream from a pure value.
The following holds in monadic streams, but not in Zip streams:
fromPure = pure fromPure = fromEffect . pure
In Zip applicative streams fromPure
is not the same as pure
because in that
case pure
is equivalent to repeat
instead. fromPure
and pure
are
equally efficient, in other cases fromPure
may be slightly more efficient
than the other equivalent definitions.
Since: 0.8.0 (Renamed yield to fromPure)
fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #
fromEffect m = m `consM` nil
Create a singleton stream from a monadic action.
> Stream.toList $ Stream.fromEffect getLine hello ["hello"]
Since: 0.8.0 (Renamed yieldM to fromEffect)
repeat :: (IsStream t, Monad m) => a -> t m a Source #
Generate an infinite stream by repeating a pure value.
Since: 0.4.0
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #
>>>
repeatM = fix . consM
>>>
repeatM = cycle1 . fromEffect
Generate a stream by repeatedly executing a monadic action forever.
>>>
:{
repeatAsync = Stream.repeatM (threadDelay 1000000 >> print 1) & Stream.take 10 & Stream.fromAsync & Stream.drain :}
Concurrent, infinite (do not use with fromParallel
)
Since: 0.2.0
replicate :: (IsStream t, Monad m) => Int -> a -> t m a Source #
>>>
replicate n = Stream.take n . Stream.repeat
Generate a stream of length n
by repeating a value n
times.
Since: 0.6.0
replicateM :: forall t m a. (IsStream t, MonadAsync m) => Int -> m a -> t m a Source #
>>>
replicateM n = Stream.take n . Stream.repeatM
Generate a stream by performing a monadic action n
times. Same as:
>>>
pr n = threadDelay 1000000 >> print n
This runs serially and takes 3 seconds:
>>>
Stream.drain $ Stream.fromSerial $ Stream.replicateM 3 $ pr 1
1 1 1
This runs concurrently and takes just 1 second:
>>>
Stream.drain $ Stream.fromAsync $ Stream.replicateM 3 $ pr 1
1 1 1
Concurrent
Since: 0.1.1
Enumeration
class Enum a => Enumerable a where Source #
Types that can be enumerated as a stream. The operations in this type
class are equivalent to those in the Enum
type class, except that these
generate a stream instead of a list. Use the functions in
Streamly.Internal.Data.Stream.Enumeration module to define new instances.
Since: 0.6.0
enumerateFrom :: (IsStream t, Monad m) => a -> t m a Source #
enumerateFrom from
generates a stream starting with the element
from
, enumerating up to maxBound
when the type is Bounded
or
generating an infinite stream when the type is not Bounded
.
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int) [0,1,2,3]
For Fractional
types, enumeration is numerically stable. However, no
overflow or underflow checks are performed.
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1 [1.1,2.1,3.1,4.1]
Since: 0.6.0
enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a Source #
Generate a finite stream starting with the element from
, enumerating
the type up to the value to
. If to
is smaller than from
then an
empty stream is returned.
>>> Stream.toList $ Stream.enumerateFromTo 0 4 [0,1,2,3,4]
For Fractional
types, the last element is equal to the specified to
value after rounding to the nearest integral value.
>>> Stream.toList $ Stream.enumerateFromTo 1.1 4 [1.1,2.1,3.1,4.1] >>> Stream.toList $ Stream.enumerateFromTo 1.1 4.6 [1.1,2.1,3.1,4.1,5.1]
Since: 0.6.0
enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a Source #
enumerateFromThen from then
generates a stream whose first element
is from
, the second element is then
and the successive elements are
in increments of then - from
. Enumeration can occur downwards or
upwards depending on whether then
comes before or after from
. For
Bounded
types the stream ends when maxBound
is reached, for
unbounded types it keeps enumerating infinitely.
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2 [0,2,4,6] >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2) [0,-2,-4,-6]
Since: 0.6.0
enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a Source #
enumerateFromThenTo from then to
generates a finite stream whose
first element is from
, the second element is then
and the successive
elements are in increments of then - from
up to to
. Enumeration can
occur downwards or upwards depending on whether then
comes before or
after from
.
>>> Stream.toList $ Stream.enumerateFromThenTo 0 2 6 [0,2,4,6] >>> Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6) [0,-2,-4,-6]
Since: 0.6.0
Instances
enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a Source #
Time Enumeration
times :: (IsStream t, MonadAsync m) => t m (AbsTime, RelTime64) Source #
times
returns a stream of time value tuples with clock of 10 ms
granularity. The first component of the tuple is an absolute time reference
(epoch) denoting the start of the stream and the second component is a time
relative to the reference.
>>>
Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.times
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)) (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)) (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
Note: This API is not safe on 32-bit machines.
Pre-release
absTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m AbsTime Source #
absTimes
returns a stream of absolute timestamps using a clock of 10 ms
granularity.
>>>
Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes
AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...})
Note: This API is not safe on 32-bit machines.
Pre-release
absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #
absTimesWith g
returns a stream of absolute timestamps using a clock of
granularity g
specified in seconds. A low granularity clock is more
expensive in terms of CPU usage. Any granularity lower than 1 ms is treated
as 1 ms.
>>>
Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ absTimesWith 0.01
AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...})
Note: This API is not safe on 32-bit machines.
Pre-release
relTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m RelTime64 Source #
relTimes
returns a stream of relative time values starting from 0,
using a clock of granularity 10 ms.
>>>
Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimes
RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...)
Note: This API is not safe on 32-bit machines.
Pre-release
relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64 Source #
relTimesWith g
returns a stream of relative time values starting from 0,
using a clock of granularity g
specified in seconds. A low granularity
clock is more expensive in terms of CPU usage. Any granularity lower than 1
ms is treated as 1 ms.
>>>
Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...)
Note: This API is not safe on 32-bit machines.
Pre-release
durations :: Double -> t m RelTime64 Source #
durations g
returns a stream of relative time values measuring the time
elapsed since the immediate predecessor element of the stream was generated.
The first element of the stream is always 0. durations
uses a clock of
granularity g
specified in seconds. A low granularity clock is more
expensive in terms of CPU usage. The minimum granularity is 1 millisecond.
Durations lower than 1 ms will be 0.
Note: This API is not safe on 32-bit machines.
Unimplemented
ticks :: Rate -> t m () Source #
Generate ticks at the specified rate. The rate is adaptive, the tick
generation speed can be increased or decreased at different times to achieve
the specified rate. The specific behavior for different styles of Rate
specifications is documented under Rate
. The effective maximum rate
achieved by a stream is governed by the processor speed.
Unimplemented
timeout :: AbsTime -> t m () Source #
Generate a singleton event at or after the specified absolute time. Note that this is different from a threadDelay, a threadDelay starts from the time when the action is evaluated, whereas if we use AbsTime based timeout it will immediately expire if the action is evaluated too late.
Unimplemented
From Generators
fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a Source #
>>>
fromIndices f = fmap f $ Stream.enumerateFrom 0
>>>
fromIndices f = let g i = f i `Stream.cons` g (i + 1) in g 0
Generate an infinite stream, whose values are the output of a function f
applied on the corresponding index. Index starts at 0.
>>>
Stream.toList $ Stream.take 5 $ Stream.fromIndices id
[0,1,2,3,4]
Since: 0.6.0
fromIndicesM :: forall t m a. (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #
>>>
fromIndicesM f = Stream.mapM f $ Stream.enumerateFrom 0
>>>
fromIndicesM f = let g i = f i `Stream.consM` g (i + 1) in g 0
Generate an infinite stream, whose values are the output of a monadic
function f
applied on the corresponding index. Index starts at 0.
Concurrent
Since: 0.6.0
Iteration
iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a Source #
>>>
iterate f x = x `Stream.cons` iterate f x
Generate an infinite stream with x
as the first element and each
successive element derived by applying the function f
on the previous
element.
>>>
Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1
[1,2,3,4,5]
Since: 0.1.2
iterateM :: forall t m a. (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a Source #
>>>
iterateM f m = m >>= \a -> return a `Stream.consM` iterateM f (f a)
Generate an infinite stream with the first element generated by the action
m
and each successive element derived by applying the monadic function
f
on the previous element.
>>>
pr n = threadDelay 1000000 >> print n
>>>
:{
Stream.iterateM (\x -> pr x >> return (x + 1)) (return 0) & Stream.take 3 & Stream.fromSerial & Stream.toList :} 0 1 [0,1,2]
When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration.
>>>
:{
Stream.iterateM (\x -> pr x >> return (x + 1)) (return 0) & Stream.delay 1 & Stream.take 3 & Stream.fromAsync & Stream.toList :} 0 1 ...
Concurrent
Since: 0.1.2
Since: 0.7.0 (signature change)
Cyclic Elements
mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a Source #
We can define cyclic structures using let
:
>>>
let (a, b) = ([1, b], head a) in (a, b)
([1,1],1)
The function fix
defined as:
>>>
fix f = let x = f x in x
ensures that the argument of a function and its output refer to the same
lazy value x
i.e. the same location in memory. Thus x
can be defined
in terms of itself, creating structures with cyclic references.
>>>
f ~(a, b) = ([1, b], head a)
>>>
fix f
([1,1],1)
mfix
is essentially the same as fix
but for monadic
values.
Using mfix
for streams we can construct a stream in which each element of
the stream is defined in a cyclic fashion. The argument of the function
being fixed represents the current element of the stream which is being
returned by the stream monad. Thus, we can use the argument to construct
itself.
In the following example, the argument action
of the function f
represents the tuple (x,y)
returned by it in a given iteration. We define
the first element of the tuple in terms of the second.
>>>
import Streamly.Internal.Data.Stream.IsStream as Stream
>>>
import System.IO.Unsafe (unsafeInterleaveIO)
>>>
:{
main = Stream.mapM_ print $ Stream.mfix f where f action = do let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act x <- Stream.fromListM [incr 1 action, incr 2 action] y <- Stream.fromList [4,5] return (x, y) :}
Note: you cannot achieve this by just changing the order of the monad statements because that would change the order in which the stream elements are generated.
Note that the function f
must be lazy in its argument, that's why we use
unsafeInterleaveIO
on action
because IO monad is strict.
Pre-release
From Containers
fromList :: (Monad m, IsStream t) => [a] -> t m a Source #
fromList =foldr
cons
nil
Construct a stream from a list of pure values. This is more efficient than
fromFoldable
for serial streams.
Since: 0.4.0
fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a Source #
>>>
fromListM = Stream.fromFoldableM
>>>
fromListM = Stream.sequence . Stream.fromList
>>>
fromListM = Stream.mapM id . Stream.fromList
>>>
fromListM = Prelude.foldr Stream.consM Stream.nil
Construct a stream from a list of monadic actions. This is more efficient
than fromFoldableM
for serial streams.
Since: 0.4.0
fromFoldable :: (IsStream t, Foldable f) => f a -> t m a Source #
>>>
fromFoldable = Prelude.foldr Stream.cons Stream.nil
Construct a stream from a Foldable
containing pure values:
Since: 0.2.0
fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a Source #
>>>
fromFoldableM = Prelude.foldr Stream.consM Stream.nil
Construct a stream from a Foldable
containing monadic actions.
>>>
pr n = threadDelay 1000000 >> print n
>>>
Stream.drain $ Stream.fromSerial $ Stream.fromFoldableM $ map pr [1,2,3]
1 2 3
>>>
Stream.drain $ Stream.fromAsync $ Stream.fromFoldableM $ map pr [1,2,3]
... ... ...
Concurrent (do not use with fromParallel
on infinite containers)
Since: 0.3.0
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a Source #
Takes a callback setter function and provides it with a callback. The callback when invoked adds a value at the tail of the stream. Returns a stream of values generated by the callback.
Pre-release
fromPrimIORef :: (IsStream t, MonadIO m, Prim a) => IORef a -> t m a Source #
Construct a stream by reading a Prim
IORef
repeatedly.
Pre-release
Deprecated
once :: (Monad m, IsStream t) => m a -> t m a Source #
Deprecated: Please use fromEffect instead.
Same as fromEffect
Since: 0.2.0
fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String Source #
Deprecated: Please use Streamly.FileSystem.Handle module (see the changelog)
Read lines from an IO Handle into a stream of Strings.
Since: 0.1.0
currentTime :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #
Deprecated: Please use absTimes instead