{-# LANGUAGE CPP                       #-}
{-# LANGUAGE ConstraintKinds           #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs              #-}
{-# LANGUAGE LambdaCase                #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE ScopedTypeVariables       #-}
{-# LANGUAGE UndecidableInstances      #-} -- XXX

#include "inline.hs"

-- |
-- Module      : Streamly.Internal.Data.Stream.Async
-- Copyright   : (c) 2017 Harendra Kumar
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Stream.Async
    (
      AsyncT
    , Async
    , asyncly
    , async
    , (<|)             --deprecated
    , mkAsync
    , mkAsyncK

    , WAsyncT
    , WAsync
    , wAsyncly
    , wAsync
    )
where

import Control.Concurrent (myThreadId)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Control.Concurrent.MVar (newEmptyMVar)
-- import Control.Monad.Error.Class   (MonadError(..))
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR)
import Data.IORef (IORef, newIORef, readIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif

import Prelude hiding (map)
import qualified Data.Set as S

import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS)
import Streamly.Internal.Data.Stream.SVar (fromSVar)
import Streamly.Internal.Data.SVar
import Streamly.Internal.Data.Stream.StreamK
       (IsStream(..), Stream, mkStream, foldStream, adapt, foldStreamShared)
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D

#include "Instances.hs"

-------------------------------------------------------------------------------
-- Async
-------------------------------------------------------------------------------

data WorkerStatus = Continue | Suspend

{-# INLINE workLoopLIFO #-}
workLoopLIFO
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef [Stream m a]
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFO :: IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFO IORef [Stream m a]
q State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo = m ()
run

    where

    mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> forall b. m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> forall b. m b -> IO (StM m b))
-> RunInIO m -> forall b. m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv
    run :: m ()
run = do
        Maybe (Stream m a)
work <- m (Maybe (Stream m a))
dequeue
        let stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
        case Maybe (Stream m a)
work of
            Maybe (Stream m a)
Nothing -> m ()
stop
            Just Stream m a
m -> do
                -- XXX when we finish we need to send the monadic state back to
                -- the parent so that the state can be merged back. We capture
                -- and return the state in the stop continuation.
                --
                -- Instead of using the run function we can just restore the
                -- monad state here. That way it can work easily for
                -- distributed case as well.
                StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
mrun (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
                        State Stream m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> Stream m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall (m :: * -> *). MonadIO m => a -> m WorkerStatus
single (WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) Stream m a
m
                WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
                case WorkerStatus
res of
                    WorkerStatus
Continue -> m ()
run
                    WorkerStatus
Suspend -> m ()
stop

    single :: a -> m WorkerStatus
single a
a = do
        Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend

    yieldk :: a -> Stream m a -> m WorkerStatus
yieldk a
a Stream m a
r = do
        Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        if Bool
res
        then State Stream m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> Stream m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall (m :: * -> *). MonadIO m => a -> m WorkerStatus
single (WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) Stream m a
r
        else IO WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO WorkerStatus -> m WorkerStatus)
-> IO WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ do
            -- XXX we also need to save the monadic state here
            SVar Stream m a -> IORef [Stream m a] -> Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO SVar Stream m a
sv IORef [Stream m a]
q Stream m a
r
            WorkerStatus -> IO WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend

    dequeue :: m (Maybe (Stream m a))
dequeue = IO (Maybe (Stream m a)) -> m (Maybe (Stream m a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Stream m a)) -> m (Maybe (Stream m a)))
-> IO (Maybe (Stream m a)) -> m (Maybe (Stream m a))
forall a b. (a -> b) -> a -> b
$ IORef [Stream m a]
-> ([Stream m a] -> ([Stream m a], Maybe (Stream m a)))
-> IO (Maybe (Stream m a))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef [Stream m a]
q (([Stream m a] -> ([Stream m a], Maybe (Stream m a)))
 -> IO (Maybe (Stream m a)))
-> ([Stream m a] -> ([Stream m a], Maybe (Stream m a)))
-> IO (Maybe (Stream m a))
forall a b. (a -> b) -> a -> b
$ \case
                [] -> ([], Maybe (Stream m a)
forall a. Maybe a
Nothing)
                Stream m a
x : [Stream m a]
xs -> ([Stream m a]
xs, Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just Stream m a
x)

-- We duplicate workLoop for yield limit and no limit cases because it has
-- around 40% performance overhead in the worst case.
--
-- XXX we can pass yinfo directly as an argument here so that we do not have to
-- make a check every time.
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef [Stream m a]
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFOLimited :: IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited IORef [Stream m a]
q State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo = m ()
run

    where

    mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> forall b. m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> forall b. m b -> IO (StM m b))
-> RunInIO m -> forall b. m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv
    incrContinue :: m WorkerStatus
incrContinue = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv) m () -> m WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
    run :: m ()
run = do
        Maybe (Stream m a)
work <- m (Maybe (Stream m a))
dequeue
        let stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
        case Maybe (Stream m a)
work of
            Maybe (Stream m a)
Nothing -> m ()
stop
            Just Stream m a
m -> do
                -- XXX This is just a best effort minimization of concurrency
                -- to the yield limit. If the stream is made of concurrent
                -- streams we do not reserve the yield limit in the constituent
                -- streams before executing the action. This can be done
                -- though, by sharing the yield limit ref with downstream
                -- actions via state passing. Just a todo.
                Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
                if Bool
yieldLimitOk
                then do
                    StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
mrun (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
                            State Stream m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> Stream m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall (m :: * -> *). MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue Stream m a
m
                    WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
                    case WorkerStatus
res of
                        WorkerStatus
Continue -> m ()
run
                        WorkerStatus
Suspend -> m ()
stop
                -- Avoid any side effects, undo the yield limit decrement if we
                -- never yielded anything.
                else IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                    SVar Stream m a -> IORef [Stream m a] -> Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO SVar Stream m a
sv IORef [Stream m a]
q Stream m a
m
                    SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
                    SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo

    single :: a -> m WorkerStatus
single a
a = do
        Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend

    -- XXX can we pass on the yield limit downstream to limit the concurrency
    -- of constituent streams.
    yieldk :: a -> Stream m a -> m WorkerStatus
yieldk a
a Stream m a
r = do
        Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
        if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
        then State Stream m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> Stream m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall (m :: * -> *). MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue Stream m a
r
        else IO WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO WorkerStatus -> m WorkerStatus)
-> IO WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ do
            SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
            SVar Stream m a -> IORef [Stream m a] -> Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO SVar Stream m a
sv IORef [Stream m a]
q Stream m a
r
            WorkerStatus -> IO WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend

    dequeue :: m (Maybe (Stream m a))
dequeue = IO (Maybe (Stream m a)) -> m (Maybe (Stream m a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Stream m a)) -> m (Maybe (Stream m a)))
-> IO (Maybe (Stream m a)) -> m (Maybe (Stream m a))
forall a b. (a -> b) -> a -> b
$ IORef [Stream m a]
-> ([Stream m a] -> ([Stream m a], Maybe (Stream m a)))
-> IO (Maybe (Stream m a))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef [Stream m a]
q (([Stream m a] -> ([Stream m a], Maybe (Stream m a)))
 -> IO (Maybe (Stream m a)))
-> ([Stream m a] -> ([Stream m a], Maybe (Stream m a)))
-> IO (Maybe (Stream m a))
forall a b. (a -> b) -> a -> b
$ \case
                [] -> ([], Maybe (Stream m a)
forall a. Maybe a
Nothing)
                Stream m a
x : [Stream m a]
xs -> ([Stream m a]
xs, Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just Stream m a
x)

-------------------------------------------------------------------------------
-- WAsync
-------------------------------------------------------------------------------

-- XXX we can remove sv as it is derivable from st

{-# INLINE workLoopFIFO #-}
workLoopFIFO
    :: (MonadIO m, MonadBaseControl IO m)
    => LinkedQueue (Stream m a)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFO :: LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopFIFO LinkedQueue (Stream m a)
q State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo = m ()
run

    where

    mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> forall b. m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> forall b. m b -> IO (StM m b))
-> RunInIO m -> forall b. m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv
    run :: m ()
run = do
        Maybe (Stream m a)
work <- IO (Maybe (Stream m a)) -> m (Maybe (Stream m a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Stream m a)) -> m (Maybe (Stream m a)))
-> IO (Maybe (Stream m a)) -> m (Maybe (Stream m a))
forall a b. (a -> b) -> a -> b
$ LinkedQueue (Stream m a) -> IO (Maybe (Stream m a))
forall a. LinkedQueue a -> IO (Maybe a)
tryPopR LinkedQueue (Stream m a)
q
        let stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
        case Maybe (Stream m a)
work of
            Maybe (Stream m a)
Nothing -> m ()
stop
            Just Stream m a
m -> do
                StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
mrun (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
                        State Stream m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> Stream m a -> m WorkerStatus
forall (m :: * -> *).
MonadIO m =>
a -> Stream m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall (m :: * -> *). MonadIO m => a -> m WorkerStatus
single (WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) Stream m a
m
                WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
                case WorkerStatus
res of
                    WorkerStatus
Continue -> m ()
run
                    WorkerStatus
Suspend -> m ()
stop

    single :: a -> m WorkerStatus
single a
a = do
        Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend

    -- XXX in general we would like to yield "n" elements from a single stream
    -- before moving on to the next. Single element granularity could be too
    -- expensive in certain cases. Similarly, we can use time limit for
    -- yielding.
    yieldk :: a -> Stream m a -> m WorkerStatus
yieldk a
a Stream m a
r = do
        Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> LinkedQueue (Stream m a) -> Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO SVar Stream m a
sv LinkedQueue (Stream m a)
q Stream m a
r
        WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend

{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
    :: (MonadIO m, MonadBaseControl IO m)
    => LinkedQueue (Stream m a)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFOLimited :: LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopFIFOLimited LinkedQueue (Stream m a)
q State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo = m ()
run

    where

    mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> forall b. m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> forall b. m b -> IO (StM m b))
-> RunInIO m -> forall b. m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv
    incrContinue :: m WorkerStatus
incrContinue = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv) m () -> m WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
    run :: m ()
run = do
        Maybe (Stream m a)
work <- IO (Maybe (Stream m a)) -> m (Maybe (Stream m a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Stream m a)) -> m (Maybe (Stream m a)))
-> IO (Maybe (Stream m a)) -> m (Maybe (Stream m a))
forall a b. (a -> b) -> a -> b
$ LinkedQueue (Stream m a) -> IO (Maybe (Stream m a))
forall a. LinkedQueue a -> IO (Maybe a)
tryPopR LinkedQueue (Stream m a)
q
        let stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
        case Maybe (Stream m a)
work of
            Maybe (Stream m a)
Nothing -> m ()
stop
            Just Stream m a
m -> do
                Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
                if Bool
yieldLimitOk
                then do
                    StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
mrun (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
                            State Stream m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> Stream m a -> m WorkerStatus
forall (m :: * -> *).
MonadIO m =>
a -> Stream m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall (m :: * -> *). MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue Stream m a
m
                    WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
                    case WorkerStatus
res of
                        WorkerStatus
Continue -> m ()
run
                        WorkerStatus
Suspend -> m ()
stop
                else IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                    SVar Stream m a -> LinkedQueue (Stream m a) -> Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO SVar Stream m a
sv LinkedQueue (Stream m a)
q Stream m a
m
                    SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
                    SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo

    single :: a -> m WorkerStatus
single a
a = do
        Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend

    yieldk :: a -> Stream m a -> m WorkerStatus
yieldk a
a Stream m a
r = do
        Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> LinkedQueue (Stream m a) -> Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO SVar Stream m a
sv LinkedQueue (Stream m a)
q Stream m a
r
        Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
        if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
        then WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
        else IO WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO WorkerStatus -> m WorkerStatus)
-> IO WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ do
            SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
            WorkerStatus -> IO WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend

-------------------------------------------------------------------------------
-- SVar creation
-- This code belongs in SVar.hs but is kept here for perf reasons
-------------------------------------------------------------------------------

-- XXX we have this function in this file because passing runStreamLIFO as a
-- function argument to this function results in a perf degradation of more
-- than 10%.  Need to investigate what the root cause is.
-- Interestingly, the same thing does not make any difference for Ahead.
getLifoSVar :: forall m a. MonadAsync m
    => State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar :: State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar State Stream m a
st RunInIO m
mrun = do
    IORef ([ChildEvent a], Int)
outQ    <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    MVar ()
outQMv  <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    IORef Int
active  <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
    IORef Bool
wfw     <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
    IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
S.empty
    IORef [Stream m a]
q       <- [Stream m a] -> IO (IORef [Stream m a])
forall a. a -> IO (IORef a)
newIORef []
    Maybe (IORef Count)
yl      <- case State Stream m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
                Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
                Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
    Maybe YieldRateInfo
rateInfo <- State Stream m a -> IO (Maybe YieldRateInfo)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo State Stream m a
st

    SVarStats
stats <- IO SVarStats
newSVarStats
    ThreadId
tid <- IO ThreadId
myThreadId

    let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = [Stream m a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null ([Stream m a] -> Bool) -> IO [Stream m a] -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef [Stream m a] -> IO [Stream m a]
forall a. IORef a -> IO a
readIORef IORef [Stream m a]
q

    let isWorkFinishedLimited :: SVar t m a -> IO Bool
isWorkFinishedLimited SVar t m a
sv = do
            Bool
yieldsDone <-
                    case SVar t m a -> Maybe (IORef Count)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef Count)
remainingWork SVar t m a
sv of
                        Just IORef Count
ref -> do
                            Count
n <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
ref
                            Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0)
                        Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
            Bool
qEmpty <- [Stream m a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null ([Stream m a] -> Bool) -> IO [Stream m a] -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef [Stream m a] -> IO [Stream m a]
forall a. IORef a -> IO a
readIORef IORef [Stream m a]
q
            Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone

    let getSVar :: SVar Stream m a
            -> (SVar Stream m a -> m [ChildEvent a])
            -> (SVar Stream m a -> m Bool)
            -> (SVar Stream m a -> IO Bool)
            -> (IORef [Stream m a]
                -> State Stream m a
                -> SVar Stream m a
                -> Maybe WorkerInfo
                -> m())
            -> SVar Stream m a
        getSVar :: SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [Stream m a]
    -> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
readOutput SVar Stream m a -> m Bool
postProc SVar Stream m a -> IO Bool
workDone IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
wloop = SVar :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVarStyle
-> RunInIO m
-> SVarStopStyle
-> IORef ThreadId
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> m [ChildEvent a]
-> m Bool
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Limit
-> Limit
-> IORef Count
-> PushBufferPolicy
-> MVar ()
-> Maybe (IORef Count)
-> Maybe YieldRateInfo
-> (t m a -> IO ())
-> IO Bool
-> IO Bool
-> IORef Bool
-> (Maybe WorkerInfo -> m ())
-> IORef (Set ThreadId)
-> IORef Int
-> (ThreadId -> m ())
-> MVar ()
-> SVarStats
-> Maybe (IORef ())
-> Bool
-> ThreadId
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IORef ([t m a], Int)
-> SVar t m a
SVar
            { outputQueue :: IORef ([ChildEvent a], Int)
outputQueue      = IORef ([ChildEvent a], Int)
outQ
            , outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
outputQueueFromConsumer = IORef ([ChildEvent a], Int)
forall a. HasCallStack => a
undefined
            , remainingWork :: Maybe (IORef Count)
remainingWork    = Maybe (IORef Count)
yl
            , maxBufferLimit :: Limit
maxBufferLimit   = State Stream m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State Stream m a
st
            , pushBufferSpace :: IORef Count
pushBufferSpace  = IORef Count
forall a. HasCallStack => a
undefined
            , pushBufferPolicy :: PushBufferPolicy
pushBufferPolicy = PushBufferPolicy
forall a. HasCallStack => a
undefined
            , pushBufferMVar :: MVar ()
pushBufferMVar   = MVar ()
forall a. HasCallStack => a
undefined
            , maxWorkerLimit :: Limit
maxWorkerLimit   = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (State Stream m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads State Stream m a
st) (State Stream m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State Stream m a
st)
            , yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo    = Maybe YieldRateInfo
rateInfo
            , outputDoorBell :: MVar ()
outputDoorBell   = MVar ()
outQMv
            , outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = MVar ()
forall a. HasCallStack => a
undefined
            , readOutputQ :: m [ChildEvent a]
readOutputQ      = SVar Stream m a -> m [ChildEvent a]
readOutput SVar Stream m a
sv
            , postProcess :: m Bool
postProcess      = SVar Stream m a -> m Bool
postProc SVar Stream m a
sv
            , workerThreads :: IORef (Set ThreadId)
workerThreads    = IORef (Set ThreadId)
running
            , workLoop :: Maybe WorkerInfo -> m ()
workLoop         = IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
wloop IORef [Stream m a]
q State Stream m a
st{streamVar :: Maybe (SVar Stream m a)
streamVar = SVar Stream m a -> Maybe (SVar Stream m a)
forall a. a -> Maybe a
Just SVar Stream m a
sv} SVar Stream m a
sv
            , enqueue :: Stream m a -> IO ()
enqueue          = SVar Stream m a -> IORef [Stream m a] -> Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO SVar Stream m a
sv IORef [Stream m a]
q
            , isWorkDone :: IO Bool
isWorkDone       = SVar Stream m a -> IO Bool
workDone SVar Stream m a
sv
            , isQueueDone :: IO Bool
isQueueDone      = SVar Stream m a -> IO Bool
workDone SVar Stream m a
sv
            , needDoorBell :: IORef Bool
needDoorBell     = IORef Bool
wfw
            , svarStyle :: SVarStyle
svarStyle        = SVarStyle
AsyncVar
            , svarStopStyle :: SVarStopStyle
svarStopStyle    = SVarStopStyle
StopNone
            , svarStopBy :: IORef ThreadId
svarStopBy       = IORef ThreadId
forall a. HasCallStack => a
undefined
            , svarMrun :: RunInIO m
svarMrun         = RunInIO m
mrun
            , workerCount :: IORef Int
workerCount      = IORef Int
active
            , accountThread :: ThreadId -> m ()
accountThread    = SVar Stream m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
delThread SVar Stream m a
sv
            , workerStopMVar :: MVar ()
workerStopMVar   = MVar ()
forall a. HasCallStack => a
undefined
            , svarRef :: Maybe (IORef ())
svarRef          = Maybe (IORef ())
forall a. Maybe a
Nothing
            , svarInspectMode :: Bool
svarInspectMode  = State Stream m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode State Stream m a
st
            , svarCreator :: ThreadId
svarCreator      = ThreadId
tid
            , aheadWorkQueue :: IORef ([Stream m a], Int)
aheadWorkQueue   = IORef ([Stream m a], Int)
forall a. HasCallStack => a
undefined
            , outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
outputHeap       = IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
forall a. HasCallStack => a
undefined
            , svarStats :: SVarStats
svarStats        = SVarStats
stats
            }

    let sv :: SVar Stream m a
sv =
            case State Stream m a -> Maybe Rate
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate State Stream m a
st of
                Maybe Rate
Nothing ->
                    case State Stream m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
                        Maybe Count
Nothing -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [Stream m a]
    -> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
                                              SVar Stream m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
                                              SVar Stream m a -> IO Bool
forall p. p -> IO Bool
isWorkFinished
                                              IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
                        Just Count
_  -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [Stream m a]
    -> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
                                              SVar Stream m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
                                              SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
                                              IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
                Just Rate
_  ->
                    case State Stream m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
                        Maybe Count
Nothing -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [Stream m a]
    -> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
                                              SVar Stream m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
                                              SVar Stream m a -> IO Bool
forall p. p -> IO Bool
isWorkFinished
                                              IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
                        Just Count
_  -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [Stream m a]
    -> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
                                              SVar Stream m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
                                              SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
                                              IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
     in SVar Stream m a -> IO (SVar Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar Stream m a
sv

getFifoSVar :: forall m a. MonadAsync m
    => State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getFifoSVar :: State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getFifoSVar State Stream m a
st RunInIO m
mrun = do
    IORef ([ChildEvent a], Int)
outQ    <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    MVar ()
outQMv  <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    IORef Int
active  <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
    IORef Bool
wfw     <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
    IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
S.empty
    LinkedQueue (Stream m a)
q       <- IO (LinkedQueue (Stream m a))
forall a. IO (LinkedQueue a)
newQ
    Maybe (IORef Count)
yl      <- case State Stream m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
                Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
                Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
    Maybe YieldRateInfo
rateInfo <- State Stream m a -> IO (Maybe YieldRateInfo)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo State Stream m a
st

    SVarStats
stats <- IO SVarStats
newSVarStats
    ThreadId
tid <- IO ThreadId
myThreadId

    let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = LinkedQueue (Stream m a) -> IO Bool
forall a. LinkedQueue a -> IO Bool
nullQ LinkedQueue (Stream m a)
q
    let isWorkFinishedLimited :: SVar t m a -> IO Bool
isWorkFinishedLimited SVar t m a
sv = do
            Bool
yieldsDone <-
                    case SVar t m a -> Maybe (IORef Count)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef Count)
remainingWork SVar t m a
sv of
                        Just IORef Count
ref -> do
                            Count
n <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
ref
                            Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0)
                        Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
            Bool
qEmpty <- LinkedQueue (Stream m a) -> IO Bool
forall a. LinkedQueue a -> IO Bool
nullQ LinkedQueue (Stream m a)
q
            Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone

    let getSVar :: SVar Stream m a
            -> (SVar Stream m a -> m [ChildEvent a])
            -> (SVar Stream m a -> m Bool)
            -> (SVar Stream m a -> IO Bool)
            -> (LinkedQueue (Stream m a)
                -> State Stream m a
                -> SVar Stream m a
                -> Maybe WorkerInfo
                -> m())
            -> SVar Stream m a
        getSVar :: SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (Stream m a)
    -> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
readOutput SVar Stream m a -> m Bool
postProc SVar Stream m a -> IO Bool
workDone LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
wloop = SVar :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVarStyle
-> RunInIO m
-> SVarStopStyle
-> IORef ThreadId
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> m [ChildEvent a]
-> m Bool
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Limit
-> Limit
-> IORef Count
-> PushBufferPolicy
-> MVar ()
-> Maybe (IORef Count)
-> Maybe YieldRateInfo
-> (t m a -> IO ())
-> IO Bool
-> IO Bool
-> IORef Bool
-> (Maybe WorkerInfo -> m ())
-> IORef (Set ThreadId)
-> IORef Int
-> (ThreadId -> m ())
-> MVar ()
-> SVarStats
-> Maybe (IORef ())
-> Bool
-> ThreadId
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IORef ([t m a], Int)
-> SVar t m a
SVar
            { outputQueue :: IORef ([ChildEvent a], Int)
outputQueue      = IORef ([ChildEvent a], Int)
outQ
            , outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
outputQueueFromConsumer = IORef ([ChildEvent a], Int)
forall a. HasCallStack => a
undefined
            , remainingWork :: Maybe (IORef Count)
remainingWork    = Maybe (IORef Count)
yl
            , maxBufferLimit :: Limit
maxBufferLimit   = State Stream m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State Stream m a
st
            , pushBufferSpace :: IORef Count
pushBufferSpace  = IORef Count
forall a. HasCallStack => a
undefined
            , pushBufferPolicy :: PushBufferPolicy
pushBufferPolicy = PushBufferPolicy
forall a. HasCallStack => a
undefined
            , pushBufferMVar :: MVar ()
pushBufferMVar   = MVar ()
forall a. HasCallStack => a
undefined
            , maxWorkerLimit :: Limit
maxWorkerLimit   = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (State Stream m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads State Stream m a
st) (State Stream m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State Stream m a
st)
            , yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo    = Maybe YieldRateInfo
rateInfo
            , outputDoorBell :: MVar ()
outputDoorBell   = MVar ()
outQMv
            , outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = MVar ()
forall a. HasCallStack => a
undefined
            , readOutputQ :: m [ChildEvent a]
readOutputQ      = SVar Stream m a -> m [ChildEvent a]
readOutput SVar Stream m a
sv
            , postProcess :: m Bool
postProcess      = SVar Stream m a -> m Bool
postProc SVar Stream m a
sv
            , workerThreads :: IORef (Set ThreadId)
workerThreads    = IORef (Set ThreadId)
running
            , workLoop :: Maybe WorkerInfo -> m ()
workLoop         = LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
wloop LinkedQueue (Stream m a)
q State Stream m a
st{streamVar :: Maybe (SVar Stream m a)
streamVar = SVar Stream m a -> Maybe (SVar Stream m a)
forall a. a -> Maybe a
Just SVar Stream m a
sv} SVar Stream m a
sv
            , enqueue :: Stream m a -> IO ()
enqueue          = SVar Stream m a -> LinkedQueue (Stream m a) -> Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO SVar Stream m a
sv LinkedQueue (Stream m a)
q
            , isWorkDone :: IO Bool
isWorkDone       = SVar Stream m a -> IO Bool
workDone SVar Stream m a
sv
            , isQueueDone :: IO Bool
isQueueDone      = SVar Stream m a -> IO Bool
workDone SVar Stream m a
sv
            , needDoorBell :: IORef Bool
needDoorBell     = IORef Bool
wfw
            , svarStyle :: SVarStyle
svarStyle        = SVarStyle
WAsyncVar
            , svarStopStyle :: SVarStopStyle
svarStopStyle    = SVarStopStyle
StopNone
            , svarStopBy :: IORef ThreadId
svarStopBy       = IORef ThreadId
forall a. HasCallStack => a
undefined
            , svarMrun :: RunInIO m
svarMrun         = RunInIO m
mrun
            , workerCount :: IORef Int
workerCount      = IORef Int
active
            , accountThread :: ThreadId -> m ()
accountThread    = SVar Stream m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
delThread SVar Stream m a
sv
            , workerStopMVar :: MVar ()
workerStopMVar   = MVar ()
forall a. HasCallStack => a
undefined
            , svarRef :: Maybe (IORef ())
svarRef          = Maybe (IORef ())
forall a. Maybe a
Nothing
            , svarInspectMode :: Bool
svarInspectMode  = State Stream m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode State Stream m a
st
            , svarCreator :: ThreadId
svarCreator      = ThreadId
tid
            , aheadWorkQueue :: IORef ([Stream m a], Int)
aheadWorkQueue   = IORef ([Stream m a], Int)
forall a. HasCallStack => a
undefined
            , outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
outputHeap       = IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
forall a. HasCallStack => a
undefined
            , svarStats :: SVarStats
svarStats        = SVarStats
stats
            }

    let sv :: SVar Stream m a
sv =
            case State Stream m a -> Maybe Rate
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate State Stream m a
st of
                Maybe Rate
Nothing ->
                    case State Stream m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
                        Maybe Count
Nothing -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (Stream m a)
    -> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
                                              SVar Stream m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
                                              SVar Stream m a -> IO Bool
forall p. p -> IO Bool
isWorkFinished
                                              LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopFIFO
                        Just Count
_  -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (Stream m a)
    -> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
                                              SVar Stream m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
                                              SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
                                              LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopFIFOLimited
                Just Rate
_  ->
                    case State Stream m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
                        Maybe Count
Nothing -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (Stream m a)
    -> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
                                              SVar Stream m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
                                              SVar Stream m a -> IO Bool
forall p. p -> IO Bool
isWorkFinished
                                              LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopFIFO
                        Just Count
_  -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (Stream m a)
    -> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
                                              SVar Stream m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
                                              SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
                                              LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopFIFOLimited
     in SVar Stream m a -> IO (SVar Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar Stream m a
sv

{-# INLINABLE newAsyncVar #-}
newAsyncVar :: MonadAsync m
    => State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar :: State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar State Stream m a
st Stream m a
m = do
    RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadBaseControl IO m => m (RunInIO m)
captureMonadState
    SVar Stream m a
sv <- IO (SVar Stream m a) -> m (SVar Stream m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (SVar Stream m a) -> m (SVar Stream m a))
-> IO (SVar Stream m a) -> m (SVar Stream m a)
forall a b. (a -> b) -> a -> b
$ State Stream m a -> RunInIO m -> IO (SVar Stream m a)
forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar State Stream m a
st RunInIO m
mrun
    SVar Stream m a -> Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker SVar Stream m a
sv Stream m a
m

-- | Generate a stream asynchronously to keep it buffered, lazily consume
-- from the buffer.
--
-- /Internal/
--
{-# INLINABLE mkAsyncK #-}
mkAsyncK :: (IsStream t, MonadAsync m) => t m a -> t m a
mkAsyncK :: t m a -> t m a
mkAsyncK t m a
m = (forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
  State Stream m a
  -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
 -> t m a)
-> (forall r.
    State Stream m a
    -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp -> do
    SVar Stream m a
sv <- State Stream m a -> Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
st) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m)
    State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStream State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp (t m a -> m r) -> t m a -> m r
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> t m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(MonadAsync m, IsStream t) =>
SVar Stream m a -> t m a
fromSVar SVar Stream m a
sv

{-# INLINE_NORMAL mkAsyncD #-}
mkAsyncD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkAsyncD :: Stream m a -> Stream m a
mkAsyncD Stream m a
m = (State Stream m a
 -> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a))
-> Maybe (Stream m a) -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step Maybe (Stream m a)
forall a. Maybe a
Nothing
    where

    step :: State Stream m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step State Stream m a
gst Maybe (Stream m a)
Nothing = do
        SVar Stream m a
sv <- State Stream m a -> Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar State Stream m a
gst (Stream m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD Stream m a
m)
        Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a))
-> Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. s -> Step s a
D.Skip (Maybe (Stream m a) -> Step (Maybe (Stream m a)) a)
-> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
D.fromSVar SVar Stream m a
sv

    step State Stream m a
gst (Just (D.UnStream State Stream m a -> s -> m (Step s a)
step1 s
st)) = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
st
        Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a))
-> Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
            D.Yield a
a s
s -> a -> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. a -> s -> Step s a
D.Yield a
a (Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a -> s -> m (Step s a)
step1 s
s)
            D.Skip s
s    -> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. s -> Step s a
D.Skip (Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a -> s -> m (Step s a)
step1 s
s)
            Step s a
D.Stop      -> Step (Maybe (Stream m a)) a
forall s a. Step s a
D.Stop

-- This is slightly faster than the CPS version above
--
-- | Make the stream producer and consumer run concurrently by introducing a
-- buffer between them. The producer thread evaluates the input stream until
-- the buffer fills, it terminates if the buffer is full and a worker thread is
-- kicked off again to evaluate the remaining stream when there is space in the
-- buffer.  The consumer consumes the stream lazily from the buffer.
--
-- /Internal/
--
{-# INLINE_NORMAL mkAsync #-}
mkAsync :: (K.IsStream t, MonadAsync m) => t m a -> t m a
mkAsync :: t m a -> t m a
mkAsync = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m a
forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
mkAsyncD (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD

-- | Create a new SVar and enqueue one stream computation on it.
{-# INLINABLE newWAsyncVar #-}
newWAsyncVar :: MonadAsync m
    => State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar :: State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar State Stream m a
st Stream m a
m = do
    RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadBaseControl IO m => m (RunInIO m)
captureMonadState
    SVar Stream m a
sv <- IO (SVar Stream m a) -> m (SVar Stream m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (SVar Stream m a) -> m (SVar Stream m a))
-> IO (SVar Stream m a) -> m (SVar Stream m a)
forall a b. (a -> b) -> a -> b
$ State Stream m a -> RunInIO m -> IO (SVar Stream m a)
forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getFifoSVar State Stream m a
st RunInIO m
mrun
    SVar Stream m a -> Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker SVar Stream m a
sv Stream m a
m

------------------------------------------------------------------------------
-- Running streams concurrently
------------------------------------------------------------------------------

-- Concurrency rate control.
--
-- Our objective is to create more threads on demand if the consumer is running
-- faster than us. As soon as we encounter a concurrent composition we create a
-- push pull pair of threads. We use an SVar for communication between the
-- consumer, pulling from the SVar and the producer who is pushing to the SVar.
-- The producer creates more threads if the SVar drains and becomes empty, that
-- is the consumer is running faster.
--
-- XXX Note 1: This mechanism can be problematic if the initial production
-- latency is high, we may end up creating too many threads. So we need some
-- way to monitor and use the latency as well. Having a limit on the dispatches
-- (programmer controlled) may also help.
--
-- TBD Note 2: We may want to run computations at the lower level of the
-- composition tree serially even when they are composed using a parallel
-- combinator. We can use 'serial' in place of 'async' and 'wSerial' in
-- place of 'wAsync'. If we find that an SVar immediately above a computation
-- gets drained empty we can switch to parallelizing the computation.  For that
-- we can use a state flag to fork the rest of the computation at any point of
-- time inside the Monad bind operation if the consumer is running at a faster
-- speed.
--
-- TBD Note 3: the binary operation ('parallel') composition allows us to
-- dispatch a chunkSize of only 1.  If we have to dispatch in arbitrary
-- chunksizes we will need to compose the parallel actions using a data
-- constructor (A Free container) instead so that we can divide it in chunks of
-- arbitrary size before dispatching. If the stream is composed of
-- hierarchically composed grains of different sizes then we can always switch
-- to a desired granularity depending on the consumer speed.
--
-- TBD Note 4: for pure work (when we are not in the IO monad) we can divide it
-- into just the number of CPUs.

-- | Join two computations on the currently running 'SVar' queue for concurrent
-- execution.  When we are using parallel composition, an SVar is passed around
-- as a state variable. We try to schedule a new parallel computation on the
-- SVar passed to us. The first time, when no SVar exists, a new SVar is
-- created.  Subsequently, 'joinStreamVarAsync' may get called when a computation
-- already scheduled on the SVar is further evaluated. For example, when (a
-- `parallel` b) is evaluated it calls a 'joinStreamVarAsync' to put 'a' and 'b' on
-- the current scheduler queue.
--
-- The 'SVarStyle' required by the current composition context is passed as one
-- of the parameters.  If the scheduling and composition style of the new
-- computation being scheduled is different than the style of the current SVar,
-- then we create a new SVar and schedule it on that.  The newly created SVar
-- joins as one of the computations on the current SVar queue.
--
-- Cases when we need to switch to a new SVar:
--
-- * (x `parallel` y) `parallel` (t `parallel` u) -- all of them get scheduled on the same SVar
-- * (x `parallel` y) `parallel` (t `async` u) -- @t@ and @u@ get scheduled on a new child SVar
--   because of the scheduling policy change.
-- * if we 'adapt' a stream of type 'async' to a stream of type
--   'Parallel', we create a new SVar at the transitioning bind.
-- * When the stream is switching from disjunctive composition to conjunctive
--   composition and vice-versa we create a new SVar to isolate the scheduling
--   of the two.

forkSVarAsync :: (IsStream t, MonadAsync m)
    => SVarStyle -> t m a -> t m a -> t m a
forkSVarAsync :: SVarStyle -> t m a -> t m a -> t m a
forkSVarAsync SVarStyle
style t m a
m1 t m a
m2 = (forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
  State Stream m a
  -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
 -> t m a)
-> (forall r.
    State Stream m a
    -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp -> do
    SVar Stream m a
sv <- case SVarStyle
style of
        SVarStyle
AsyncVar -> State Stream m a -> Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar State Stream m a
st (Stream m a -> Stream m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadIO m, IsStream t) =>
t m a -> Stream m a -> t m a
concurrently (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m1) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m2))
        SVarStyle
WAsyncVar -> State Stream m a -> Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar State Stream m a
st (Stream m a -> Stream m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadIO m, IsStream t) =>
t m a -> Stream m a -> t m a
concurrently (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m1) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m2))
        SVarStyle
_ -> [Char] -> m (SVar Stream m a)
forall a. HasCallStack => [Char] -> a
error [Char]
"illegal svar type"
    State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStream State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp (t m a -> m r) -> t m a -> m r
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> t m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(MonadAsync m, IsStream t) =>
SVar Stream m a -> t m a
fromSVar SVar Stream m a
sv
    where
    concurrently :: t m a -> Stream m a -> t m a
concurrently t m a
ma Stream m a
mb = (forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
  State Stream m a
  -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
 -> t m a)
-> (forall r.
    State Stream m a
    -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp -> do
        IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> t m a -> IO ()
enqueue (Maybe (SVar Stream m a) -> SVar Stream m a
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe (SVar Stream m a) -> SVar Stream m a)
-> Maybe (SVar Stream m a) -> SVar Stream m a
forall a b. (a -> b) -> a -> b
$ State Stream m a -> Maybe (SVar Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st) Stream m a
mb
        State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp t m a
ma

{-# INLINE joinStreamVarAsync #-}
joinStreamVarAsync :: (IsStream t, MonadAsync m)
    => SVarStyle -> t m a -> t m a -> t m a
joinStreamVarAsync :: SVarStyle -> t m a -> t m a -> t m a
joinStreamVarAsync SVarStyle
style t m a
m1 t m a
m2 = (forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
  State Stream m a
  -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
 -> t m a)
-> (forall r.
    State Stream m a
    -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp ->
    case State Stream m a -> Maybe (SVar Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st of
        Just SVar Stream m a
sv | SVar Stream m a -> SVarStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar Stream m a
sv SVarStyle -> SVarStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStyle
style -> do
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> t m a -> IO ()
enqueue SVar Stream m a
sv (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m2)
            State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp t m a
m1
        Maybe (SVar Stream m a)
_ -> State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp (SVarStyle -> t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVarStyle -> t m a -> t m a -> t m a
forkSVarAsync SVarStyle
style t m a
m1 t m a
m2)

------------------------------------------------------------------------------
-- Semigroup and Monoid style compositions for parallel actions
------------------------------------------------------------------------------

-- | Polymorphic version of the 'Semigroup' operation '<>' of 'AsyncT'.
-- Merges two streams possibly concurrently, preferring the
-- elements from the left one when available.
--
-- @since 0.2.0
{-# INLINE async #-}
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
async :: t m a -> t m a -> t m a
async = SVarStyle -> t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVarStyle -> t m a -> t m a -> t m a
joinStreamVarAsync SVarStyle
AsyncVar

-- | Same as 'async'.
--
-- @since 0.1.0
{-# DEPRECATED (<|) "Please use 'async' instead." #-}
{-# INLINE (<|) #-}
(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
<| :: t m a -> t m a -> t m a
(<|) = t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
async

-- IMPORTANT: using a monomorphically typed and SPECIALIZED consMAsync makes a
-- huge difference in the performance of consM in IsStream instance even we
-- have a SPECIALIZE in the instance.
--
-- | XXX we can implement it more efficienty by directly implementing instead
-- of combining streams using async.
{-# INLINE consMAsync #-}
{-# SPECIALIZE consMAsync :: IO a -> AsyncT IO a -> AsyncT IO a #-}
consMAsync :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a
consMAsync :: m a -> AsyncT m a -> AsyncT m a
consMAsync m a
m AsyncT m a
r = Stream m a -> AsyncT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> AsyncT m a) -> Stream m a -> AsyncT m a
forall a b. (a -> b) -> a -> b
$ m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
K.yieldM m a
m Stream m a -> Stream m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
`async` (AsyncT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream AsyncT m a
r)

------------------------------------------------------------------------------
-- AsyncT
------------------------------------------------------------------------------

-- | The 'Semigroup' operation (@<>@) for 'AsyncT' merges two streams
-- concurrently with priority given to the first stream. In @s1 <> s2 <> s3
-- ...@ the streams s1, s2 and s3 are scheduled for execution in that order.
-- Multiple scheduled streams may be executed concurrently and the elements
-- generated by them are served to the consumer as and when they become
-- available. This behavior is similar to the scheduling and execution behavior
-- of actions in a single async stream.
--
-- Since only a finite number of streams are executed concurrently, this
-- operation can be used to fold an infinite lazy container of streams.
--
-- @
-- import "Streamly"
-- import qualified "Streamly.Prelude" as S
-- import Control.Concurrent
--
-- main = (S.toList . 'asyncly' $ (S.fromList [1,2]) \<> (S.fromList [3,4])) >>= print
-- @
-- @
-- [1,2,3,4]
-- @
--
-- Any exceptions generated by a constituent stream are propagated to the
-- output stream. The output and exceptions from a single stream are guaranteed
-- to arrive in the same order in the resulting stream as they were generated
-- in the input stream. However, the relative ordering of elements from
-- different streams in the resulting stream can vary depending on scheduling
-- and generation delays.
--
-- Similarly, the monad instance of 'AsyncT' /may/ run each iteration
-- concurrently based on demand.  More concurrent iterations are started only
-- if the previous iterations are not able to produce enough output for the
-- consumer.
--
-- @
-- main = 'drain' . 'asyncly' $ do
--     n <- return 3 \<\> return 2 \<\> return 1
--     S.yieldM $ do
--          threadDelay (n * 1000000)
--          myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n)
-- @
-- @
-- ThreadId 40: Delay 1
-- ThreadId 39: Delay 2
-- ThreadId 38: Delay 3
-- @
--
-- @since 0.1.0
newtype AsyncT m a = AsyncT {AsyncT m a -> Stream m a
getAsyncT :: Stream m a}
    deriving (m a -> AsyncT m a
(forall (m :: * -> *) a. Monad m => m a -> AsyncT m a)
-> MonadTrans AsyncT
forall (m :: * -> *) a. Monad m => m a -> AsyncT m a
forall (t :: (* -> *) -> * -> *).
(forall (m :: * -> *) a. Monad m => m a -> t m a) -> MonadTrans t
lift :: m a -> AsyncT m a
$clift :: forall (m :: * -> *) a. Monad m => m a -> AsyncT m a
MonadTrans)

-- | A demand driven left biased parallely composing IO stream of elements of
-- type @a@.  See 'AsyncT' documentation for more details.
--
-- @since 0.2.0
type Async = AsyncT IO

-- | Fix the type of a polymorphic stream as 'AsyncT'.
--
-- @since 0.1.0
asyncly :: IsStream t => AsyncT m a -> t m a
asyncly :: AsyncT m a -> t m a
asyncly = AsyncT m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt

instance IsStream AsyncT where
    toStream :: AsyncT m a -> Stream m a
toStream = AsyncT m a -> Stream m a
forall (m :: * -> *) a. AsyncT m a -> Stream m a
getAsyncT
    fromStream :: Stream m a -> AsyncT m a
fromStream = Stream m a -> AsyncT m a
forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT
    consM :: m a -> AsyncT m a -> AsyncT m a
consM = m a -> AsyncT m a -> AsyncT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> AsyncT m a -> AsyncT m a
consMAsync
    |: :: m a -> AsyncT m a -> AsyncT m a
(|:) = m a -> AsyncT m a -> AsyncT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> AsyncT m a -> AsyncT m a
consMAsync

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

-- Monomorphically typed version of "async" for better performance of Semigroup
-- instance.
{-# INLINE mappendAsync #-}
{-# SPECIALIZE mappendAsync :: AsyncT IO a -> AsyncT IO a -> AsyncT IO a #-}
mappendAsync :: MonadAsync m => AsyncT m a -> AsyncT m a -> AsyncT m a
mappendAsync :: AsyncT m a -> AsyncT m a -> AsyncT m a
mappendAsync AsyncT m a
m1 AsyncT m a
m2 = Stream m a -> AsyncT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> AsyncT m a) -> Stream m a -> AsyncT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
async (AsyncT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream AsyncT m a
m1) (AsyncT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream AsyncT m a
m2)

instance MonadAsync m => Semigroup (AsyncT m a) where
    <> :: AsyncT m a -> AsyncT m a -> AsyncT m a
(<>) = AsyncT m a -> AsyncT m a -> AsyncT m a
forall (m :: * -> *) a.
MonadAsync m =>
AsyncT m a -> AsyncT m a -> AsyncT m a
mappendAsync

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

instance MonadAsync m => Monoid (AsyncT m a) where
    mempty :: AsyncT m a
mempty = AsyncT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
    mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a
mappend = AsyncT m a -> AsyncT m a -> AsyncT m a
forall a. Semigroup a => a -> a -> a
(<>)

------------------------------------------------------------------------------
-- Applicative
------------------------------------------------------------------------------

{-# INLINE apAsync #-}
{-# SPECIALIZE apAsync :: AsyncT IO (a -> b) -> AsyncT IO a -> AsyncT IO b #-}
apAsync :: MonadAsync m => AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync (AsyncT Stream m (a -> b)
m1) (AsyncT Stream m a
m2) =
    let f :: (a -> b) -> Stream m b
f a -> b
x1 = (forall c. Stream m c -> Stream m c -> Stream m c)
-> (a -> Stream m b) -> Stream m a -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
K.concatMapBy forall c. Stream m c -> Stream m c -> Stream m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
async (b -> Stream m b
forall (f :: * -> *) a. Applicative f => a -> f a
pure (b -> Stream m b) -> (a -> b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
    in Stream m b -> AsyncT m b
forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT (Stream m b -> AsyncT m b) -> Stream m b -> AsyncT m b
forall a b. (a -> b) -> a -> b
$ (forall c. Stream m c -> Stream m c -> Stream m c)
-> ((a -> b) -> Stream m b) -> Stream m (a -> b) -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
K.concatMapBy forall c. Stream m c -> Stream m c -> Stream m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
async (a -> b) -> Stream m b
forall b. (a -> b) -> Stream m b
f Stream m (a -> b)
m1

instance (Monad m, MonadAsync m) => Applicative (AsyncT m) where
    {-# INLINE pure #-}
    pure :: a -> AsyncT m a
pure = Stream m a -> AsyncT m a
forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT (Stream m a -> AsyncT m a) -> (a -> Stream m a) -> a -> AsyncT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
K.yield
    {-# INLINE (<*>) #-}
    <*> :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
(<*>) = AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
forall (m :: * -> *) a b.
MonadAsync m =>
AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

-- GHC: if we change the implementation of bindWith with arguments in a
-- different order we see a significant performance degradation (~2x).
{-# INLINE bindAsync #-}
{-# SPECIALIZE bindAsync :: AsyncT IO a -> (a -> AsyncT IO b) -> AsyncT IO b #-}
bindAsync :: MonadAsync m => AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync AsyncT m a
m a -> AsyncT m b
f = Stream m b -> AsyncT m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m b -> AsyncT m b) -> Stream m b -> AsyncT m b
forall a b. (a -> b) -> a -> b
$ (forall c. Stream m c -> Stream m c -> Stream m c)
-> Stream m a -> (a -> Stream m b) -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> t m a -> (a -> t m b) -> t m b
K.bindWith forall c. Stream m c -> Stream m c -> Stream m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
async (AsyncT m a -> Stream m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt AsyncT m a
m) (\a
a -> AsyncT m b -> Stream m b
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt (AsyncT m b -> Stream m b) -> AsyncT m b -> Stream m b
forall a b. (a -> b) -> a -> b
$ a -> AsyncT m b
f a
a)

-- GHC: if we specify arguments in the definition of (>>=) we see a significant
-- performance degradation (~2x).
instance MonadAsync m => Monad (AsyncT m) where
    return :: a -> AsyncT m a
return = a -> AsyncT m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    >>= :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
(>>=) = AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
forall (m :: * -> *) a b.
MonadAsync m =>
AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

MONAD_COMMON_INSTANCES(AsyncT, MONADPARALLEL)

------------------------------------------------------------------------------
-- WAsyncT
------------------------------------------------------------------------------

-- | XXX we can implement it more efficienty by directly implementing instead
-- of combining streams using wAsync.
{-# INLINE consMWAsync #-}
{-# SPECIALIZE consMWAsync :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
consMWAsync :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a
consMWAsync :: m a -> WAsyncT m a -> WAsyncT m a
consMWAsync m a
m WAsyncT m a
r = Stream m a -> WAsyncT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> WAsyncT m a) -> Stream m a -> WAsyncT m a
forall a b. (a -> b) -> a -> b
$ m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
K.yieldM m a
m Stream m a -> Stream m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
`wAsync` (WAsyncT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream WAsyncT m a
r)

-- | Polymorphic version of the 'Semigroup' operation '<>' of 'WAsyncT'.
-- Merges two streams concurrently choosing elements from both fairly.
--
-- @since 0.2.0
{-# INLINE wAsync #-}
wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
wAsync :: t m a -> t m a -> t m a
wAsync = SVarStyle -> t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVarStyle -> t m a -> t m a -> t m a
joinStreamVarAsync SVarStyle
WAsyncVar

-- | 'WAsyncT' is similar to 'WSerialT' but with concurrent execution.
-- The 'Semigroup' operation (@<>@) for 'WAsyncT' merges two streams
-- concurrently interleaving the actions from both the streams.  In @s1
-- <> s2 <> s3 ...@, the individual actions from streams @s1@, @s2@ and @s3@
-- are scheduled for execution in a round-robin fashion.  Multiple scheduled
-- actions may be executed concurrently, the results from concurrent executions
-- are consumed in the order in which they become available.
--
--
-- The @W@ in the name stands for @wide@ or breadth wise scheduling in
-- contrast to the depth wise scheduling behavior of 'AsyncT'.
--
-- @
-- import "Streamly"
-- import qualified "Streamly.Prelude" as S
-- import Control.Concurrent
--
-- main = (S.toList . 'wAsyncly' . maxThreads 1 $ (S.fromList [1,2]) \<> (S.fromList [3,4])) >>= print
-- @
-- @
-- [1,3,2,4]
-- @
--
-- For this example, we are using @maxThreads 1@ so that concurrent thread
-- scheduling does not affect the results and make them unpredictable. Let's
-- now take a more general example:
--
-- @
-- main = (S.toList . 'wAsyncly' . maxThreads 1 $ (S.fromList [1,2,3]) \<> (S.fromList [4,5,6]) \<> (S.fromList [7,8,9])) >>= print
-- @
-- @
-- [1,4,2,7,5,3,8,6,9]
-- @
--
-- This is how the execution of the above stream proceeds:
--
-- 1. The scheduler queue is initialized with @[S.fromList [1,2,3],
-- (S.fromList [4,5,6]) \<> (S.fromList [7,8,9])]@ assuming the head of the
-- queue is represented by the  rightmost item.
-- 2. @S.fromList [1,2,3]@ is executed, yielding the element @1@ and putting
-- @[2,3]@ at the back of the scheduler queue. The scheduler queue now looks
-- like @[(S.fromList [4,5,6]) \<> (S.fromList [7,8,9]), S.fromList [2,3]]@.
-- 3. Now @(S.fromList [4,5,6]) \<> (S.fromList [7,8,9])@ is picked up for
-- execution, @S.fromList [7,8,9]@ is added at the back of the queue and
-- @S.fromList [4,5,6]@ is executed, yielding the element @4@ and adding
-- @S.fromList [5,6]@ at the back of the queue. The queue now looks like
-- @[S.fromList [2,3], S.fromList [7,8,9], S.fromList [5,6]]@.
-- 4. Note that the scheduler queue expands by one more stream component in
-- every pass because one more @<>@ is broken down into two components. At this
-- point there are no more @<>@ operations to be broken down further and the
-- queue has reached its maximum size. Now these streams are scheduled in
-- round-robin fashion yielding @[2,7,5,3,8,8,9]@.
--
-- As we see above, in a right associated expression composed with @<>@, only
-- one @<>@ operation is broken down into two components in one execution,
-- therefore, if we have @n@ streams composed using @<>@ it will take @n@
-- scheduler passes to expand the whole expression.  By the time @n-th@
-- component is added to the scheduler queue, the first component would have
-- received @n@ scheduler passes.
--
-- Since all streams get interleaved, this operation is not suitable for
-- folding an infinite lazy container of infinite size streams.  However, if
-- the streams are small, the streams on the left may get finished before more
-- streams are added to the scheduler queue from the right side of the
-- expression, so it may be possible to fold an infinite lazy container of
-- streams. For example, if the streams are of size @n@ then at most @n@
-- streams would be in the scheduler queue at a time.
--
-- Note that 'WSerialT' and 'WAsyncT' differ in their scheduling behavior,
-- therefore the output of 'WAsyncT' even with a single thread of execution is
-- not the same as that of 'WSerialT' See notes in 'WSerialT' for details about
-- its scheduling behavior.
--
-- Any exceptions generated by a constituent stream are propagated to the
-- output stream. The output and exceptions from a single stream are guaranteed
-- to arrive in the same order in the resulting stream as they were generated
-- in the input stream. However, the relative ordering of elements from
-- different streams in the resulting stream can vary depending on scheduling
-- and generation delays.
--
-- Similarly, the 'Monad' instance of 'WAsyncT' runs /all/ iterations fairly
-- concurrently using a round robin scheduling.
--
-- @
-- main = 'drain' . 'wAsyncly' $ do
--     n <- return 3 \<\> return 2 \<\> return 1
--     S.yieldM $ do
--          threadDelay (n * 1000000)
--          myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n)
-- @
-- @
-- ThreadId 40: Delay 1
-- ThreadId 39: Delay 2
-- ThreadId 38: Delay 3
-- @
--
-- @since 0.2.0
newtype WAsyncT m a = WAsyncT {WAsyncT m a -> Stream m a
getWAsyncT :: Stream m a}
    deriving (m a -> WAsyncT m a
(forall (m :: * -> *) a. Monad m => m a -> WAsyncT m a)
-> MonadTrans WAsyncT
forall (m :: * -> *) a. Monad m => m a -> WAsyncT m a
forall (t :: (* -> *) -> * -> *).
(forall (m :: * -> *) a. Monad m => m a -> t m a) -> MonadTrans t
lift :: m a -> WAsyncT m a
$clift :: forall (m :: * -> *) a. Monad m => m a -> WAsyncT m a
MonadTrans)

-- | A round robin parallely composing IO stream of elements of type @a@.
-- See 'WAsyncT' documentation for more details.
--
-- @since 0.2.0
type WAsync = WAsyncT IO

-- | Fix the type of a polymorphic stream as 'WAsyncT'.
--
-- @since 0.2.0
wAsyncly :: IsStream t => WAsyncT m a -> t m a
wAsyncly :: WAsyncT m a -> t m a
wAsyncly = WAsyncT m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt

instance IsStream WAsyncT where
    toStream :: WAsyncT m a -> Stream m a
toStream = WAsyncT m a -> Stream m a
forall (m :: * -> *) a. WAsyncT m a -> Stream m a
getWAsyncT
    fromStream :: Stream m a -> WAsyncT m a
fromStream = Stream m a -> WAsyncT m a
forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT
    consM :: m a -> WAsyncT m a -> WAsyncT m a
consM = m a -> WAsyncT m a -> WAsyncT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> WAsyncT m a -> WAsyncT m a
consMWAsync
    |: :: m a -> WAsyncT m a -> WAsyncT m a
(|:) = m a -> WAsyncT m a -> WAsyncT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> WAsyncT m a -> WAsyncT m a
consMWAsync

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

{-# INLINE mappendWAsync #-}
{-# SPECIALIZE mappendWAsync :: WAsyncT IO a -> WAsyncT IO a -> WAsyncT IO a #-}
mappendWAsync :: MonadAsync m => WAsyncT m a -> WAsyncT m a -> WAsyncT m a
mappendWAsync :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a
mappendWAsync WAsyncT m a
m1 WAsyncT m a
m2 = Stream m a -> WAsyncT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> WAsyncT m a) -> Stream m a -> WAsyncT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
wAsync (WAsyncT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream WAsyncT m a
m1) (WAsyncT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream WAsyncT m a
m2)

instance MonadAsync m => Semigroup (WAsyncT m a) where
    <> :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a
(<>) = WAsyncT m a -> WAsyncT m a -> WAsyncT m a
forall (m :: * -> *) a.
MonadAsync m =>
WAsyncT m a -> WAsyncT m a -> WAsyncT m a
mappendWAsync

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

instance MonadAsync m => Monoid (WAsyncT m a) where
    mempty :: WAsyncT m a
mempty = WAsyncT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
    mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a
mappend = WAsyncT m a -> WAsyncT m a -> WAsyncT m a
forall a. Semigroup a => a -> a -> a
(<>)

------------------------------------------------------------------------------
-- Applicative
------------------------------------------------------------------------------

{-# INLINE apWAsync #-}
{-# SPECIALIZE apWAsync :: WAsyncT IO (a -> b) -> WAsyncT IO a -> WAsyncT IO b #-}
apWAsync :: MonadAsync m => WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync (WAsyncT Stream m (a -> b)
m1) (WAsyncT Stream m a
m2) =
    let f :: (a -> b) -> Stream m b
f a -> b
x1 = (forall c. Stream m c -> Stream m c -> Stream m c)
-> (a -> Stream m b) -> Stream m a -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
K.concatMapBy forall c. Stream m c -> Stream m c -> Stream m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
wAsync (b -> Stream m b
forall (f :: * -> *) a. Applicative f => a -> f a
pure (b -> Stream m b) -> (a -> b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
    in Stream m b -> WAsyncT m b
forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT (Stream m b -> WAsyncT m b) -> Stream m b -> WAsyncT m b
forall a b. (a -> b) -> a -> b
$ (forall c. Stream m c -> Stream m c -> Stream m c)
-> ((a -> b) -> Stream m b) -> Stream m (a -> b) -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
K.concatMapBy forall c. Stream m c -> Stream m c -> Stream m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
wAsync (a -> b) -> Stream m b
forall b. (a -> b) -> Stream m b
f Stream m (a -> b)
m1

-- GHC: if we specify arguments in the definition of (<*>) we see a significant
-- performance degradation (~2x).
instance (Monad m, MonadAsync m) => Applicative (WAsyncT m) where
    pure :: a -> WAsyncT m a
pure = Stream m a -> WAsyncT m a
forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT (Stream m a -> WAsyncT m a)
-> (a -> Stream m a) -> a -> WAsyncT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
K.yield
    <*> :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
(<*>) = WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
forall (m :: * -> *) a b.
MonadAsync m =>
WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

-- GHC: if we change the implementation of bindWith with arguments in a
-- different order we see a significant performance degradation (~2x).
{-# INLINE bindWAsync #-}
{-# SPECIALIZE bindWAsync :: WAsyncT IO a -> (a -> WAsyncT IO b) -> WAsyncT IO b #-}
bindWAsync :: MonadAsync m => WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync WAsyncT m a
m a -> WAsyncT m b
f = Stream m b -> WAsyncT m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m b -> WAsyncT m b) -> Stream m b -> WAsyncT m b
forall a b. (a -> b) -> a -> b
$ (forall c. Stream m c -> Stream m c -> Stream m c)
-> Stream m a -> (a -> Stream m b) -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> t m a -> (a -> t m b) -> t m b
K.bindWith forall c. Stream m c -> Stream m c -> Stream m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
wAsync (WAsyncT m a -> Stream m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt WAsyncT m a
m) (\a
a -> WAsyncT m b -> Stream m b
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt (WAsyncT m b -> Stream m b) -> WAsyncT m b -> Stream m b
forall a b. (a -> b) -> a -> b
$ a -> WAsyncT m b
f a
a)

-- GHC: if we specify arguments in the definition of (>>=) we see a significant
-- performance degradation (~2x).
instance MonadAsync m => Monad (WAsyncT m) where
    return :: a -> WAsyncT m a
return = a -> WAsyncT m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    >>= :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
(>>=) = WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
forall (m :: * -> *) a b.
MonadAsync m =>
WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

MONAD_COMMON_INSTANCES(WAsyncT, MONADPARALLEL)