streamly-0.7.3: Beautiful Streaming, Concurrent and Reactive Composition
Copyright(c) 2017 Harendra Kumar
LicenseBSD3
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Internal.Data.Stream.Ahead

Description

 
Synopsis

Documentation

data AheadT m a Source #

The Semigroup operation for AheadT appends two streams. The combined stream behaves like a single stream with the actions from the second stream appended to the first stream. The combined stream is evaluated in the speculative style. This operation can be used to fold an infinite lazy container of streams.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = do
 xs <- S.toList . aheadly $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
 print xs
 where p n = threadDelay 1000000 >> return n
[1,2,3,4]

Any exceptions generated by a constituent stream are propagated to the output stream.

The monad instance of AheadT may run each monadic continuation (bind) concurrently in a speculative manner, performing side effects in a partially ordered manner but producing the outputs in an ordered manner like SerialT.

main = S.drain . aheadly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Since: 0.3.0

Instances

Instances details
MonadTrans AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

lift :: Monad m => m a -> AheadT m a #

IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> Stream m a Source #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AheadT m a Source #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftBase :: b α -> AheadT m α #

(MonadState s m, MonadAsync m) => MonadState s (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

get :: AheadT m s #

put :: s -> AheadT m () #

state :: (s -> (a, s)) -> AheadT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

ask :: AheadT m r #

local :: (r -> r) -> AheadT m a -> AheadT m a #

reader :: (r -> a) -> AheadT m a #

MonadAsync m => Monad (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(>>=) :: AheadT m a -> (a -> AheadT m b) -> AheadT m b #

(>>) :: AheadT m a -> AheadT m b -> AheadT m b #

return :: a -> AheadT m a #

Monad m => Functor (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

fmap :: (a -> b) -> AheadT m a -> AheadT m b #

(<$) :: a -> AheadT m b -> AheadT m a #

(Monad m, MonadAsync m) => Applicative (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

pure :: a -> AheadT m a #

(<*>) :: AheadT m (a -> b) -> AheadT m a -> AheadT m b #

liftA2 :: (a -> b -> c) -> AheadT m a -> AheadT m b -> AheadT m c #

(*>) :: AheadT m a -> AheadT m b -> AheadT m b #

(<*) :: AheadT m a -> AheadT m b -> AheadT m a #

(MonadIO m, MonadAsync m) => MonadIO (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftIO :: IO a -> AheadT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

throwM :: Exception e => e -> AheadT m a #

MonadAsync m => Semigroup (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

MonadAsync m => Monoid (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

mempty :: AheadT m a #

mappend :: AheadT m a -> AheadT m a -> AheadT m a #

mconcat :: [AheadT m a] -> AheadT m a #

type Ahead = AheadT IO Source #

A serial IO stream of elements of type a with concurrent lookahead. See AheadT documentation for more details.

Since: 0.3.0

aheadly :: IsStream t => AheadT m a -> t m a Source #

Fix the type of a polymorphic stream as AheadT.

Since: 0.3.0

ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Polymorphic version of the Semigroup operation <> of AheadT. Merges two streams sequentially but with concurrent lookahead.

Since: 0.3.0