Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Deprecated: Please use Streamly.Internal.Data.Stream.Concurrent from streamly package instead.
To run examples in this module:
>>> import qualified Streamly.Prelude as Stream
>>> import Control.Concurrent (threadDelay)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}
Synopsis
- newtype AsyncT m a = AsyncT { getAsyncT :: Stream m a }
- type Async = AsyncT IO
- consMAsync :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a
- asyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
- mkAsyncK :: MonadAsync m => Stream m a -> Stream m a
- mkAsyncD :: MonadAsync m => Stream m a -> Stream m a
- newtype WAsyncT m a = WAsyncT { getWAsyncT :: Stream m a }
- type WAsync = WAsyncT IO
- consMWAsync :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a
- wAsyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
Documentation
For AsyncT streams:
(<>) = async
(>>=) = flip . concatMapWith async
A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the async combinator, producing results and side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [2,1]      -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the async combinator:
>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2]      -- foreach x in stream
    y <- Stream.fromList [2,4]      -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream, all the iterations corresponding to 2 constitute another output stream, and these two output streams are merged using async.
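The stated equivalence `(>>=) = flip . concatMapWith async` can be checked with a small sketch. It assumes streamly 0.8.x, where `Streamly.Prelude` exports `concatMapWith` and `async`. The `delay` helper here is a shortened, silent variant of the one used in this module, and the names `viaBind` and `viaConcatMap` are illustrative only. Results are sorted before comparison because concurrent output order is not guaranteed.

```haskell
import qualified Streamly.Prelude as Stream
import Control.Concurrent (threadDelay)
import Data.List (sort)

-- Shortened variant of the module's delay helper: tenths of a second, no printing.
delay :: Int -> IO Int
delay n = threadDelay (n * 100000) >> return n

-- Nested loops written with the AsyncT Monad instance.
viaBind :: IO [Int]
viaBind = Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2]
    y <- Stream.fromList [2,4]
    Stream.fromEffect $ delay (x + y)

-- The same loops spelled out with concatMapWith and async.
viaConcatMap :: IO [Int]
viaConcatMap = Stream.toList $
    Stream.concatMapWith Stream.async
        (\x -> Stream.concatMapWith Stream.async
                   (\y -> Stream.fromEffect $ delay (x + y))
                   (Stream.fromList [2,4]))
        (Stream.fromList [1,2])

main :: IO ()
main = do
    a <- viaBind
    b <- viaConcatMap
    -- Compare as multisets; the arrival order may differ between runs.
    print (sort a == sort b)
```

Both formulations should produce the same set of elements; only the arrival order is left unspecified.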
Since: 0.1.0 (Streamly)
Since: 0.8.0
Instances
IsStream AsyncT
Defined in Streamly.Internal.Data.Stream.IsStream.Type
MonadTrans AsyncT
(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m)
(MonadState s m, MonadAsync m) => MonadState s (AsyncT m)
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m)
Defined in Streamly.Internal.Data.Stream.Async
(MonadIO m, MonadAsync m) => MonadIO (AsyncT m)
(Monad m, MonadAsync m) => Applicative (AsyncT m)
Defined in Streamly.Internal.Data.Stream.Async
Monad m => Functor (AsyncT m)
MonadAsync m => Monad (AsyncT m)
(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m)
MonadAsync m => Monoid (AsyncT m a)
MonadAsync m => Semigroup (AsyncT m a)
consMAsync :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a
XXX We can implement it more efficiently by implementing it directly instead of combining streams using async.
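As a usage sketch, concurrent consing is normally reached through the public `(|:)` operator rather than by calling `consMAsync` directly. The example below assumes streamly 0.8.x and assumes that `(|:)` at the `AsyncT` type is backed by `consMAsync`; the name `consed` and the shortened `delay` helper are illustrative only.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude ((|:))
import Control.Concurrent (threadDelay)
import Data.List (sort)

-- Shortened variant of the module's delay helper: tenths of a second, no printing.
delay :: Int -> IO Int
delay n = threadDelay (n * 100000) >> return n

-- Build a stream from three effects; under fromAsync the consed
-- actions may run concurrently (assumption: (|:) uses consMAsync here).
consed :: IO [Int]
consed = Stream.toList $ Stream.fromAsync $
    delay 3 |: delay 2 |: delay 1 |: Stream.nil

main :: IO ()
main = do
    xs <- consed
    -- Sort before printing; the arrival order is not guaranteed.
    print (sort xs)
```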
asyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
mkAsyncK :: MonadAsync m => Stream m a -> Stream m a
Generate a stream asynchronously to keep it buffered, lazily consume from the buffer.
Pre-release
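The buffering behavior described above can be sketched with the public counterpart of this function. The example assumes streamly 0.8.x and assumes that `Stream.mkAsync` from `Streamly.Prelude` exposes the same producer/consumer buffering as `mkAsyncK`; the names `slow` and `buffered` are illustrative only.

```haskell
import qualified Streamly.Prelude as Stream
import Control.Concurrent (threadDelay)
import Data.List (sort)

-- Simulate a stage that takes about 0.1 s per element.
slow :: Int -> IO Int
slow x = threadDelay 100000 >> return x

-- The producer stage runs ahead into a buffer while the
-- consumer stage pulls elements from that buffer.
buffered :: IO [Int]
buffered = Stream.toList
    $ Stream.mapM slow       -- consumer stage
    $ Stream.mkAsync         -- buffer between the two stages
    $ Stream.mapM slow       -- producer stage
    $ Stream.fromList [1..5]

main :: IO ()
main = buffered >>= print
```

Because the two stages can overlap, the pipeline would be expected to finish sooner than the same pipeline without `mkAsync`, although no timing is asserted here.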
mkAsyncD :: MonadAsync m => Stream m a -> Stream m a
For WAsyncT streams:
(<>) = wAsync
(>>=) = flip . concatMapWith wAsync
A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the wAsync combinator, producing results and side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [2,1]      -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the wAsync combinator:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2]      -- foreach x in stream
    y <- Stream.fromList [2,4]      -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one WAsyncT output stream, all the iterations corresponding to 2 constitute another WAsyncT output stream, and these two output streams are merged using wAsync.
The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of AsyncT.
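One way to observe the difference between depth wise and breadth wise scheduling is to cap the worker count so that concurrency does not mask the scheduling order. The sketch below assumes streamly 0.8.x and uses `Stream.maxThreads` from `Streamly.Prelude`; the names `depthWise` and `breadthWise` are illustrative only. The exact orderings are printed rather than asserted, since they depend on the scheduler.

```haskell
import qualified Streamly.Prelude as Stream

-- AsyncT: (<>) is async, which schedules depth wise.
depthWise :: IO [Int]
depthWise = Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $
    Stream.fromList [1,2,3] <> Stream.fromList [4,5,6]

-- WAsyncT: (<>) is wAsync, which schedules breadth wise.
breadthWise :: IO [Int]
breadthWise = Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $
    Stream.fromList [1,2,3] <> Stream.fromList [4,5,6]

main :: IO ()
main = do
    depthWise >>= print
    breadthWise >>= print
```

With a single worker, the depth wise variant would be expected to favor finishing the first stream before starting the second, while the breadth wise variant would be expected to alternate between the two.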
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
IsStream WAsyncT
Defined in Streamly.Internal.Data.Stream.IsStream.Type
MonadTrans WAsyncT
(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m)
(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m)
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m)
Defined in Streamly.Internal.Data.Stream.Async
(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m)
(Monad m, MonadAsync m) => Applicative (WAsyncT m)
Defined in Streamly.Internal.Data.Stream.Async
Monad m => Functor (WAsyncT m)
MonadAsync m => Monad (WAsyncT m)
(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m)
MonadAsync m => Monoid (WAsyncT m a)
MonadAsync m => Semigroup (WAsyncT m a)
consMWAsync :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a
XXX We can implement it more efficiently by implementing it directly instead of combining streams using wAsync.
wAsyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a