{-# LANGUAGE CPP                       #-}
{-# LANGUAGE ConstraintKinds           #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs              #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE UndecidableInstances      #-} -- XXX

-- |
-- Module      : Streamly.Internal.Data.Stream.Ahead
-- Copyright   : (c) 2017 Harendra Kumar
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Stream.Ahead
    (
      AheadT
    , Ahead
    , aheadly
    , ahead
    )
where

import Control.Concurrent.MVar (putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (void, when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
-- import Control.Monad.Error.Class   (MonadError(..))
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import GHC.Exts (inline)

import qualified Data.Heap as H

import Streamly.Internal.Data.Stream.SVar (fromSVar)
import Streamly.Internal.Data.SVar
import Streamly.Internal.Data.Stream.StreamK
       (IsStream(..), Stream, mkStream, foldStream, foldStreamShared)
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D

import Prelude hiding (map)

#include "Instances.hs"

-------------------------------------------------------------------------------
-- Ahead
-------------------------------------------------------------------------------

-- Lookahead streams can execute multiple tasks concurrently, ahead of time,
-- but always serve them in the same order as they appear in the stream. To
-- implement lookahead streams efficiently we assign a sequence number to each
-- task when the task is picked up for execution. When the task finishes, the
-- output is tagged with the same sequence number and we rearrange the outputs
-- in sequence based on that number.
--
-- To explain the mechanism imagine that the current task at the head of the
-- stream has a "token" to yield to the outputQueue. The ownership of the token
-- is determined by the current sequence number, which is maintained in outputHeap.
-- Sequence number is assigned when a task is queued. When a thread dequeues a
-- task it picks up the sequence number as well and when the output is ready it
-- uses the sequence number to queue the output to the outputQueue.
--
-- The thread with current sequence number sends the output directly to the
-- outputQueue. Other threads push the output to the outputHeap. When the task
-- being queued on the heap is a stream of many elements we evaluate only the
-- first element and keep the rest of the unevaluated computation in the heap.
-- When such a task gets the "token" for outputQueue it evaluates and directly
-- yields all the elements to the outputQueue without checking for the
-- "token".
--
-- Note that no two outputs in the heap can have the same sequence numbers and
-- therefore we do not need a stable heap. We have also separated the buffer
-- for the current task (outputQueue) and the pending tasks (outputHeap) so
-- that the pending tasks cannot interfere with the current task. Note that for
-- a single task just the outputQueue is enough and for the case of many
-- threads just a heap is good enough. However we balance between these two
-- cases, so that both are efficient.
--
-- For bigger streams it may make sense to have separate buffers for each
-- stream. However, for singleton streams this may become inefficient. However,
-- if we do not have separate buffers, then the streams that come later in
-- sequence may hog the buffer, hindering the streams that are ahead. For this
-- reason we have a single element buffer limitation for the streams being
-- executed in advance.
--
-- This scheme works pretty efficiently with less than 40% extra overhead
-- compared to the Async streams where we do not have any kind of sequencing of
-- the outputs. It is especially devised so that we are most efficient when we
-- have short tasks and need just a single thread. Also when a thread yields
-- many items it can hold lockfree access to the outputQueue and do it
-- efficiently.
--
-- XXX Maybe we can start the ahead threads at a lower cpu and IO priority so
-- that they do not hog the resources and hinder the progress of the threads in
-- front of them.

-- Left associated ahead expressions are expensive. We start a new SVar for
-- each left associative expression. The queue is used only for right
-- associated expression, we queue the right expression and execute the left.
-- Therefore the queue never has more than one item in it.
--
-- XXX Also note that limiting concurrency for cases like "take 10" would not
-- work well with left associative expressions, because we have no visibility
-- about how much the left side of the expression would yield.
--
-- XXX It may be a good idea to increment sequence numbers for each yield,
-- currently a stream on the left side of the expression may yield many
-- elements with the same sequence number. We can then use the seq number to
-- enforce yieldMax and yieldLimit as well.

-- Invariants:
--
-- * A worker should always ensure that it pushes all the consecutive items in
-- the heap to the outputQueue especially the items on behalf of the workers
-- that have already left when we were holding the token. This avoids deadlock
-- conditions when the later workers completion depends on the consumption of
-- earlier results. For more details see comments in the consumer pull side
-- code.

-- Decide whether the heap is still within the SVar's buffer budget.
--
-- The budget is the SVar's 'maxBufferLimit' minus the number of items that are
-- currently sitting in the 'outputQueue'. The heap is considered "under the
-- maximum" when the number of heap entries plus the current 'workerCount' does
-- not exceed that budget. With an 'Unlimited' buffer the answer is always
-- 'True'.
--
-- Arguments: the SVar whose limits and counters are consulted, and a snapshot
-- of the heap whose size is being checked.
{-# INLINE underMaxHeap #-}
underMaxHeap ::
       SVar Stream m a
    -> Heap (Entry Int (AheadHeapEntry Stream m a))
    -> IO Bool
underMaxHeap sv hp = do
    -- Only the length of the output queue matters here, not its contents.
    (_, len) <- readIORef (outputQueue sv)

    -- XXX simplify this
    --
    -- NOTE(review): 'lim' is a 'Word', so the subtraction below is performed
    -- in unsigned arithmetic and @max 0@ can never clamp anything. If the
    -- queue length exceeds the limit the difference wraps around instead of
    -- saturating at zero. Presumably the later conversion to 'Int' then yields
    -- a negative number so the check still fails -- confirm before relying on
    -- it or replacing this with a saturating subtraction.
    let maxHeap = case maxBufferLimit sv of
            Limited lim -> Limited $
                max 0 (lim - fromIntegral len)
            Unlimited -> Unlimited

    case maxHeap of
        Limited lim -> do
            -- The worker count is read only when a limit is in force.
            active <- readIORef (workerCount sv)
            return $ H.size hp + active <= fromIntegral lim
        Unlimited -> return True

-- Re-evaluate, under a lock, whether a worker that is about to stop should
-- really stop.
--
-- Return value:
-- True => stop
-- False => continue
--
-- The check runs inside 'withIORef' on the heap reference and additionally
-- takes the SVar's 'workerStopMVar' for the duration of the decision; the MVar
-- is put back on every path before the result is returned.
--
-- The worker is told to stop when the heap is not under its limit (see
-- 'underMaxHeap'), or when rate control is configured ('yieldRateInfo' is
-- 'Just') and 'isBeyondMaxRate' returns 'False'. Otherwise it continues.
preStopCheck ::
       SVar Stream m a
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Maybe Int)
    -> IO Bool
preStopCheck sv heap =
    -- check the stop condition under a lock before actually
    -- stopping so that the whole herd does not stop at once.
    withIORef heap $ \(hp, _) -> do
        -- The heap check is done before the stop lock is acquired.
        heapOk <- underMaxHeap sv hp
        takeMVar (workerStopMVar sv)
        -- Both outcomes release the stop lock before returning.
        let stop = do
                putMVar (workerStopMVar sv) ()
                return True
            continue = do
                putMVar (workerStopMVar sv) ()
                return False
        if heapOk
        then
            case yieldRateInfo sv of
                -- No rate control configured: only the heap limit matters.
                Nothing -> continue
                Just yinfo -> do
                    -- NOTE(review): the helper is named 'isBeyondMaxRate' but
                    -- its result is treated as "rate is OK" (True leads to
                    -- continue). Confirm the polarity against the helper's
                    -- definition.
                    rateOk <- isBeyondMaxRate sv yinfo
                    if rateOk then continue else stop
        else stop

-- Give up on a piece of work that was dequeued but will not be executed.
--
-- The stream @m@ is put back on the work queue @q@ via 'reEnqueueAhead', the
-- yield limit is incremented again via 'incrementYieldLimit', and finally the
-- worker announces that it is stopping via 'sendStop'.
--
-- In this module it is called when 'decrementYieldLimit' reports that the
-- yield limit does not allow running the dequeued work.
abortExecution ::
       IORef ([Stream m a], Int)
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> IO ()
abortExecution q sv winfo m = do
    reEnqueueAhead sv q m
    incrementYieldLimit sv
    sendStop sv winfo

-- XXX In absence of a "noyield" primitive (i.e. do not pre-empt inside a
-- critical section) from GHC RTS, we have a difficult problem. Assume we have
-- a 100,000 threads producing output and queuing it to the heap for
-- sequencing. The heap can be drained only by one thread at a time, any thread
-- that finds that heap can be drained now, takes a lock and starts draining
-- it, however the thread may get preempted in the middle of it holding the
-- lock. Since that thread is holding the lock, the other threads cannot pick
-- up the draining task, therefore they proceed to picking up the next task to
-- execute. If the draining thread could yield voluntarily at a point where it
-- has released the lock, then the next threads could pick up the draining
-- instead of executing more tasks. When there are 100,000 threads the drainer
-- gets a cpu share to run only 1:100000 of the time. This makes the heap
-- accumulate a lot of output when the buffer size is large.
--
-- The solutions to this problem are:
-- 1) make the other threads wait in a queue until the draining finishes
-- 2) make the other threads queue and go away if draining is in progress
--
-- In both cases we give the drainer a chance to run more often.
--
-- Process a heap entry and then keep draining whatever follows it in
-- sequence.
--
-- Starting from @entry@ (tagged with sequence number @sno@) the worker:
--
-- * forwards the entry's output to the SVar,
-- * asks the heap for the entry with the next sequence number and repeats,
-- * and, once the heap reports that it is waiting for a sequence number that
--   is not there yet, falls back to dequeuing fresh work from the queue @q@.
--
-- When @stopping@ is 'True' the worker consults 'preStopCheck' before running
-- a stream entry or picking up new work, and stops (via 'sendStop') if told
-- to.
processHeap
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> AheadHeapEntry Stream m a
    -> Int
    -> Bool -- we are draining the heap before we stop
    -> m ()
processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry

    where

    -- Used only while stopping: re-check the stop condition before running a
    -- stream entry; if we must stop, the entry goes back to the heap untouched.
    stopIfNeeded ent seqNo r = do
        stopIt <- liftIO $ preStopCheck sv heap
        if stopIt
        then liftIO $ do
            -- put the entry back in the heap and stop
            requeueOnHeapTop heap (Entry seqNo ent) seqNo
            sendStop sv winfo
        else runStreamWithYieldLimit True seqNo r

    -- Dispatch on the kind of heap entry carrying sequence number seqNo.
    loopHeap seqNo ent =
        case ent of
            AheadEntryNull -> nextHeap seqNo
            AheadEntryPure a -> do
                -- Use 'send' directly so that we do not account this in worker
                -- latency as this will not be the real latency.
                -- Don't stop the worker in this case as we are just
                -- transferring available results from heap to outputQueue.
                void $ liftIO $ send sv (ChildYield a)
                nextHeap seqNo
            AheadEntryStream r ->
                if stopping
                then stopIfNeeded ent seqNo r
                else runStreamWithYieldLimit True seqNo r

    -- Ask the heap for the entry that directly follows prevSeqNo.
    nextHeap prevSeqNo = do
        res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1)
        case res of
            Ready (Entry seqNo hent) -> loopHeap seqNo hent
            Clearing -> liftIO $ sendStop sv winfo
            Waiting _ ->
                if stopping
                then do
                    r <- liftIO $ preStopCheck sv heap
                    if r
                    then liftIO $ sendStop sv winfo
                    else processWorkQueue prevSeqNo
                else inline processWorkQueue prevSeqNo

    -- Pick up new work from the queue. Work whose sequence number directly
    -- follows prevSeqNo is run with the token, anything else without it.
    processWorkQueue prevSeqNo = do
        work <- dequeueAhead q
        case work of
            Nothing -> liftIO $ sendStop sv winfo
            Just (m, seqNo) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                if yieldLimitOk
                then
                    if seqNo == prevSeqNo + 1
                    then processWithToken q heap st sv winfo m seqNo
                    else processWithoutToken q heap st sv winfo m seqNo
                else liftIO $ abortExecution q sv winfo m

    -- We do not stop the worker on buffer full here as we want to proceed to
    -- nextHeap anyway so that we can clear any subsequent entries. We stop
    -- only in yield continuation where we may have a remaining stream to be
    -- pushed on the heap.
    singleStreamFromHeap seqNo a = do
        void $ liftIO $ sendYield sv winfo (ChildYield a)
        nextHeap seqNo

    -- XXX when we have an unfinished stream on the heap we cannot account all
    -- the yields of that stream until it finishes, so if we have picked up
    -- and executed more actions beyond that in the parent stream and put them
    -- on the heap then they would eat up some yield limit which is not
    -- correct, we will think that our yield limit is over even though we have
    -- to yield items from unfinished stream before them. For this reason, if
    -- there are pending items in the heap we drain them unconditionally
    -- without considering the yield limit.
    runStreamWithYieldLimit continue seqNo r = do
        -- The limit is decremented for accounting but its result is ignored.
        _ <- liftIO $ decrementYieldLimit sv
        if continue -- see comment above -- && yieldLimitOk
        then do
            -- Stream ended without yielding: undo the decrement and move on.
            let stop = do
                  liftIO (incrementYieldLimit sv)
                  nextHeap seqNo
            foldStreamShared st
                          (yieldStreamFromHeap seqNo)
                          (singleStreamFromHeap seqNo)
                          stop
                          r
        else liftIO $ do
            -- Put the unfinished stream back on the heap, undo the decrement
            -- and stop this worker.
            let ent = Entry seqNo (AheadEntryStream r)
            requeueOnHeapTop heap ent seqNo
            incrementYieldLimit sv
            sendStop sv winfo

    -- Yield continuation: send one element, then continue with the rest of
    -- the stream only if 'sendYield' returned True.
    yieldStreamFromHeap seqNo a r = do
        continue <- liftIO $ sendYield sv winfo (ChildYield a)
        runStreamWithYieldLimit continue seqNo r

-- Drain the heap as the last step before a worker stops.
--
-- Dequeues from the heap via 'dequeueFromHeap'. If an entry is ready it is
-- handed to 'processHeap' with the "stopping" flag set to 'True'; for any
-- other dequeue result the worker simply announces that it is stopping via
-- 'sendStop'.
{-# NOINLINE drainHeap #-}
drainHeap
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
drainHeap q heap st sv winfo = do
    r <- liftIO $ dequeueFromHeap heap
    case r of
        Ready (Entry seqNo hent) ->
            processHeap q heap st sv winfo hent seqNo True
        _ -> liftIO $ sendStop sv winfo

-- Two-valued heap signal. NOTE(review): not referenced in the part of the
-- module reviewed here; presumably tells a worker whether to keep working on
-- the heap or to stop -- confirm at the use sites.
data HeapStatus = HContinue | HStop
-- Two-valued worker signal; it is the result type of the heap-queueing
-- actions in 'processWithoutToken'. NOTE(review): presumably 'Continue' means
-- keep working and 'Suspend' means the worker should stop -- confirm.
data WorkerStatus = Continue | Suspend

-- | Evaluate one step of the stream @m@, whose position in the output order
-- is @seqNo@, and park whatever it produces on the shared heap instead of
-- sending it to the consumer directly.
--
-- The step is run through the SVar's captured runner ('svarMrun') and the
-- resulting monadic state is restored with 'restoreM'. Depending on the
-- outcome the worker either goes back to 'workLoopAhead' or switches to
-- 'drainHeap'.
--
-- The caller must already have decremented the yield limit for @m@.
processWithoutToken
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> Int
    -> m ()
processWithoutToken q heap st sv winfo m seqNo = do
    -- we have already decremented the yield limit for m
    let stop = do
            liftIO (incrementYieldLimit sv)
            -- If the stream stops without yielding anything and we put
            -- nothing on the heap, a heap that is waiting for this sequence
            -- number would wait forever, because nothing would ever arrive
            -- for it. So we put a null entry on the heap even when we stop.
            toHeap AheadEntryNull
        -- Apply the selector directly so that its polymorphic result does
        -- not have to be instantiated through ($).
        mrun = runInIO (svarMrun sv)

    r <- liftIO $ mrun $
            foldStreamShared st
                (\a rest -> toHeap $ AheadEntryStream $ K.cons a rest)
                (toHeap . AheadEntryPure)
                stop
                m
    res <- restoreM r
    case res of
        Continue -> workLoopAhead q heap st sv winfo
        Suspend -> drainHeap q heap st sv winfo

    where

    -- Insert an entry for this worker's sequence number into the heap and
    -- decide whether the worker may carry on.
    --
    -- XXX to reduce contention each CPU can have its own heap
    toHeap ent = do
        -- Heap insertion is an expensive affair so we use a non CAS based
        -- modification, otherwise contention and retries can make a thread
        -- context switch and throw it behind other threads which come later in
        -- sequence.
        newHp <- liftIO $ atomicModifyIORef heap $ \(hp, snum) ->
            let hp' = H.insert (Entry seqNo ent) hp
            in assert (heapIsSane snum seqNo) ((hp', snum), hp')

        -- In inspect mode, record the largest heap size seen so far.
        when (svarInspectMode sv) $
            liftIO $ do
                let maxRef = maxHeapSize (svarStats sv)
                    size = H.size newHp
                maxHp <- readIORef maxRef
                when (size > maxHp) $ writeIORef maxRef size

        -- The heap limit is checked before rate control, as in the original
        -- ordering of effects.
        heapOk <- liftIO $ underMaxHeap sv newHp
        status <-
            case (yieldRateInfo sv, winfo) of
                -- Rate control is consulted only when both the rate info and
                -- this worker's info are available.
                (Just yinfo, Just info) -> do
                    rateOk <- liftIO $ workerRateControl sv yinfo info
                    return $ if rateOk then HContinue else HStop
                _ -> return HContinue

        -- Continue only when the heap is within its limit and rate control
        -- did not ask us to stop.
        return $ case status of
            HContinue | heapOk -> Continue
            _ -> Suspend

-- | Outcome of running a stream step in 'processWithToken'.
-- 'TokenContinue' carries the sequence number to continue with, which is
-- passed on to the token loop; 'TokenSuspend' makes the worker switch to
-- 'drainHeap'.
data TokenWorkerStatus
    = TokenContinue Int
    | TokenSuspend

-- | Evaluate the stream @action@, whose position in the output order is
-- @sno@, sending each element it yields straight to the SVar with
-- 'sendYield' rather than via the heap. While it is allowed to continue, the
-- worker then keeps dequeuing work and runs it the same way as long as the
-- dequeued item carries the expected next sequence number.
--
-- The caller must already have decremented the yield limit for @action@.
processWithToken
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> Int
    -> m ()
processWithToken q heap st sv winfo action sno =
    -- Note, we enter this function with yield limit already decremented
    runWithToken sno action

    where

    -- Stop continuation shared by every fold in this function: give back the
    -- yield limit that was reserved for the step and move on to the next
    -- sequence number. Every fold here must use it so that the increment is
    -- never missed.
    stopAt seqNo = do
        liftIO (incrementYieldLimit sv)
        return $ TokenContinue (seqNo + 1)

    -- Fold one step of a stream, sending its output directly to the SVar.
    foldWithToken seqNo =
        foldStreamShared st
            (yieldOutput seqNo)
            (singleOutput seqNo)
            (stopAt seqNo)

    -- Run one stream under the SVar's captured runner, restore the resulting
    -- monadic state and act on the outcome.
    runWithToken seqNo strm = do
        r <- liftIO $ runInIO (svarMrun sv) $ foldWithToken seqNo strm
        res <- restoreM r
        case res of
            TokenContinue next -> loopWithToken next
            TokenSuspend -> drainHeap q heap st sv winfo

    -- The stream produced its final element.
    singleOutput seqNo a = do
        continue <- liftIO $ sendYield sv winfo (ChildYield a)
        if continue
        then return $ TokenContinue (seqNo + 1)
        else do
            liftIO $ updateHeapSeq heap (seqNo + 1)
            return TokenSuspend

    -- The stream produced an element and has a remainder @r@.
    yieldOutput seqNo a r = do
        continue <- liftIO $ sendYield sv winfo (ChildYield a)
        yieldLimitOk <- liftIO $ decrementYieldLimit sv
        if continue && yieldLimitOk
        then foldWithToken seqNo r
        else do
            -- Put the remainder back on the heap under the same sequence
            -- number and return the yield limit we just took.
            let ent = Entry seqNo (AheadEntryStream r)
            liftIO $ requeueOnHeapTop heap ent seqNo
            liftIO $ incrementYieldLimit sv
            return TokenSuspend

    loopWithToken nextSeqNo = do
        work <- dequeueAhead q
        case work of
            Nothing -> do
                liftIO $ updateHeapSeq heap nextSeqNo
                workLoopAhead q heap st sv winfo

            Just (m, seqNo) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                -- Roll back the dequeue: record the expected sequence number
                -- on the heap, put the work item back and return the yield
                -- limit.
                let undo = liftIO $ do
                        updateHeapSeq heap nextSeqNo
                        reEnqueueAhead sv q m
                        incrementYieldLimit sv
                if yieldLimitOk
                then
                    if seqNo == nextSeqNo
                    then runWithToken seqNo m
                    else
                        -- To avoid a race when another thread puts something
                        -- on the heap and goes away, the consumer will not get
                        -- a doorBell and we will not clear the heap before
                        -- executing the next action. If the consumer depends
                        -- on the output that is stuck in the heap then this
                        -- will result in a deadlock. So we always clear the
                        -- heap before executing the next action.
                        undo >> workLoopAhead q heap st sv winfo
                else undo >> drainHeap q heap st sv winfo

-- XXX the yield limit changes increased the performance overhead by 30-40%.
-- Just like AsyncT we can use an implementation without the yield limit and
-- even without the pacing code to keep the performance higher in the
-- unlimited and unpaced case.
--
-- XXX The yieldLimit stuff is pretty invasive. We can instead do it by using
-- three hooks, a pre-execute hook, a yield hook and a stop hook. In fact these
-- hooks can be used for a more general implementation to even check predicates
-- and not just yield limit.

-- XXX we can remove the sv parameter as it can be derived from st

workLoopAhead
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopAhead :: IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo = do
        HeapDequeueResult Stream m a
r <- IO (HeapDequeueResult Stream m a)
-> m (HeapDequeueResult Stream m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult Stream m a)
 -> m (HeapDequeueResult Stream m a))
-> IO (HeapDequeueResult Stream m a)
-> m (HeapDequeueResult Stream m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> IO (HeapDequeueResult Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap
        case HeapDequeueResult Stream m a
r of
            Ready (Entry Int
seqNo AheadHeapEntry Stream m a
hent) ->
                IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> AheadHeapEntry Stream m a
-> Int
-> Bool
-> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> AheadHeapEntry Stream m a
-> Int
-> Bool
-> m ()
processHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo AheadHeapEntry Stream m a
hent Int
seqNo Bool
False
            HeapDequeueResult Stream m a
Clearing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
            Waiting Int
_ -> do
                -- Before we execute the next item from the work queue we check
                -- if we are beyond the yield limit. It is better to check the
                -- yield limit before we pick up the next item. Otherwise we
                -- may have already started more tasks even though we may have
                -- reached the yield limit.  We can avoid this by taking active
                -- workers into account, but that is not as reliable, because
                -- workers may go away without picking up work and yielding a
                -- value.
                --
                -- Rate control can be done either based on actual yields in
                -- the output queue or based on any yield either to the heap or
                -- to the output queue. In both cases we may have one issue or
                -- the other. We chose to do this based on actual yields to the
                -- output queue because it makes the code common to both async
                -- and ahead streams.
                --
                Maybe (Stream m a, Int)
work <- IORef ([Stream m a], Int) -> m (Maybe (Stream m a, Int))
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([Stream m a], Int)
q
                case Maybe (Stream m a, Int)
work of
                    Maybe (Stream m a, Int)
Nothing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
                    Just (Stream m a
m, Int
seqNo) -> do
                        Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
                        if Bool
yieldLimitOk
                        then
                            if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
                            then IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
                            else IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo
                        -- If some worker decremented the yield limit but then
                        -- did not yield anything and therefore incremented it
                        -- later, then if we did not requeue m here we may find
                        -- the work queue empty and therefore miss executing
                        -- the remaining action.
                        else IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef ([Stream m a], Int)
-> SVar Stream m a -> Maybe WorkerInfo -> Stream m a -> IO ()
forall (m :: * -> *) a.
IORef ([Stream m a], Int)
-> SVar Stream m a -> Maybe WorkerInfo -> Stream m a -> IO ()
abortExecution IORef ([Stream m a], Int)
q SVar Stream m a
sv Maybe WorkerInfo
winfo Stream m a
m

-------------------------------------------------------------------------------
-- WAhead
-------------------------------------------------------------------------------

-- XXX To be implemented. Use a linked queue like WAsync and put back the
-- remaining computation at the back of the queue instead of the heap, and
-- increment the sequence number.

-- Run @m1@ and @m2@ on a new SVar created with 'newAheadVar', using
-- 'workLoopAhead' as the worker loop, and read the results back from that
-- SVar through 'fromSVar'.
--
-- The only difference between forkSVarAsync and this is that we run the left
-- computation without a shared SVar.
forkSVarAhead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
forkSVarAhead m1 m2 = mkStream $ \st yld sng stp -> do
        sv <- newAheadVar st (concurrently (toStream m1) (toStream m2))
                          workLoopAhead
        foldStream st yld sng stp (fromSVar sv)
    where
    -- Enqueue the right stream on the SVar carried by the stream state and
    -- then run the left stream with 'foldStream'.
    --
    -- NOTE(review): 'fromJust' is partial. This relies on the state handed to
    -- this stream always carrying an SVar, presumably the one created by
    -- 'newAheadVar' above -- confirm.
    concurrently ma mb = mkStream $ \st yld sng stp -> do
        liftIO $ enqueue (fromJust $ streamVar st) mb
        foldStream st yld sng stp ma

-- | Polymorphic version of the 'Semigroup' operation '<>' of 'AheadT'.
-- Merges two streams sequentially but with concurrent lookahead.
--
-- @since 0.3.0
{-# INLINE ahead #-}
ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
ahead m1 m2 = mkStream $ \st yld sng stp ->
    case streamVar st of
        -- The state already carries an SVar of the ahead style: queue the
        -- right stream on it instead of creating a new SVar.
        Just sv | svarStyle sv == AheadVar -> do
            liftIO $ enqueue sv (toStream m2)
            -- Always run the left side on a new SVar to avoid complexity in
            -- sequencing results. This means the left side cannot further
            -- split into more ahead computations on the same SVar.
            foldStream st yld sng stp m1
        -- No SVar in the state, or an SVar of a different style: evaluate
        -- both streams through 'forkSVarAhead'.
        _ -> foldStreamShared st yld sng stp (forkSVarAhead m1 m2)

-- | Prepend a monadic action to an 'AheadT' stream. The action is turned into
-- a stream with 'K.yieldM' and combined with the rest of the stream using
-- 'ahead'.
--
-- XXX we can implement it more efficiently by directly implementing instead
-- of combining streams using ahead.
{-# INLINE consMAhead #-}
{-# SPECIALIZE consMAhead :: IO a -> AheadT IO a -> AheadT IO a #-}
consMAhead :: MonadAsync m => m a -> AheadT m a -> AheadT m a
consMAhead m r = fromStream (ahead (K.yieldM m) (toStream r))

------------------------------------------------------------------------------
-- AheadT
------------------------------------------------------------------------------

-- | The 'Semigroup' operation for 'AheadT' appends two streams. The combined
-- stream behaves like a single stream with the actions from the second stream
-- appended to the first stream. The combined stream is evaluated in the
-- speculative style.  This operation can be used to fold an infinite lazy
-- container of streams.
--
-- @
-- import "Streamly"
-- import qualified "Streamly.Prelude" as S
-- import Control.Concurrent
--
-- main = do
--  xs \<- S.'toList' . 'aheadly' $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
--  print xs
--  where p n = threadDelay 1000000 >> return n
-- @
-- @
-- [1,2,3,4]
-- @
--
-- Any exceptions generated by a constituent stream are propagated to the
-- output stream.
--
-- The monad instance of 'AheadT' may run each monadic continuation (bind)
-- concurrently in a speculative manner, performing side effects in a partially
-- ordered manner but producing the outputs in an ordered manner like
-- 'SerialT'.
--
-- @
-- main = S.drain . 'aheadly' $ do
--     n <- return 3 \<\> return 2 \<\> return 1
--     S.yieldM $ do
--          threadDelay (n * 1000000)
--          myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n)
-- @
-- @
-- ThreadId 40: Delay 1
-- ThreadId 39: Delay 2
-- ThreadId 38: Delay 3
-- @
--
-- @since 0.3.0
newtype AheadT m a = AheadT {getAheadT :: Stream m a}
    deriving (MonadTrans)

-- | A serial IO stream of elements of type @a@ with concurrent lookahead.  See
-- 'AheadT' documentation for more details.
--
-- This is 'AheadT' specialized to the 'IO' monad.
--
-- @since 0.3.0
type Ahead = AheadT IO

-- | Fix the type of a polymorphic stream as 'AheadT'.
--
-- The argument is forced to be an 'AheadT' stream and is then converted to
-- the requested stream type with 'K.adapt'.
--
-- @since 0.3.0
aheadly :: IsStream t => AheadT m a -> t m a
aheadly = K.adapt

-- Conversion to and from the underlying 'Stream' is just unwrapping and
-- wrapping the newtype. Both cons operations are implemented by
-- 'consMAhead'.
instance IsStream AheadT where
    toStream = getAheadT
    fromStream = AheadT
    consM = consMAhead
    (|:) = consMAhead

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

-- Append two 'AheadT' streams: unwrap both to the underlying 'Stream',
-- combine them with 'ahead' and wrap the result again.
{-# INLINE mappendAhead #-}
{-# SPECIALIZE mappendAhead :: AheadT IO a -> AheadT IO a -> AheadT IO a #-}
mappendAhead :: MonadAsync m => AheadT m a -> AheadT m a -> AheadT m a
mappendAhead m1 m2 = fromStream (ahead (toStream m1) (toStream m2))

-- The semigroup append is 'mappendAhead', i.e. 'ahead' on the underlying
-- streams.
instance MonadAsync m => Semigroup (AheadT m a) where
    (<>) = mappendAhead

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

-- The identity element is the empty stream 'K.nil'; 'mappend' is the
-- 'Semigroup' append.
instance MonadAsync m => Monoid (AheadT m a) where
    mempty = K.nil
    mappend = (<>)

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

-- Map a stream-producing function over an 'AheadT' stream and combine the
-- resulting streams with 'ahead' (via 'K.concatMapBy'). Used to implement
-- the monadic bind of 'AheadT'.
{-# INLINE concatMapAhead #-}
{-# SPECIALIZE concatMapAhead :: (a -> AheadT IO b) -> AheadT IO a -> AheadT IO b #-}
concatMapAhead :: MonadAsync m => (a -> AheadT m b) -> AheadT m a -> AheadT m b
concatMapAhead f m =
    -- 'K.adapt' converts between 'AheadT' and the underlying 'Stream' on the
    -- way in; 'fromStream' wraps the combined result on the way out.
    fromStream $ K.concatMapBy ahead (K.adapt . f) (K.adapt m)

-- Applicative application for 'AheadT': every function from the first stream
-- is applied to every element of the second stream. Both the outer and the
-- inner streams are combined with 'ahead' (via 'K.concatMapBy').
{-# INLINE apAhead #-}
apAhead :: MonadAsync m => AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead (AheadT m1) (AheadT m2) =
    AheadT $ K.concatMapBy ahead applyToAll m1
    where
    -- Apply one function to each element of the argument stream, yielding
    -- every result as a singleton stream.
    applyToAll g = K.concatMapBy ahead (pure . g) m2

-- 'pure' yields a single-element stream via 'K.yield'; '<*>' is 'apAhead'.
instance (Monad m, MonadAsync m) => Applicative (AheadT m) where
    {-# INLINE pure #-}
    pure = AheadT . K.yield
    {-# INLINE (<*>) #-}
    (<*>) = apAhead

-- Bind is 'concatMapAhead' with its arguments flipped, so the streams
-- produced by the continuation are combined with 'ahead'.
instance MonadAsync m => Monad (AheadT m) where
    return = pure
    {-# INLINE (>>=) #-}
    (>>=) = flip concatMapAhead

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

-- Expand the shared instance-generating macro for AheadT.
-- NOTE(review): the macro is presumably defined in the included
-- "Instances.hs" file and generates the remaining common instances for this
-- stream type -- confirm against that file.
MONAD_COMMON_INSTANCES(AheadT, MONADPARALLEL)