Copyright | (c) 2020 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Top level IsStream module that can use all other lower level IsStream modules.
Synopsis
- sampleFromThen :: (IsStream t, Monad m, Functor (t m)) => Int -> Int -> t m a -> t m a
- sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a
- sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a
- sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a
- sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a
- sortBy :: MonadCatch m => (a -> a -> Ordering) -> SerialT m a -> SerialT m a
- intersectBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> t m a -> t m a -> t m a
- intersectBySorted :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
- differenceBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> t m a -> t m a -> t m a
- mergeDifferenceBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a
- unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) => (a -> a -> Bool) -> t m a -> t m a -> t m a
- mergeUnionBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a
- crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b)
- joinInner :: forall (t :: (Type -> Type) -> Type -> Type) m a b. (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
- joinInnerMap :: (IsStream t, Monad m, Ord k) => t m (k, a) -> t m (k, b) -> t m (k, a, b)
- joinInnerMerge :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
- joinLeft :: Monad m => (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b)
- mergeLeftJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)
- joinLeftMap :: (IsStream t, Ord k, Monad m) => t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b)
- joinOuter :: MonadIO m => (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (Maybe a, Maybe b)
- mergeOuterJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
- joinOuterMap :: (IsStream t, Ord k, MonadIO m) => t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b)
Transformation
Sampling
Value agnostic filtering.
sampleFromThen :: (IsStream t, Monad m, Functor (t m)) => Int -> Int -> t m a -> t m a Source #
sampleFromthen offset stride
samples the element at offset
index and
then every element at strides of stride
.
>>>
Stream.toList $ Stream.sampleFromThen 2 3 $ Stream.enumerateFromTo 0 10
[2,5,8]
Pre-release
sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #
Like sampleInterval
but samples at the beginning of the time window.
sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.head
Pre-release
sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #
Continuously evaluate the input stream and sample the last event in time
window of n
seconds.
This is also known as throttle
in some libraries.
sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.last
Pre-release
sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #
Like sampleBurstEnd
but samples the event at the beginning of the burst
instead of at the end of it.
Pre-release
sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #
Sample one event at the end of each burst of events. A burst is a group of events close together in time, it ends when an event is spaced by more than the specified time interval from the previous event.
This is known as debounce
in some libraries.
The clock granularity is 10 ms.
Pre-release
Reordering
sortBy :: MonadCatch m => (a -> a -> Ordering) -> SerialT m a -> SerialT m a Source #
Sort the input stream using a supplied comparison function.
O(n) space
Note: this is not the fastest possible implementation as of now.
Pre-release
Nesting
Set like operations
These are not exactly set operations because streams are not necessarily sets, they may have duplicated elements.
intersectBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> t m a -> t m a -> t m a Source #
intersectBy
is essentially a filtering operation that retains only those
elements in the first stream that are present in the second stream.
>>>
Stream.toList $ Stream.intersectBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
[1,2,2]
>>>
Stream.toList $ Stream.intersectBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])
[2,1,1]
intersectBy
is similar to but not the same as joinInner
:
>>>
Stream.toList $ fmap fst $ Stream.joinInner (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
[1,1,2,2]
Space: O(n) where n
is the number of elements in the second stream.
Time: O(m x n) where m
is the number of elements in the first stream and
n
is the number of elements in the second stream.
Pre-release
intersectBySorted :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #
Like intersectBy
but works only on streams sorted in ascending order.
Space: O(1)
Time: O(m+n)
Pre-release
differenceBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> t m a -> t m a -> t m a Source #
Delete first occurrences of those elements from the first stream that are present in the second stream. If an element occurs multiple times in the second stream as many occurrences of it are deleted from the first stream.
>>>
Stream.toList $ Stream.differenceBy (==) (Stream.fromList [1,2,2]) (Stream.fromList [1,2,3])
[2]
The following laws hold:
(s1serial
s2) `differenceBy eq` s1 === s2 (s1wSerial
s2) `differenceBy eq` s1 === s2
Same as the list //
operation.
Space: O(m) where m
is the number of elements in the first stream.
Time: O(m x n) where m
is the number of elements in the first stream and
n
is the number of elements in the second stream.
Pre-release
mergeDifferenceBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #
unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) => (a -> a -> Bool) -> t m a -> t m a -> t m a Source #
This is essentially an append operation that appends all the extra occurrences of elements from the second stream that are not already present in the first stream.
>>>
Stream.toList $ Stream.unionBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [1,1,2,3])
[1,2,2,4,3]
Equivalent to the following except that s1
is evaluated only once:
unionBy eq s1 s2 = s1 `serial` (s2 `differenceBy eq` s1)
Similar to joinOuter
but not the same.
Space: O(n)
Time: O(m x n)
Pre-release
mergeUnionBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #
Join operations
crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b) Source #
This is the same as outerProduct
but less
efficient.
The second stream is evaluated multiple times. If the second stream is
consume-once stream then it can be cached in an Array
before
calling this function. Caching may also improve performance if the stream is
expensive to evaluate.
Time: O(m x n)
Pre-release
joinInner :: forall (t :: (Type -> Type) -> Type -> Type) m a b. (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> t m (a, b) Source #
For all elements in t m a
, for all elements in t m b
if a
and b
are equal by the given equality pedicate then return the tuple (a, b).
The second stream is evaluated multiple times. If the stream is a
consume-once stream then the caller should cache it (e.g. in a
Array
) before calling this function. Caching may also improve
performance if the stream is expensive to evaluate.
For space efficiency use the smaller stream as the second stream.
You should almost always use joinInnerMap instead of joinInner. joinInnerMap is an order of magnitude faster. joinInner may be used when the second stream is generated from a seed, therefore, need not be stored in memory and the amount of memory it takes is a concern.
Space: O(n) assuming the second stream is cached in memory.
Time: O(m x n)
Pre-release
joinInnerMap :: (IsStream t, Monad m, Ord k) => t m (k, a) -> t m (k, b) -> t m (k, a, b) Source #
Like joinInner
but uses a Map
for efficiency.
If the input streams have duplicate keys, the behavior is undefined.
For space efficiency use the smaller stream as the second stream.
Space: O(n)
Time: O(m + n)
Pre-release
joinInnerMerge :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b) Source #
joinLeft :: Monad m => (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b) Source #
For all elements in t m a
, for all elements in t m b
if a
and b
are equal then return the tuple (a, Just b)
. If a
is not present in t
m b
then return (a, Nothing)
.
The second stream is evaluated multiple times. If the stream is a
consume-once stream then the caller should cache it in an Array
before calling this function. Caching may also improve performance if the
stream is expensive to evaluate.
rightJoin = flip joinLeft
Space: O(n) assuming the second stream is cached in memory.
Time: O(m x n)
Unimplemented
mergeLeftJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b) Source #
joinLeftMap :: (IsStream t, Ord k, Monad m) => t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b) Source #
joinOuter :: MonadIO m => (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (Maybe a, Maybe b) Source #
For all elements in t m a
, for all elements in t m b
if a
and b
are equal by the given equality pedicate then return the tuple (Just a, Just
b). If a
is not found in t m b
then return (a, Nothing), return
(Nothing, b) for vice-versa.
For space efficiency use the smaller stream as the second stream.
Space: O(n)
Time: O(m x n)
Unimplemented