Copyright | (c) 2018 Composewell Technologies (c) Roman Leshchinskiy 2008-2010 |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Streamly.Internal.Data.Pipe might ultimately replace this module.
Synopsis
- transform :: Monad m => Pipe m a b -> Stream m a -> Stream m b
- map :: Monad m => (a -> b) -> Stream m a -> Stream m b
- mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
- sequence :: Monad m => Stream m (m a) -> Stream m a
- tap :: Monad m => Fold m a b -> Stream m a -> Stream m a
- tapOffsetEvery :: Monad m => Int -> Int -> Fold m a b -> Stream m a -> Stream m a
- trace :: Monad m => (a -> m b) -> Stream m a -> Stream m a
- trace_ :: Monad m => m b -> Stream m a -> Stream m a
- foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b
- foldlS :: Monad m => (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b
- postscan :: Monad m => Fold m a b -> Stream m a -> Stream m b
- scan :: Monad m => Fold m a b -> Stream m a -> Stream m b
- scanMany :: Monad m => Fold m a b -> Stream m a -> Stream m b
- splitOn :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
- scanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
- scanlMAfter' :: Monad m => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
- scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
- scanlM :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
- scanl :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
- scanl1M' :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a
- scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
- scanl1M :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a
- scanl1 :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
- prescanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
- prescanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
- postscanl :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
- postscanlM :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
- postscanl' :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
- postscanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
- postscanlMAfter' :: Monad m => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
- postscanlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
- postscanlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
- scanlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
- scanlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
- with :: Monad m => (Stream m a -> Stream m (s, a)) -> (((s, a) -> b) -> Stream m (s, a) -> Stream m (s, a)) -> ((s, a) -> b) -> Stream m a -> Stream m a
- scanMaybe :: Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b
- filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- deleteBy :: Monad m => (a -> a -> Bool) -> a -> Stream m a -> Stream m a
- uniqBy :: Monad m => (a -> a -> Bool) -> Stream m a -> Stream m a
- uniq :: (Eq a, Monad m) => Stream m a -> Stream m a
- prune :: (a -> Bool) -> Stream m a -> Stream m a
- repeated :: Stream m a -> Stream m a
- take :: Applicative m => Int -> Stream m a -> Stream m a
- takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- takeWhileLast :: (a -> Bool) -> Stream m a -> Stream m a
- takeWhileAround :: (a -> Bool) -> Stream m a -> Stream m a
- drop :: Monad m => Int -> Stream m a -> Stream m a
- dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- dropLast :: Int -> Stream m a -> Stream m a
- dropWhileLast :: (a -> Bool) -> Stream m a -> Stream m a
- dropWhileAround :: (a -> Bool) -> Stream m a -> Stream m a
- insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a
- intersperse :: Monad m => a -> Stream m a -> Stream m a
- intersperseM :: Monad m => m a -> Stream m a -> Stream m a
- intersperseMWith :: Int -> m a -> Stream m a -> Stream m a
- intersperseMSuffix :: forall m a. Monad m => m a -> Stream m a -> Stream m a
- intersperseMSuffixWith :: forall m a. Monad m => Int -> m a -> Stream m a -> Stream m a
- intersperseM_ :: Monad m => m b -> Stream m a -> Stream m a
- intersperseMSuffix_ :: Monad m => m b -> Stream m a -> Stream m a
- intersperseMPrefix_ :: Monad m => m b -> Stream m a -> Stream m a
- delay :: MonadIO m => Double -> Stream m a -> Stream m a
- delayPre :: MonadIO m => Double -> Stream m a -> Stream m a
- delayPost :: MonadIO m => Double -> Stream m a -> Stream m a
- reverse :: Monad m => Stream m a -> Stream m a
- reverseUnbox :: (MonadIO m, Unbox a) => Stream m a -> Stream m a
- reassembleBy :: Fold m a b -> (a -> a -> Int) -> Stream m a -> Stream m b
- indexed :: Monad m => Stream m a -> Stream m (Int, a)
- indexedR :: Monad m => Int -> Stream m a -> Stream m (Int, a)
- timestampWith :: MonadIO m => Double -> Stream m a -> Stream m (AbsTime, a)
- timestamped :: MonadIO m => Stream m a -> Stream m (AbsTime, a)
- timeIndexWith :: MonadIO m => Double -> Stream m a -> Stream m (RelTime64, a)
- timeIndexed :: MonadIO m => Stream m a -> Stream m (RelTime64, a)
- findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int
- elemIndices :: (Monad m, Eq a) => a -> Stream m a -> Stream m Int
- slicesBy :: Monad m => (a -> Bool) -> Stream m a -> Stream m (Int, Int)
- rollingMap :: Monad m => (Maybe a -> a -> b) -> Stream m a -> Stream m b
- rollingMapM :: Monad m => (Maybe a -> a -> m b) -> Stream m a -> Stream m b
- rollingMap2 :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b
- mapMaybe :: Monad m => (a -> Maybe b) -> Stream m a -> Stream m b
- mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b
- catMaybes :: Monad m => Stream m (Maybe a) -> Stream m a
- catLefts :: Monad m => Stream m (Either a b) -> Stream m a
- catRights :: Monad m => Stream m (Either a b) -> Stream m b
- catEithers :: Monad m => Stream m (Either a a) -> Stream m a
Piping
Pass through a Pipe
.
transform :: Monad m => Pipe m a b -> Stream m a -> Stream m b Source #
Use a Pipe
to transform a stream.
Pre-release
Mapping
Stateless one-to-one maps.
mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #
>>>
mapM f = Stream.sequence . fmap f
Apply a monadic function to each element of the stream and replace it with the output of the resulting action.
>>>
s = Stream.fromList ["a", "b", "c"]
>>>
Stream.fold Fold.drain $ Stream.mapM putStr s
abc
sequence :: Monad m => Stream m (m a) -> Stream m a Source #
>>>
sequence = Stream.mapM id
Replace the elements of a stream of monadic actions with the outputs of those actions.
>>>
s = Stream.fromList [putStr "a", putStr "b", putStrLn "c"]
>>>
Stream.fold Fold.drain $ Stream.sequence s
abc
Mapping Effects
tap :: Monad m => Fold m a b -> Stream m a -> Stream m a Source #
Tap the data flowing through a stream into a Fold
. For example, you may
add a tap to log the contents flowing through the stream. The fold is used
only for effects, its result is discarded.
                   Fold m a b
                       |
-----stream m a ---------------stream m a-----
>>>
s = Stream.enumerateFromTo 1 2
>>>
Stream.fold Fold.drain $ Stream.tap (Fold.drainMapM print) s
1 2
Compare with trace
.
trace :: Monad m => (a -> m b) -> Stream m a -> Stream m a Source #
Apply a monadic function to each element flowing through the stream and discard the results.
>>>
s = Stream.enumerateFromTo 1 2
>>>
Stream.fold Fold.drain $ Stream.trace print s
1 2
Compare with tap
.
trace_ :: Monad m => m b -> Stream m a -> Stream m a Source #
Perform a side effect before yielding each element of the stream and discard the results.
>>>
s = Stream.enumerateFromTo 1 2
>>>
Stream.fold Fold.drain $ Stream.trace_ (print "got here") s
"got here" "got here"
Same as intersperseMPrefix_
but always serial.
See also: trace
Pre-release
Folding
foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #
foldlS :: Monad m => (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #
Scanning By Fold
postscan :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Postscan a stream using the given monadic fold.
The following example extracts the input stream up to a point where the running average of elements is no more than 10:
>>>
import Data.Maybe (fromJust)
>>>
let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>>
s = Stream.enumerateFromTo 1.0 100.0
>>>
:{
 Stream.fold Fold.toList
  $ fmap (fromJust . fst)
  $ Stream.takeWhile (\(_,x) -> x <= 10)
  $ Stream.postscan (Fold.tee Fold.latest avg) s
:}
[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]
scan :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Strict left scan. Scan a stream using the given monadic fold.
>>>
s = Stream.fromList [1..10]
>>>
Stream.fold Fold.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum s
[0,1,3,6]
See also: usingStateT
scanMany :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Like scan
but restarts scanning afresh when the scanning fold
terminates.
Splitting
splitOn :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #
Split on an infixed separator element, dropping the separator. The
supplied Fold
is applied on the split segments. Splits the stream on
separator elements determined by the supplied predicate; the separator is
considered as infixed between two segments:
>>>
splitOn' p xs = Stream.fold Fold.toList $ Stream.splitOn p Fold.toList (Stream.fromList xs)
>>>
splitOn' (== '.') "a.b"
["a","b"]
An empty stream is folded to the default value of the fold:
>>>
splitOn' (== '.') ""
[""]
If one or both sides of the separator are missing then the empty segment on that side is folded to the default output of the fold:
>>>
splitOn' (== '.') "."
["",""]
>>>
splitOn' (== '.') ".a"
["","a"]
>>>
splitOn' (== '.') "a."
["a",""]
>>>
splitOn' (== '.') "a..b"
["a","","b"]
splitOn is an inverse of intercalating a single element:
Stream.intercalate (Stream.fromPure '.') Unfold.fromList . Stream.splitOn (== '.') Fold.toList === id
Assuming the input stream does not contain the separator:
Stream.splitOn (== '.') Fold.toList . Stream.intercalate (Stream.fromPure '.') Unfold.fromList === id
Scanning
Left scans. Stateful, mostly one-to-one maps.
scanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b Source #
Like scanl'
but with a monadic step function and a monadic seed.
scanlMAfter' :: Monad m => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b Source #
scanlMAfter' accumulate initial done stream
is like scanlM'
except
that it provides an additional done
function to be applied on the
accumulator when the stream stops. The result of done
is also emitted in
the stream.
This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.
Pre-release
scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b Source #
Strict left scan. Like map
, scanl'
too is a one to one transformation,
however it adds an extra element.
>>>
Stream.toList $ Stream.scanl' (+) 0 $ Stream.fromList [1,2,3,4]
[0,1,3,6,10]
>>>
Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4]
[[],[1],[2,1],[3,2,1],[4,3,2,1]]
The output of scanl'
is the initial value of the accumulator followed by
all the intermediate steps and the final result of foldl'
.
By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.
Consider the following monolithic example, computing the sum and the product
of the elements in a stream in one go using a foldl'
:
>>>
Stream.fold (Fold.foldl' (\(s, p) x -> (s + x, p * x)) (0,1)) $ Stream.fromList [1,2,3,4]
(10,24)
Using scanl'
we can make it modular by computing the sum in the first
stage and passing it down to the next stage for computing the product:
>>>
:{
 Stream.fold (Fold.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1))
  $ Stream.scanl' (\(s, _) x -> (s + x, x)) (0,1)
  $ Stream.fromList [1,2,3,4]
:}
(10,24)
IMPORTANT: scanl'
evaluates the accumulator to WHNF. To avoid building
lazy expressions inside the accumulator, it is recommended that a strict
data structure is used for the accumulator.
>>>
scanl' step z = Stream.scan (Fold.foldl' step z)
>>>
scanl' f z xs = Stream.scanlM' (\a b -> return (f a b)) (return z) xs
See also: usingStateT
scanl1M' :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a Source #
Like scanl1'
but with a monadic step function.
scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a Source #
Like scanl'
but for a non-empty stream. The first element of the stream
is used as the initial value of the accumulator. Does nothing if the stream
is empty.
>>>
Stream.toList $ Stream.scanl1' (+) $ Stream.fromList [1,2,3,4]
[1,3,6,10]
postscanlMAfter' :: Monad m => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b Source #
postscanlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b Source #
Filtering
Produce a subset of the stream.
with :: Monad m => (Stream m a -> Stream m (s, a)) -> (((s, a) -> b) -> Stream m (s, a) -> Stream m (s, a)) -> ((s, a) -> b) -> Stream m a -> Stream m a Source #
Modify a Stream m a -> Stream m a
stream transformation that accepts a
predicate (a -> b)
to accept ((s, a) -> b)
instead, provided a
transformation Stream m a -> Stream m (s, a)
. Convenient to filter with
index or time.
>>>
filterWithIndex = Stream.with Stream.indexed Stream.filter
Pre-release
scanMaybe :: Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b Source #
Use a filtering fold on a stream.
>>>
scanMaybe f = Stream.catMaybes . Stream.postscan f
filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #
Include only those elements that pass a predicate.
>>>
filter p = Stream.filterM (return . p)
>>>
filter p = Stream.mapMaybe (\x -> if p x then Just x else Nothing)
>>>
filter p = Stream.scanMaybe (Fold.filtering p)
filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #
Same as filter
but with a monadic predicate.
>>>
f p x = p x >>= \r -> return $ if r then Just x else Nothing
>>>
filterM p = Stream.mapMaybeM (f p)
deleteBy :: Monad m => (a -> a -> Bool) -> a -> Stream m a -> Stream m a Source #
Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.
>>>
input = Stream.fromList [1,3,3,5]
>>>
Stream.fold Fold.toList $ Stream.deleteBy (==) 3 input
[1,3,5]
uniqBy :: Monad m => (a -> a -> Bool) -> Stream m a -> Stream m a Source #
Drop repeated elements that are adjacent to each other using the supplied comparison function.
>>>
uniq = Stream.uniqBy (==)
To strip duplicate path separators:
>>>
input = Stream.fromList "//a//b"
>>>
f x y = x == '/' && y == '/'
>>>
Stream.fold Fold.toList $ Stream.uniqBy f input
"/a/b"
Space: O(1)
Pre-release
uniq :: (Eq a, Monad m) => Stream m a -> Stream m a Source #
Drop repeated elements that are adjacent to each other.
>>>
uniq = Stream.uniqBy (==)
prune :: (a -> Bool) -> Stream m a -> Stream m a Source #
Strip all leading and trailing occurrences of an element passing a predicate and make all other consecutive occurrences uniq.
> prune p = Stream.dropWhileAround p $ Stream.uniqBy (\x y -> p x && p y)
> Stream.prune isSpace (Stream.fromList " hello world! ")
"hello world!"
Space: O(1)
Unimplemented
Trimming
Produce a subset of the stream trimmed at ends.
take :: Applicative m => Int -> Stream m a -> Stream m a Source #
Take first n
elements from the stream and discard the rest.
takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #
End the stream as soon as the predicate fails on an element.
takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #
Same as takeWhile
but with a monadic predicate.
takeWhileLast :: (a -> Bool) -> Stream m a -> Stream m a Source #
Take all consecutive elements at the end of the stream for which the predicate is true.
O(n) space, where n is the number of elements taken.
Unimplemented
takeWhileAround :: (a -> Bool) -> Stream m a -> Stream m a Source #
Like takeWhile
and takeWhileLast
combined.
O(n) space, where n is the number of elements taken from the end.
Unimplemented
drop :: Monad m => Int -> Stream m a -> Stream m a Source #
Discard first n
elements from the stream and take the rest.
dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #
Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.
dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #
Same as dropWhile
but with a monadic predicate.
dropLast :: Int -> Stream m a -> Stream m a Source #
Drop n
elements at the end of the stream.
O(n) space, where n is the number of elements dropped.
Unimplemented
dropWhileLast :: (a -> Bool) -> Stream m a -> Stream m a Source #
Drop all consecutive elements at the end of the stream for which the predicate is true.
O(n) space, where n is the number of elements dropped.
Unimplemented
dropWhileAround :: (a -> Bool) -> Stream m a -> Stream m a Source #
Like dropWhile
and dropWhileLast
combined.
O(n) space, where n is the number of elements dropped from the end.
Unimplemented
Inserting Elements
Produce a superset of the stream.
insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a Source #
insertBy cmp elem stream
inserts elem
before the first element in
stream
that is greater than or equal to elem
when compared using cmp
.
>>>
insertBy cmp x = Stream.mergeBy cmp (Stream.fromPure x)
>>>
input = Stream.fromList [1,3,5]
>>>
Stream.fold Fold.toList $ Stream.insertBy compare 2 input
[1,2,3,5]
intersperse :: Monad m => a -> Stream m a -> Stream m a Source #
Insert a pure value between successive elements of a stream.
>>>
input = Stream.fromList "hello"
>>>
Stream.fold Fold.toList $ Stream.intersperse ',' input
"h,e,l,l,o"
intersperseM :: Monad m => m a -> Stream m a -> Stream m a Source #
Insert an effect and its output before consuming an element of a stream except the first one.
>>>
input = Stream.fromList "hello"
>>>
Stream.fold Fold.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') input
h.,e.,l.,l.,o"h,e,l,l,o"
Be careful about the order of effects. In the above example we used trace after the intersperse; if we use it before the intersperse, the output would be he.l.l.o."h,e,l,l,o".
>>>
Stream.fold Fold.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar input
he.l.l.o."h,e,l,l,o"
intersperseMWith :: Int -> m a -> Stream m a -> Stream m a Source #
Intersperse a monadic action into the input stream after every n
elements.
> input = Stream.fromList "hello"
> Stream.fold Fold.toList $ Stream.intersperseMWith 2 (return ',') input
"he,ll,o"
Unimplemented
intersperseMSuffix :: forall m a. Monad m => m a -> Stream m a -> Stream m a Source #
Insert an effect and its output after consuming an element of a stream.
>>>
input = Stream.fromList "hello"
>>>
Stream.fold Fold.toList $ Stream.trace putChar $ Stream.intersperseMSuffix (putChar '.' >> return ',') input
h.,e.,l.,l.,o.,"h,e,l,l,o,"
Pre-release
intersperseMSuffixWith :: forall m a. Monad m => Int -> m a -> Stream m a -> Stream m a Source #
Like intersperseMSuffix
but intersperses an effectful action into the
input stream after every n
elements and after the last element.
>>>
input = Stream.fromList "hello"
>>>
Stream.fold Fold.toList $ Stream.intersperseMSuffixWith 2 (return ',') input
"he,ll,o,"
Pre-release
Inserting Side Effects
intersperseM_ :: Monad m => m b -> Stream m a -> Stream m a Source #
Insert a side effect before consuming an element of a stream except the first one.
>>>
input = Stream.fromList "hello"
>>>
Stream.fold Fold.drain $ Stream.trace putChar $ Stream.intersperseM_ (putChar '.') input
h.e.l.l.o
Pre-release
intersperseMSuffix_ :: Monad m => m b -> Stream m a -> Stream m a Source #
Insert a side effect after consuming an element of a stream.
>>>
input = Stream.fromList "hello"
>>>
Stream.fold Fold.toList $ Stream.intersperseMSuffix_ (threadDelay 1000000) input
"hello"
Pre-release
intersperseMPrefix_ :: Monad m => m b -> Stream m a -> Stream m a Source #
Insert a side effect before consuming an element of a stream.
Definition:
>>>
intersperseMPrefix_ m = Stream.mapM (\x -> void m >> return x)
>>>
input = Stream.fromList "hello"
>>>
Stream.fold Fold.toList $ Stream.trace putChar $ Stream.intersperseMPrefix_ (putChar '.' >> return ',') input
.h.e.l.l.o"hello"
Same as trace_
.
Pre-release
delay :: MonadIO m => Double -> Stream m a -> Stream m a Source #
Introduce a delay of specified seconds between elements of the stream.
Definition:
>>>
sleep n = liftIO $ threadDelay $ round $ n * 1000000
>>>
delay = Stream.intersperseM_ . sleep
Example:
>>>
input = Stream.enumerateFromTo 1 3
>>>
Stream.fold (Fold.drainMapM print) $ Stream.delay 1 input
1 2 3
delayPre :: MonadIO m => Double -> Stream m a -> Stream m a Source #
Introduce a delay of specified seconds before consuming an element of a stream.
Definition:
>>>
sleep n = liftIO $ threadDelay $ round $ n * 1000000
>>>
delayPre = Stream.intersperseMPrefix_ . sleep
Example:
>>>
input = Stream.enumerateFromTo 1 3
>>>
Stream.fold (Fold.drainMapM print) $ Stream.delayPre 1 input
1 2 3
Pre-release
delayPost :: MonadIO m => Double -> Stream m a -> Stream m a Source #
Introduce a delay of specified seconds after consuming an element of a stream.
Definition:
>>>
sleep n = liftIO $ threadDelay $ round $ n * 1000000
>>>
delayPost = Stream.intersperseMSuffix_ . sleep
Example:
>>>
input = Stream.enumerateFromTo 1 3
>>>
Stream.fold (Fold.drainMapM print) $ Stream.delayPost 1 input
1 2 3
Pre-release
Reordering
Produce strictly the same set but reordered.
reverse :: Monad m => Stream m a -> Stream m a Source #
Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.
Definition:
>>>
reverse m = Stream.concatEffect $ Stream.fold Fold.toListRev m >>= return . Stream.fromList
reassembleBy :: Fold m a b -> (a -> a -> Int) -> Stream m a -> Stream m b Source #
Buffer until the next element in sequence arrives. The function argument determines the difference in sequence numbers. This could be useful in implementing sequenced streams, for example, TCP reassembly.
Unimplemented
Position Indexing
indexed :: Monad m => Stream m a -> Stream m (Int, a) Source #
>>>
f = Fold.foldl' (\(i, _) x -> (i + 1, x)) (-1,undefined)
>>>
indexed = Stream.postscan f
>>>
indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)
>>>
indexedR n = fmap (\(i, a) -> (n - i, a)) . indexed
Pair each element in a stream with its index, starting from index 0.
>>>
Stream.fold Fold.toList $ Stream.indexed $ Stream.fromList "hello"
[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]
indexedR :: Monad m => Int -> Stream m a -> Stream m (Int, a) Source #
>>>
f n = Fold.foldl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined)
>>>
indexedR n = Stream.postscan (f n)
>>>
s n = Stream.enumerateFromThen n (n - 1)
>>>
indexedR n = Stream.zipWith (,) (s n)
Pair each element in a stream with its index, starting from the
given index n
and counting down.
>>>
Stream.fold Fold.toList $ Stream.indexedR 10 $ Stream.fromList "hello"
[(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]
Time Indexing
timestampWith :: MonadIO m => Double -> Stream m a -> Stream m (AbsTime, a) Source #
Pair each element in a stream with an absolute timestamp, using a clock of specified granularity. The timestamp is generated just before the element is consumed.
>>>
Stream.fold Fold.toList $ Stream.timestampWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
[(AbsTime (TimeSpec {sec = ..., nsec = ...}),1),(AbsTime (TimeSpec {sec = ..., nsec = ...}),2),(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)]
Pre-release
timeIndexWith :: MonadIO m => Double -> Stream m a -> Stream m (RelTime64, a) Source #
Pair each element in a stream with relative times starting from 0, using a clock with the specified granularity. The time is measured just before the element is consumed.
>>>
Stream.fold Fold.toList $ Stream.timeIndexWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
[(RelTime64 (NanoSecond64 ...),1),(RelTime64 (NanoSecond64 ...),2),(RelTime64 (NanoSecond64 ...),3)]
Pre-release
timeIndexed :: MonadIO m => Stream m a -> Stream m (RelTime64, a) Source #
Pair each element in a stream with relative times starting from 0, using a 10 ms granularity clock. The time is measured just before the element is consumed.
>>>
Stream.fold Fold.toList $ Stream.timeIndexed $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
[(RelTime64 (NanoSecond64 ...),1),(RelTime64 (NanoSecond64 ...),2),(RelTime64 (NanoSecond64 ...),3)]
Pre-release
Searching
findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int Source #
Find all the indices where the element in the stream satisfies the given predicate.
>>>
findIndices p = Stream.scanMaybe (Fold.findIndices p)
elemIndices :: (Monad m, Eq a) => a -> Stream m a -> Stream m Int Source #
Find all the indices where the value of the element in the stream is equal to the given value.
>>>
elemIndices a = Stream.findIndices (== a)
Rolling map
Map using the previous element.
rollingMap :: Monad m => (Maybe a -> a -> b) -> Stream m a -> Stream m b Source #
Apply a function on every two successive elements of a stream. The first
argument of the map function is the previous element and the second argument
is the current element. When the current element is the first element, the
previous element is Nothing
.
Pre-release
rollingMapM :: Monad m => (Maybe a -> a -> m b) -> Stream m a -> Stream m b Source #
Like rollingMap
but with an effectful map function.
Pre-release
rollingMap2 :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b Source #
Like rollingMap
but requires at least two elements in the stream,
returns an empty stream otherwise.
This is the stream equivalent of the list idiom zipWith f xs (tail xs)
.
Pre-release
Maybe Streams
mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b Source #
Like mapMaybe
but maps a monadic function.
Equivalent to:
>>>
mapMaybeM f = Stream.catMaybes . Stream.mapM f
>>>
mapM f = Stream.mapMaybeM (\x -> Just <$> f x)