Copyright | (c) 2017 Composewell Technologies |
License | BSD3 |
Maintainer | |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
- class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where
- toStream :: t m a -> Stream m a
- fromStream :: Stream m a -> t m a
- consM :: MonadAsync m => m a -> t m a -> t m a
- (|:) :: MonadAsync m => m a -> t m a -> t m a
- newtype Stream m a = MkStream (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
- type Streaming = IsStream
- fromStreamS :: (IsStream t, Monad m) => Stream m a -> t m a
- toStreamS :: (IsStream t, Monad m) => t m a -> Stream m a
- fromStreamD :: (IsStream t, Monad m) => Stream m a -> t m a
- toStreamD :: (IsStream t, Monad m) => t m a -> Stream m a
- adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
- toConsK :: IsStream t => (m a -> t m a -> t m a) -> m a -> Stream m a -> Stream m a
- mkStream :: IsStream t => (forall r. State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a
- foldStreamShared :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
- foldStream :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
- data SerialT m a
- type Serial = SerialT IO
- fromSerial :: IsStream t => SerialT m a -> t m a
- data WSerialT m a
- type WSerial = WSerialT IO
- fromWSerial :: IsStream t => WSerialT m a -> t m a
- data AsyncT m a
- type Async = AsyncT IO
- fromAsync :: IsStream t => AsyncT m a -> t m a
- data WAsyncT m a
- type WAsync = WAsyncT IO
- fromWAsync :: IsStream t => WAsyncT m a -> t m a
- data AheadT m a
- type Ahead = AheadT IO
- fromAhead :: IsStream t => AheadT m a -> t m a
- data ParallelT m a
- type Parallel = ParallelT IO
- fromParallel :: IsStream t => ParallelT m a -> t m a
- data ZipSerialM m a
- type ZipSerial = ZipSerialM IO
- fromZipSerial :: IsStream t => ZipSerialM m a -> t m a
- data ZipAsyncM m a
- type ZipAsync = ZipAsyncM IO
- fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a
- cons :: IsStream t => a -> t m a -> t m a
- (.:) :: IsStream t => a -> t m a -> t m a
- nil :: IsStream t => t m a
- nilM :: (IsStream t, Monad m) => m b -> t m a
- fromPure :: IsStream t => a -> t m a
- fromEffect :: (Monad m, IsStream t) => m a -> t m a
- repeat :: IsStream t => a -> t m a
- bindWith :: IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b
- concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
- concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
- concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
- concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
- drain :: (IsStream t, Monad m) => t m a -> m ()
- fromList :: (Monad m, IsStream t) => [a] -> t m a
- toList :: (IsStream t, Monad m) => t m a -> m [a]
- foldrM :: (IsStream t, Monad m) => (a -> m b -> m b) -> m b -> t m a -> m b
- foldrMx :: (IsStream t, Monad m) => (a -> m x -> m x) -> m x -> (m x -> m b) -> t m a -> m b
- foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b
- foldlx' :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
- foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
- foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b
- fold :: (IsStream t, Monad m) => Fold m a b -> t m a -> m b
- eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool
- cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering
- interleaving :: IsStream t => WSerialT m a -> t m a
- zipping :: IsStream t => ZipSerialM m a -> t m a
- zippingAsync :: IsStream t => ZipAsyncM m a -> t m a
IsStream Type Class
class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where Source #
Class of types that can represent a stream of elements of some type a in some monad m.
Since: 0.2.0 (Streamly)
Since: 0.8.0
toStream :: t m a -> Stream m a Source #
fromStream :: Stream m a -> t m a Source #
consM :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #
Constructs a stream by adding a monadic action at the head of an existing stream. For example:
> toList $ getLine `consM` getLine `consM` nil hello world ["hello","world"]
Concurrent (do not use fromParallel
to construct infinite streams)
Since: 0.2.0
(|:) :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #
Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.
> toList $ getLine |: getLine |: nil hello world ["hello","world"]
let delay = threadDelay 1000000 >> print 1
drain $ fromSerial $ delay |: delay |: delay |: nil
drain $ fromParallel $ delay |: delay |: delay |: nil
Concurrent (do not use fromParallel
to construct infinite streams)
Since: 0.2.0
The type Stream m a
represents a monadic stream of values of type a
constructed using actions in monad m
. It uses stop, singleton and yield
continuations equivalent to the following direct style type:
data Stream m a = Stop | Singleton a | Yield a (Stream m a)
To facilitate parallel composition we maintain a local state in an SVar
that is shared across and is used for synchronization of the streams being composed.
The singleton case can be expressed in terms of stop and yield but we have
it as a separate case to optimize composition operations for streams with
single element. We build singleton streams in the implementation of pure
for Applicative and Monad, and in lift
for MonadTrans.
MkStream (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r) |
MonadTrans Stream Source # | |
Defined in Streamly.Internal.Data.Stream.StreamK.Type | |
Monad m => Monad (Stream m) Source # | |
Monad m => Functor (Stream m) Source # | |
Monad m => Applicative (Stream m) Source # | |
Defined in Streamly.Internal.Data.Stream.StreamK.Type | |
Semigroup (Stream m a) Source # | |
Monoid (Stream m a) Source # | |
Type Conversion
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #
Adapt any specific stream type to any other specific stream type.
Since: 0.1.0 (Streamly)
Since: 0.8.0
toConsK :: IsStream t => (m a -> t m a -> t m a) -> m a -> Stream m a -> Stream m a Source #
Adapt a polymorphic consM operation to a StreamK cons operation
Building a stream
mkStream :: IsStream t => (forall r. State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #
Build a stream from an SVar
, a stop continuation, a singleton stream
continuation and a yield continuation.
foldStreamShared :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #
Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.
foldStream :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #
Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. The stream will not use the SVar passed via State.
Stream Types
For SerialT streams:
(<>) = serial
(>>=) = flip . concatMapWith serial
A single Monad bind behaves like a for loop:
>>> :{
Stream.toList $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]
Nested monad binds behave like nested for loops:
>>> :{
Stream.toList $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
fromSerial :: IsStream t => SerialT m a -> t m a Source #
For WSerialT streams:
(<>) = wSerial
(>>=) = flip . concatMapWith wSerial
Note that <>
is associative only if we disregard the ordering of elements
in the resulting stream.
A single Monad bind behaves like a for loop:
>>> :{
Stream.toList $ Stream.fromWSerial $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]
Nested monad binds behave like interleaved nested for loops:
>>> :{
Stream.toList $ Stream.fromWSerial $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]
It is a result of interleaving all the nested iterations corresponding to
element 1 in the first stream with all the nested iterations of element 2:
import Streamly.Prelude (wSerial)
Stream.toList $ Stream.fromList [(1,3),(1,4)] `Stream.wSerial` Stream.fromList [(2,3),(2,4)]
The W in the name stands for wide or breadth wise scheduling in
contrast to the depth wise scheduling behavior of SerialT.
Since: 0.2.0 (Streamly)
Since: 0.8.0
fromWSerial :: IsStream t => WSerialT m a -> t m a Source #
For AsyncT streams:
(<>) = async
(>>=) = flip . concatMapWith async
A single Monad bind behaves like a for loop with iterations of the loop
executed concurrently a la the async combinator, producing results and
side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, a la the async combinator:
>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1 in the first stream constitute one output stream and all
the iterations corresponding to 2 constitute another output stream and
these two output streams are merged using async.
Since: 0.1.0 (Streamly)
Since: 0.8.0
MonadTrans AsyncT Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
IsStream AsyncT Source # | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # | |
MonadAsync m => Monad (AsyncT m) Source # | |
Monad m => Functor (AsyncT m) Source # | |
(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # | |
(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
MonadAsync m => Semigroup (AsyncT m a) Source # | |
MonadAsync m => Monoid (AsyncT m a) Source # | |
For WAsyncT streams:
(<>) = wAsync
(>>=) = flip . concatMapWith wAsync
A single Monad bind behaves like a for loop with iterations of the loop
executed concurrently a la the wAsync combinator, producing results and
side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, a la the wAsync combinator:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1 in the first stream constitute one WAsyncT
stream and all the iterations corresponding to 2 constitute another
output stream and these two output streams are merged using wAsync.
The W in the name stands for wide or breadth wise scheduling in
contrast to the depth wise scheduling behavior of AsyncT.
Since: 0.2.0 (Streamly)
Since: 0.8.0
MonadTrans WAsyncT Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
IsStream WAsyncT Source # | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # | |
MonadAsync m => Monad (WAsyncT m) Source # | |
Monad m => Functor (WAsyncT m) Source # | |
(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
MonadAsync m => Semigroup (WAsyncT m a) Source # | |
MonadAsync m => Monoid (WAsyncT m a) Source # | |
fromWAsync :: IsStream t => WAsyncT m a -> t m a Source #
For AheadT streams:
(<>) = ahead
(>>=) = flip . concatMapWith ahead
A single Monad bind behaves like a for loop with iterations executed
concurrently, ahead of time, producing side effects of iterations out of
order, but results in order:
>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, ahead of time:
>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1 in the first stream constitute one output stream and all
the iterations corresponding to 2 constitute another output stream and
these two output streams are merged using ahead.
Since: 0.3.0 (Streamly)
Since: 0.8.0
MonadTrans AheadT Source # | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
IsStream AheadT Source # | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
(MonadState s m, MonadAsync m) => MonadState s (AheadT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) Source # | |
MonadAsync m => Monad (AheadT m) Source # | |
Monad m => Functor (AheadT m) Source # | |
(Monad m, MonadAsync m) => Applicative (AheadT m) Source # | |
(MonadIO m, MonadAsync m) => MonadIO (AheadT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
MonadAsync m => Semigroup (AheadT m a) Source # | |
MonadAsync m => Monoid (AheadT m a) Source # | |
For ParallelT streams:
(<>) = parallel
(>>=) = flip . concatMapWith parallel
See AsyncT; ParallelT is similar except that all
iterations are strictly concurrent while in AsyncT it depends on the
consumer demand and available threads. See parallel for more details.
Since: 0.1.0 (Streamly)
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.8.0
fromParallel :: IsStream t => ParallelT m a -> t m a Source #
data ZipSerialM m a Source #
For ZipSerialM streams:
(<>) = serial
(<*>) = Streamly.Prelude.zipWith id
Applicative evaluates the streams being zipped serially:
s1 = Stream.fromFoldable [1, 2]
s2 = Stream.fromFoldable [3, 4]
s3 = Stream.fromFoldable [5, 6]
Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
Since: 0.2.0 (Streamly)
Since: 0.8.0
type ZipSerial = ZipSerialM IO Source #
fromZipSerial :: IsStream t => ZipSerialM m a -> t m a Source #
For ZipAsyncM streams:
(<>) = serial
(<*>) = Streamly.Prelude.zipAsyncWith id
Applicative evaluates the streams being zipped concurrently, the following would take half the time that it would take in serial zipping:
s = Stream.fromFoldableM $ delay [1, 1, 1]
Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
... [(1,1),(1,1),(1,1)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
IsStream ZipAsyncM Source # | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type Methods toStream :: forall (m :: Type -> Type) a. ZipAsyncM m a -> Stream m a Source # fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipAsyncM m a Source # consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source # (|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source # | |
Monad m => Functor (ZipAsyncM m) Source # | |
MonadAsync m => Applicative (ZipAsyncM m) Source # | |
Defined in Streamly.Internal.Data.Stream.Zip | |
Semigroup (ZipAsyncM m a) Source # | |
Monoid (ZipAsyncM m a) Source # | |
fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a Source #
cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #
Construct a stream by adding a pure value at the head of an existing
stream. For serial streams this is the same as (return a) `consM` r but
more efficient. For concurrent streams this is not concurrent whereas
consM is concurrent. For example:
> toList $ 1 `cons` 2 `cons` 3 `cons` nil [1,2,3]
Since: 0.1.0
fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #
concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #
concatMapWith mixer generator stream
is a two dimensional looping
combinator. The generator
function is used to generate streams from the
elements in the input stream
and the mixer
function is used to merge
those streams.
Note we can merge streams concurrently by using a concurrent merge function.
Since: 0.7.0
Since: 0.8.0 (signature change)
Fold Utilities
concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #
A variant of fold that allows you to fold a Foldable
container of streams using the specified stream sum operation.
concatFoldableWith async $ map return [1..3]
Equivalent to:
concatFoldableWith f = Prelude.foldr f S.nil
concatFoldableWith f = S.concatMapFoldableWith f id
Since: 0.8.0 (Renamed foldWith to concatFoldableWith)
Since: 0.1.0 (Streamly)
concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #
A variant of foldMap that allows you to map a monadic streaming action
on a Foldable container and then fold it using the specified stream merge
operation.
concatMapFoldableWith async return [1..3]
Equivalent to:
concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)
Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)
Since: 0.1.0 (Streamly)
concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #
Like concatMapFoldableWith
but with the last two arguments reversed i.e. the
monadic streaming function is the last argument.
Equivalent to:
concatForFoldableWith f xs g = Prelude.foldr (f . g) S.nil xs
concatForFoldableWith f = flip (S.concatMapFoldableWith f)
Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)
Since: 0.1.0 (Streamly)
Running Effects
Conversion operations
fromList :: (Monad m, IsStream t) => [a] -> t m a Source #
fromList = foldr cons nil
Construct a stream from a list of pure values. This is more efficient than
fromFoldable for serial streams.
Since: 0.4.0
toList :: (IsStream t, Monad m) => t m a -> m [a] Source #
Convert a stream into a list in the underlying monad.
Since: 0.1.0
Fold operations
foldrMx :: (IsStream t, Monad m) => (a -> m x -> m x) -> m x -> (m x -> m b) -> t m a -> m b Source #
foldlx' :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b Source #
Strict left fold with an extraction function. Like the standard strict
left fold, but applies a user supplied extraction function (the third
argument) to the folded value at the end. This is designed to work with the
foldl library. The suffix x is a mnemonic for extraction.
Since: 0.7.0
foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b Source #
Like foldlx', but with a monadic step function.
Since: 0.7.0
foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b Source #
Strict left associative fold.
Since: 0.2.0
Zip style operations
eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool Source #
Compare two streams for equality
Since: 0.5.3
cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering Source #
Compare two streams
Since: 0.5.3
interleaving :: IsStream t => WSerialT m a -> t m a Source #
zipping :: IsStream t => ZipSerialM m a -> t m a Source #
zippingAsync :: IsStream t => ZipAsyncM m a -> t m a Source #