Copyright | (c) 2017 Composewell Technologies |
License | BSD3 |
Maintainer | |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Continuation passing style (CPS) stream implementation. The symbol K
denotes a function as well as a Kontinuation.
import qualified Streamly.Internal.Data.Stream.StreamK as K
- class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where
- toStream :: t m a -> Stream m a
- fromStream :: Stream m a -> t m a
- consM :: MonadAsync m => m a -> t m a -> t m a
- (|:) :: MonadAsync m => m a -> t m a -> t m a
- adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
- newtype Stream m a = MkStream (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
- mkStream :: IsStream t => (forall r. State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a
- nil :: IsStream t => t m a
- nilM :: (IsStream t, Monad m) => m b -> t m a
- cons :: IsStream t => a -> t m a -> t m a
- (.:) :: IsStream t => a -> t m a -> t m a
- foldStream :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
- foldStreamShared :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
- unShare :: IsStream t => t m a -> t m a
- uncons :: (IsStream t, Monad m) => t m a -> m (Maybe (a, t m a))
- unfoldr :: IsStream t => (b -> Maybe (a, b)) -> b -> t m a
- unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
- repeat :: IsStream t => a -> t m a
- repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
- replicate :: IsStream t => Int -> a -> t m a
- replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a
- fromIndices :: IsStream t => (Int -> a) -> t m a
- fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a
- iterate :: IsStream t => (a -> a) -> a -> t m a
- iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a
- fromPure :: IsStream t => a -> t m a
- fromEffect :: (Monad m, IsStream t) => m a -> t m a
- fromFoldable :: (IsStream t, Foldable f) => f a -> t m a
- fromList :: IsStream t => [a] -> t m a
- fromStreamK :: IsStream t => Stream m a -> t m a
- foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
- foldrSM :: (IsStream t, Monad m) => (m a -> t m b -> t m b) -> t m b -> t m a -> t m b
- buildS :: IsStream t => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a
- buildM :: (IsStream t, MonadAsync m) => (forall r. (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a
- augmentS :: IsStream t => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a
- augmentSM :: (IsStream t, MonadAsync m) => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a
- foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b
- foldr1 :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> m (Maybe a)
- foldrM :: IsStream t => (a -> m b -> m b) -> m b -> t m a -> m b
- foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b
- foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b
- foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> m b
- foldlS :: IsStream t => (t m b -> a -> t m b) -> t m b -> t m a -> t m b
- foldlT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> t m a -> s m b
- foldlx' :: forall t m a b x. (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
- foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
- fold :: (IsStream t, Monad m) => Fold m a b -> t m a -> m b
- drain :: (Monad m, IsStream t) => t m a -> m ()
- null :: (IsStream t, Monad m) => t m a -> m Bool
- head :: (IsStream t, Monad m) => t m a -> m (Maybe a)
- tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a))
- init :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a))
- elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
- notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
- all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
- any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
- last :: (IsStream t, Monad m) => t m a -> m (Maybe a)
- minimum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a)
- minimumBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> m (Maybe a)
- maximum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a)
- maximumBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> m (Maybe a)
- findIndices :: IsStream t => (a -> Bool) -> t m a -> t m Int
- lookup :: (IsStream t, Monad m, Eq a) => a -> t m (a, b) -> m (Maybe b)
- findM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> m (Maybe a)
- find :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m (Maybe a)
- (!!) :: (IsStream t, Monad m) => t m a -> Int -> m (Maybe a)
- mapM_ :: (IsStream t, Monad m) => (a -> m b) -> t m a -> m ()
- toList :: (IsStream t, Monad m) => t m a -> m [a]
- toStreamK :: Stream m a -> Stream m a
- hoist :: (IsStream t, Monad m, Monad n) => (forall x. m x -> n x) -> t m a -> t n a
- scanl' :: IsStream t => (b -> a -> b) -> b -> t m a -> t m b
- scanlx' :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
- filter :: IsStream t => (a -> Bool) -> t m a -> t m a
- take :: IsStream t => Int -> t m a -> t m a
- takeWhile :: IsStream t => (a -> Bool) -> t m a -> t m a
- drop :: IsStream t => Int -> t m a -> t m a
- dropWhile :: IsStream t => (a -> Bool) -> t m a -> t m a
- map :: IsStream t => (a -> b) -> t m a -> t m b
- mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
- mapMSerial :: MonadAsync m => (a -> m b) -> Stream m a -> Stream m b
- sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
- intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a
- insertBy :: IsStream t => (a -> a -> Ordering) -> a -> t m a -> t m a
- deleteBy :: IsStream t => (a -> a -> Bool) -> a -> t m a -> t m a
- reverse :: IsStream t => t m a -> t m a
- mapMaybe :: IsStream t => (a -> Maybe b) -> t m a -> t m b
- zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c
- zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
- mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
- mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
- concatMapBy :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
- concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b
- bindWith :: IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b
- concatPairsWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
- apWith :: IsStream t => (t m b -> t m b -> t m b) -> t m (a -> b) -> t m a -> t m b
- apSerial :: IsStream t => t m (a -> b) -> t m a -> t m b
- apSerialDiscardFst :: IsStream t => t m a -> t m b -> t m b
- apSerialDiscardSnd :: IsStream t => t m a -> t m b -> t m a
- the :: (Eq a, IsStream t, Monad m) => t m a -> m (Maybe a)
- serial :: IsStream t => t m a -> t m a -> t m a
- consMStream :: Monad m => m a -> Stream m a -> Stream m a
- withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
- mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a
- type Streaming = IsStream
- once :: (Monad m, IsStream t) => m a -> t m a
A class for streams
class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where Source #
Class of types that can represent a stream of elements of some type a in
some monad m.
Since: 0.2.0 (Streamly)
Since: 0.8.0
toStream :: t m a -> Stream m a Source #
fromStream :: Stream m a -> t m a Source #
consM :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #
Constructs a stream by adding a monadic action at the head of an existing stream. For example:
> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]
Concurrent (do not use fromParallel
to construct infinite streams)
Since: 0.2.0
(|:) :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #
Operator equivalent of consM. We can read it as "parallel colon" to
remember that | comes before :.
> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]
let delay = threadDelay 1000000 >> print 1
drain $ fromSerial $ delay |: delay |: delay |: nil
drain $ fromParallel $ delay |: delay |: delay |: nil
Concurrent (do not use fromParallel
to construct infinite streams)
Since: 0.2.0
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #
Adapt any specific stream type to any other specific stream type.
Since: 0.1.0 (Streamly)
Since: 0.8.0
The stream type
The type Stream m a represents a monadic stream of values of type a
constructed using actions in monad m. It uses stop, singleton and yield
continuations equivalent to the following direct style type:
data Stream m a = Stop | Singleton a | Yield a (Stream m a)
To facilitate parallel composition we maintain a local state in an SVar
that is shared across and is used for synchronization of the streams being
composed.
The singleton case can be expressed in terms of stop and yield but we have
it as a separate case to optimize composition operations for streams with
single element. We build singleton streams in the implementation of pure
for Applicative and Monad, and in lift
for MonadTrans.
MkStream (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r) |
MonadTrans Stream Source # | |
Defined in Streamly.Internal.Data.Stream.StreamK.Type | |
IsStream Stream Source # | |
Defined in Streamly.Internal.Data.Stream.StreamK.Type | |
Monad m => Monad (Stream m) Source # | |
Monad m => Functor (Stream m) Source # | |
Monad m => Applicative (Stream m) Source # | |
Defined in Streamly.Internal.Data.Stream.StreamK.Type | |
Semigroup (Stream m a) Source # | |
Monoid (Stream m a) Source # | |
Construction Primitives
mkStream :: IsStream t => (forall r. State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #
Build a stream from an SVar, a stop continuation, a singleton stream
continuation and a yield continuation.
nilM :: (IsStream t, Monad m) => m b -> t m a Source #
An empty stream producing a side effect.
> toList (nilM (print "nil")) "nil" []
cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #
Construct a stream by adding a pure value at the head of an existing
stream. For serial streams this is the same as (return a) `consM` r but
more efficient. For concurrent streams this is not concurrent whereas
consM is concurrent. For example:
> toList $ 1 `cons` 2 `cons` 3 `cons` nil [1,2,3]
Since: 0.1.0
Elimination Primitives
foldStream :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #
Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. The stream will not use the SVar passed via State.
foldStreamShared :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #
Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.
Transformation Primitives
Specialized Generation
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #
repeatM = fix . cons repeatM = cycle1 . fromPure
Generate an infinite stream by repeating a monadic value.
replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a Source #
fromIndices :: IsStream t => (Int -> a) -> t m a Source #
fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #
iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a Source #
fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #
fromFoldable :: (IsStream t, Foldable f) => f a -> t m a Source #
fromStreamK :: IsStream t => Stream m a -> t m a Source #
foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #
Lazy right associative fold to a stream.
buildM :: (IsStream t, MonadAsync m) => (forall r. (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #
augmentSM :: (IsStream t, MonadAsync m) => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a Source #
General Folds
foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b Source #
Lazy right associative fold.
foldrM :: IsStream t => (a -> m b -> m b) -> m b -> t m a -> m b Source #
Lazy right fold with a monadic step function.
foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b Source #
Right associative fold to an arbitrary transformer monad.
foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b Source #
Strict left associative fold.
foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> m b Source #
Like foldl'
but with a monadic step function.
foldlS :: IsStream t => (t m b -> a -> t m b) -> t m b -> t m a -> t m b Source #
Lazy left fold to a stream.
foldlT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> t m a -> s m b Source #
Lazy left fold to an arbitrary transformer monad.
foldlx' :: forall t m a b x. (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b Source #
Strict left fold with an extraction function. Like the standard strict
left fold, but applies a user supplied extraction function (the third
argument) to the folded value at the end. This is designed to work with the
foldl library. The suffix x is a mnemonic for extraction.
Note that the accumulator is always evaluated including the initial value.
foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b Source #
Like foldlx', but with a monadic step function.
Specialized Folds
drain :: (Monad m, IsStream t) => t m a -> m () Source #
drain = foldl' (\_ _ -> ()) ()
drain = mapM_ (\_ -> return ())
last :: (IsStream t, Monad m) => t m a -> m (Maybe a) Source #
Extract the last element of the stream, if any.
Map and Fold
mapM_ :: (IsStream t, Monad m) => (a -> m b) -> t m a -> m () Source #
Apply a monadic action to each element of the stream and discard the output of the action.
By folding (scans)
mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #
mapMSerial :: MonadAsync m => (a -> m b) -> Stream m a -> Stream m b Source #
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #
intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #
Map and Filter
zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c Source #
Zip two streams serially using a pure zipping function.
Since: 0.1.0
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #
Zip two streams serially using a monadic zipping function.
Since: 0.1.0
concatMapBy :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #
concatPairsWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #
See concatPairsWith in Streamly.Internal.Data.Stream.IsStream.
apSerialDiscardFst :: IsStream t => t m a -> t m b -> t m b Source #
apSerialDiscardSnd :: IsStream t => t m a -> t m b -> t m a Source #
Transformation comprehensions
Semigroup Style Composition
serial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #
Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.
import Streamly.Prelude (serial)
stream1 = Stream.fromList [1,2]
stream2 = Stream.fromList [3,4]
Stream.toList $ stream1 `serial` stream2
This operation can be used to fold an infinite lazy container of streams.
Since: 0.2.0 (Streamly)
Since: 0.8.0
mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a Source #
We can define cyclic structures using let
let (a, b) = ([1, b], head a) in (a, b)
The function fix
defined as:
fix f = let x = f x in x
ensures that the argument of a function and its output refer to the same
lazy value x
i.e. the same location in memory. Thus x
can be defined
in terms of itself, creating structures with cyclic references.
import Data.Function (fix)
f ~(a, b) = ([1, b], head a)
fix f
Control.Monad.Fix.mfix is essentially the same as fix but for monadic
values.
Using mfix
for streams we can construct a stream in which each element of
the stream is defined in a cyclic fashion. The argument of the function
being fixed represents the current element of the stream which is being
returned by the stream monad. Thus, we can use the argument to construct
itself.
In the following example, the argument action
of the function f
represents the tuple (x,y)
returned by it in a given iteration. We define
the first element of the tuple in terms of the second.
import Streamly.Internal.Data.Stream.IsStream as Stream
import System.IO.Unsafe (unsafeInterleaveIO)

main = do
    Stream.mapM_ print $ Stream.mfix f

    where

    f action = do
        let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act
        x <- Stream.fromListM [incr 1 action, incr 2 action]
        y <- Stream.fromList [4,5]
        return (x, y)
Note: you cannot achieve this by just changing the order of the monad statements because that would change the order in which the stream elements are generated.
Note that the function f must be lazy in its argument, that's why we use
unsafeInterleaveIO on action because IO monad is strict.