Copyright | (c) 2018 Harendra Kumar (c) Roman Leshchinskiy 2008-2010 (c) The University of Glasgow 2009 |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Direct style re-implementation of the CPS style stream in the StreamK module.
The symbol or suffix D in this module denotes the Direct style. GHC is able
to INLINE and fuse direct style better, providing better performance than
the CPS implementation.
import qualified Streamly.Internal.Data.Stream.StreamD as D
Synopsis
- data Step s a
- data Stream m a where
- nil :: Monad m => Stream m a
- nilM :: Monad m => m b -> Stream m a
- cons :: Monad m => a -> Stream m a -> Stream m a
- uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
- unfoldr :: Monad m => (s -> Maybe (a, s)) -> s -> Stream m a
- unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a
- unfold :: Monad m => Unfold m a b -> a -> Stream m b
- repeat :: Monad m => a -> Stream m a
- repeatM :: Monad m => m a -> Stream m a
- replicate :: Monad m => Int -> a -> Stream m a
- replicateM :: forall m a. Monad m => Int -> m a -> Stream m a
- fromIndices :: Monad m => (Int -> a) -> Stream m a
- fromIndicesM :: Monad m => (Int -> m a) -> Stream m a
- generate :: Monad m => Int -> (Int -> a) -> Stream m a
- generateM :: Monad m => Int -> (Int -> m a) -> Stream m a
- iterate :: Monad m => (a -> a) -> a -> Stream m a
- iterateM :: Monad m => (a -> m a) -> m a -> Stream m a
- enumerateFromStepIntegral :: (Integral a, Monad m) => a -> a -> Stream m a
- enumerateFromIntegral :: (Monad m, Integral a, Bounded a) => a -> Stream m a
- enumerateFromThenIntegral :: (Monad m, Integral a, Bounded a) => a -> a -> Stream m a
- enumerateFromToIntegral :: (Monad m, Integral a) => a -> a -> Stream m a
- enumerateFromThenToIntegral :: (Monad m, Integral a) => a -> a -> a -> Stream m a
- enumerateFromStepNum :: (Monad m, Num a) => a -> a -> Stream m a
- numFrom :: (Monad m, Num a) => a -> Stream m a
- numFromThen :: (Monad m, Num a) => a -> a -> Stream m a
- enumerateFromToFractional :: (Monad m, Fractional a, Ord a) => a -> a -> Stream m a
- enumerateFromThenToFractional :: (Monad m, Fractional a, Ord a) => a -> a -> a -> Stream m a
- currentTime :: MonadAsync m => Double -> Stream m AbsTime
- yield :: Applicative m => a -> Stream m a
- yieldM :: Monad m => m a -> Stream m a
- fromList :: Applicative m => [a] -> Stream m a
- fromListM :: MonadAsync m => [m a] -> Stream m a
- fromStreamK :: Monad m => Stream m a -> Stream m a
- fromStreamD :: (IsStream t, Monad m) => Stream m a -> t m a
- fromPrimVar :: (MonadIO m, Prim a) => Var IO a -> Stream m a
- fromSVar :: MonadAsync m => SVar t m a -> Stream m a
- foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b
- foldrT :: (Monad m, Monad (t m), MonadTrans t) => (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
- foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b
- foldrMx :: Monad m => (a -> m x -> m x) -> m x -> (m x -> m b) -> Stream m a -> m b
- foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b
- foldr1 :: Monad m => (a -> a -> a) -> Stream m a -> m (Maybe a)
- foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
- foldlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> m b
- foldlS :: Monad m => (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b
- foldlT :: (Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b
- reverse :: Monad m => Stream m a -> Stream m a
- reverse' :: forall m a. (MonadIO m, Storable a) => Stream m a -> Stream m a
- foldlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b
- foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
- runFold :: Monad m => Fold m a b -> Stream m a -> m b
- parselMx' :: MonadThrow m => (s -> a -> m (Step s b)) -> m s -> (s -> m b) -> Stream m a -> m b
- splitParse :: MonadThrow m => Parser m a b -> Stream m a -> Stream m b
- tap :: Monad m => Fold m a b -> Stream m a -> Stream m a
- tapOffsetEvery :: Monad m => Int -> Int -> Fold m a b -> Stream m a -> Stream m a
- tapAsync :: MonadAsync m => Fold m a b -> Stream m a -> Stream m a
- tapRate :: (MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> Stream m a -> Stream m a
- pollCounts :: MonadAsync m => (a -> Bool) -> (Stream m Int -> Stream m Int) -> Fold m Int b -> Stream m a -> Stream m a
- drain :: Monad m => Stream m a -> m ()
- null :: Monad m => Stream m a -> m Bool
- head :: Monad m => Stream m a -> m (Maybe a)
- headElse :: Monad m => a -> Stream m a -> m a
- tail :: Monad m => Stream m a -> m (Maybe (Stream m a))
- last :: Monad m => Stream m a -> m (Maybe a)
- elem :: (Monad m, Eq a) => a -> Stream m a -> m Bool
- notElem :: (Monad m, Eq a) => a -> Stream m a -> m Bool
- all :: Monad m => (a -> Bool) -> Stream m a -> m Bool
- any :: Monad m => (a -> Bool) -> Stream m a -> m Bool
- maximum :: (Monad m, Ord a) => Stream m a -> m (Maybe a)
- maximumBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> m (Maybe a)
- minimum :: (Monad m, Ord a) => Stream m a -> m (Maybe a)
- minimumBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> m (Maybe a)
- findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int
- lookup :: (Monad m, Eq a) => a -> Stream m (a, b) -> m (Maybe b)
- findM :: Monad m => (a -> m Bool) -> Stream m a -> m (Maybe a)
- find :: Monad m => (a -> Bool) -> Stream m a -> m (Maybe a)
- (!!) :: Monad m => Stream m a -> Int -> m (Maybe a)
- toSVarParallel :: MonadAsync m => State t m a -> SVar t m a -> Stream m a -> m ()
- concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b
- concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b
- data ConcatMapUState o i
- = ConcatMapUOuter o
- | ConcatMapUInner o i
- concatMapU :: Monad m => Unfold m a b -> Stream m a -> Stream m b
- data ConcatUnfoldInterleaveState o i
- = ConcatUnfoldInterleaveOuter o [i]
- | ConcatUnfoldInterleaveInner o [i]
- | ConcatUnfoldInterleaveInnerL [i] [i]
- | ConcatUnfoldInterleaveInnerR [i] [i]
- concatUnfoldInterleave :: Monad m => Unfold m a b -> Stream m a -> Stream m b
- concatUnfoldRoundrobin :: Monad m => Unfold m a b -> Stream m a -> Stream m b
- data AppendState s1 s2
- = AppendFirst s1
- | AppendSecond s2
- append :: Monad m => Stream m a -> Stream m a -> Stream m a
- data InterleaveState s1 s2
- = InterleaveFirst s1 s2
- | InterleaveSecond s1 s2
- | InterleaveSecondOnly s2
- | InterleaveFirstOnly s1
- interleave :: Monad m => Stream m a -> Stream m a -> Stream m a
- interleaveMin :: Monad m => Stream m a -> Stream m a -> Stream m a
- interleaveSuffix :: Monad m => Stream m a -> Stream m a -> Stream m a
- interleaveInfix :: Monad m => Stream m a -> Stream m a -> Stream m a
- roundRobin :: Monad m => Stream m a -> Stream m a -> Stream m a
- gintercalateSuffix :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c
- interposeSuffix :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c
- gintercalate :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c
- interpose :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c
- groupsOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b
- groupsOf2 :: Monad m => Int -> m c -> Fold2 m c a b -> Stream m a -> Stream m b
- groupsBy :: Monad m => (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
- groupsRollingBy :: Monad m => (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
- splitBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
- splitSuffixBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
- wordsBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
- splitSuffixBy' :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
- splitOn :: forall m a b. (MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b
- splitSuffixOn :: forall m a b. (MonadIO m, Storable a, Enum a, Eq a) => Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
- splitInnerBy :: Monad m => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
- splitInnerBySuffix :: (Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
- isPrefixOf :: (Eq a, Monad m) => Stream m a -> Stream m a -> m Bool
- isSubsequenceOf :: (Eq a, Monad m) => Stream m a -> Stream m a -> m Bool
- stripPrefix :: (Eq a, Monad m) => Stream m a -> Stream m a -> m (Maybe (Stream m a))
- mapM_ :: Monad m => (a -> m b) -> Stream m a -> m ()
- toList :: Monad m => Stream m a -> m [a]
- toListRev :: Monad m => Stream m a -> m [a]
- toStreamK :: Monad m => Stream m a -> Stream m a
- toStreamD :: (IsStream t, Monad m) => t m a -> Stream m a
- hoist :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a
- generally :: Monad m => Stream Identity a -> Stream m a
- liftInner :: (Monad m, MonadTrans t, Monad (t m)) => Stream m a -> Stream (t m) a
- runReaderT :: Monad m => s -> Stream (ReaderT s m) a -> Stream m a
- evalStateT :: Monad m => s -> Stream (StateT s m) a -> Stream m a
- runStateT :: Monad m => s -> Stream (StateT s m) a -> Stream m (s, a)
- transform :: Monad m => Pipe m a b -> Stream m a -> Stream m b
- scanlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b
- scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
- scanlM :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b
- scanl :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
- scanl1M' :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a
- scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
- scanl1M :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a
- scanl1 :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
- prescanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
- prescanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
- postscanl :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
- postscanlM :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b
- postscanl' :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
- postscanlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b
- postscanlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
- postscanlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
- scanlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
- scanlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
- filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- uniq :: (Eq a, Monad m) => Stream m a -> Stream m a
- take :: Monad m => Int -> Stream m a -> Stream m a
- takeByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a
- takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- drop :: Monad m => Int -> Stream m a -> Stream m a
- dropByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a
- dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- map :: Monad m => (a -> b) -> Stream m a -> Stream m b
- mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
- sequence :: Monad m => Stream m (m a) -> Stream m a
- rollingMap :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b
- rollingMapM :: Monad m => (a -> a -> m b) -> Stream m a -> Stream m b
- intersperseM :: Monad m => m a -> Stream m a -> Stream m a
- intersperse :: Monad m => a -> Stream m a -> Stream m a
- intersperseSuffix :: forall m a. Monad m => m a -> Stream m a -> Stream m a
- intersperseSuffixBySpan :: forall m a. Monad m => Int -> m a -> Stream m a -> Stream m a
- insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a
- deleteBy :: Monad m => (a -> a -> Bool) -> a -> Stream m a -> Stream m a
- mapMaybe :: Monad m => (a -> Maybe b) -> Stream m a -> Stream m b
- mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b
- indexed :: Monad m => Stream m a -> Stream m (Int, a)
- indexedR :: Monad m => Int -> Stream m a -> Stream m (Int, a)
- zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
- zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
- eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
- cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
- mergeBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
- mergeByM :: Monad m => (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
- the :: (Eq a, Monad m) => Stream m a -> m (Maybe a)
- newFinalizedIORef :: (MonadIO m, MonadBaseControl IO m) => m a -> m (IORef (Maybe (IO ())))
- runIORefFinalizer :: MonadIO m => IORef (Maybe (IO ())) -> m ()
- clearIORefFinalizer :: MonadIO m => IORef (Maybe (IO ())) -> m ()
- gbracket :: Monad m => m c -> (forall s. m s -> m (Either e s)) -> (c -> m d) -> (c -> e -> Stream m b) -> (c -> Stream m b) -> Stream m b
- before :: Monad m => m b -> Stream m a -> Stream m a
- after :: Monad m => m b -> Stream m a -> Stream m a
- afterIO :: (MonadIO m, MonadBaseControl IO m) => m b -> Stream m a -> Stream m a
- bracket :: MonadCatch m => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
- bracketIO :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
- onException :: MonadCatch m => m b -> Stream m a -> Stream m a
- finally :: MonadCatch m => m b -> Stream m a -> Stream m a
- finallyIO :: (MonadAsync m, MonadCatch m) => m b -> Stream m a -> Stream m a
- handle :: (MonadCatch m, Exception e) => (e -> Stream m a) -> Stream m a -> Stream m a
- mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
- mkParallelD :: MonadAsync m => Stream m a -> Stream m a
- newCallbackStream :: (IsStream t, MonadAsync m) => m (a -> m (), t m a)
- lastN :: (Storable a, MonadIO m) => Int -> Fold m a (Array a)
The stream type
A stream consists of a step function that generates the next step given a current state, and the current state.
Instances
MonadTrans Stream Source # | |
Defined in Streamly.Internal.Data.Stream.StreamD.Type | |
Monad m => Monad (Stream m) Source # | |
Functor m => Functor (Stream m) Source # | |
Applicative f => Applicative (Stream f) Source # | |
MonadThrow m => MonadThrow (Stream m) Source # | |
Defined in Streamly.Internal.Data.Stream.StreamD.Type |
Construction
Deconstruction
Generation
Unfolds
Specialized Generation
Generate a monadic stream from a seed.
Enumerations
enumerateFromStepIntegral :: (Integral a, Monad m) => a -> a -> Stream m a Source #
Can be used to enumerate unbounded integrals. This does not check for overflow or underflow for bounded integrals.
enumerateFromToFractional :: (Monad m, Fractional a, Ord a) => a -> a -> Stream m a Source #
enumerateFromThenToFractional :: (Monad m, Fractional a, Ord a) => a -> a -> a -> Stream m a Source #
Time
currentTime :: MonadAsync m => Double -> Stream m AbsTime Source #
Conversions
Transform an input structure into a stream.
Direct style stream does not support fromFoldable.
fromListM :: MonadAsync m => [m a] -> Stream m a Source #
Convert a list of monadic actions to a Stream.
Elimination
General Folds
foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #
foldrT :: (Monad m, Monad (t m), MonadTrans t) => (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b Source #
foldlS :: Monad m => (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #
foldlT :: (Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b Source #
parselMx' :: MonadThrow m => (s -> a -> m (Step s b)) -> m s -> (s -> m b) -> Stream m a -> m b Source #
Run a Parser over a stream.
splitParse :: MonadThrow m => Parser m a b -> Stream m a -> Stream m b Source #
Specialized Folds
tapRate :: (MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> Stream m a -> Stream m a Source #
pollCounts :: MonadAsync m => (a -> Bool) -> (Stream m Int -> Stream m Int) -> Fold m Int b -> Stream m a -> Stream m a Source #
toSVarParallel :: MonadAsync m => State t m a -> SVar t m a -> Stream m a -> m () Source #
Flattening nested streams
data ConcatMapUState o i Source #
concatMapU :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #
concatMapU unfold stream uses unfold to map the input stream elements
to streams and then flattens the generated streams into a single output
stream.
data ConcatUnfoldInterleaveState o i Source #
ConcatUnfoldInterleaveOuter o [i] | |
ConcatUnfoldInterleaveInner o [i] | |
ConcatUnfoldInterleaveInnerL [i] [i] | |
ConcatUnfoldInterleaveInnerR [i] [i] |
data AppendState s1 s2 Source #
AppendFirst s1 | |
AppendSecond s2 |
data InterleaveState s1 s2 Source #
InterleaveFirst s1 s2 | |
InterleaveSecond s1 s2 | |
InterleaveSecondOnly s2 | |
InterleaveFirstOnly s1 |
gintercalateSuffix :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c Source #
Interleave streams (full streams, not the elements) unfolded from two input streams and concat. Stop when the first stream stops. If the second stream ends before the first one then the first stream still keeps running alone without any interleaving with the second stream.
   [a1, a2, ... an]                   [b1, b2 ...]
=> [streamA1, streamA2, ... streamAn] [streamB1, streamB2, ...]
=> [streamA1, streamB1, streamA2, ... streamAn, streamBn]
=> [a11, a12, ...a1j, b11, b12, ...b1k, a21, a22, ...]
gintercalate :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c Source #
Interleave streams (full streams, not the elements) unfolded from two input streams and concat. Stop when the first stream stops. If the second stream ends before the first one then the first stream still keeps running alone without any interleaving with the second stream.
   [a1, a2, ... an]                   [b1, b2 ...]
=> [streamA1, streamA2, ... streamAn] [streamB1, streamB2, ...]
=> [streamA1, streamB1, streamA2, ... streamAn, streamBn]
=> [a11, a12, ...a1j, b11, b12, ...b1k, a21, a22, ...]
Grouping
Splitting
splitOn :: forall m a b. (MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b Source #
splitSuffixOn :: forall m a b. (MonadIO m, Storable a, Enum a, Eq a) => Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b Source #
splitInnerBy :: Monad m => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a) Source #
Performs infix separator style splitting.
splitInnerBySuffix :: (Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a) Source #
Performs suffix separator style splitting.
Substreams
Map and Fold
mapM_ :: Monad m => (a -> m b) -> Stream m a -> m () Source #
Execute a monadic action for each element of the Stream.
Conversions
Transform a stream into another type.
Transformation
By folding (scans)
postscanlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b Source #
Filtering
takeByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a Source #
dropByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a Source #
Mapping
mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #
Map a monadic function over a Stream.
Inserting
intersperseSuffixBySpan :: forall m a. Monad m => Int -> m a -> Stream m a -> Stream m a Source #
Intersperse the output of the given monadic action after every n items of the stream.
Deleting
Map and Filter
Zipping
Comparisons
cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering Source #
Compare two streams lexicographically.
Merging
Transformation comprehensions
Exceptions
newFinalizedIORef :: (MonadIO m, MonadBaseControl IO m) => m a -> m (IORef (Maybe (IO ()))) Source #
Create an IORef holding a finalizer that is called automatically when the
IORef is garbage collected. The IORef can be written to with a Nothing
value to deactivate the finalizer.
runIORefFinalizer :: MonadIO m => IORef (Maybe (IO ())) -> m () Source #
Run the finalizer stored in an IORef and deactivate it so that it is run only once.
clearIORefFinalizer :: MonadIO m => IORef (Maybe (IO ())) -> m () Source #
Deactivate the finalizer stored in an IORef without running it.
gbracket :: Monad m | |
=> m c | before |
-> (forall s. m s -> m (Either e s)) | try (exception handling) |
-> (c -> m d) | after, on normal stop |
-> (c -> e -> Stream m b) | on exception |
-> (c -> Stream m b) | stream generator |
-> Stream m b |
The most general bracketing and exception combinator. All other combinators can be expressed in terms of this combinator. This can also be used for cases which are not covered by the standard combinators.
Internal
before :: Monad m => m b -> Stream m a -> Stream m a Source #
Run a side effect before the stream yields its first element.
after :: Monad m => m b -> Stream m a -> Stream m a Source #
Run a side effect whenever the stream stops normally.
bracket :: MonadCatch m => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a Source #
Run the first action before the stream starts and remember its output, then generate a stream using that output. Run the second action, providing the remembered value as an argument, whenever the stream ends normally or due to an exception.
bracketIO :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a Source #
onException :: MonadCatch m => m b -> Stream m a -> Stream m a Source #
Run a side effect whenever the stream aborts due to an exception. The exception is not caught, simply rethrown.
finally :: MonadCatch m => m b -> Stream m a -> Stream m a Source #
Run a side effect whenever the stream stops normally or aborts due to an exception.
finallyIO :: (MonadAsync m, MonadCatch m) => m b -> Stream m a -> Stream m a Source #
handle :: (MonadCatch m, Exception e) => (e -> Stream m a) -> Stream m a -> Stream m a Source #
If an exception occurs while evaluating a stream, stream evaluation aborts and the specified exception handler is run with the exception as argument.
Concurrent Application
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; once the buffer is full, it blocks until there is space in the buffer again. The consumer consumes the stream lazily from the buffer.
Internal
mkParallelD :: MonadAsync m => Stream m a -> Stream m a Source #
newCallbackStream :: (IsStream t, MonadAsync m) => m (a -> m (), t m a) Source #
Generates a callback and a stream pair. The callback returned is used to queue values to the stream. The stream is infinite; currently there is no way for the callback to indicate that it is done.
Internal