-- |
-- Module      : Streamly.Internal.Data.Stream.SVar.Eliminate
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Eliminate a stream by distributing it to multiple SVars concurrently.
--
module Streamly.Internal.Data.Stream.SVar.Eliminate
    (
    -- * Concurrent Function Application
      toSVarParallel

    -- * Concurrent folds
    -- $concurrentFolds
    , newFoldSVar
    , newFoldSVarF

    , fromConsumer
    , pushToFold
    , teeToSVar
    )
where

#include "inline.hs"

import Control.Concurrent (myThreadId, takeMVar)
import Control.Monad (when, void)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, writeIORef)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Control.ForkLifted (doFork)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Data.Fold.SVar (write, writeLimited)
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Stream.Serial (SerialT(..))
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)

import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K

import Streamly.Internal.Data.SVar

-------------------------------------------------------------------------------
-- Concurrent function application
-------------------------------------------------------------------------------

-- Using StreamD the worker stream producing code can fuse with the code to
-- queue output to the SVar giving some perf boost.
--
-- Note that StreamD can only be used in limited situations, specifically, we
-- cannot implement joinStreamVarPar using this.
--
-- XXX make sure that the SVar passed is a Parallel style SVar.

-- | Fold the supplied stream to the SVar asynchronously using Parallel
-- concurrency style.
-- {-# INLINE_NORMAL toSVarParallel #-}
{-# INLINE toSVarParallel #-}
toSVarParallel :: MonadAsync m
    => State t m a -> SVar t m a -> D.Stream m a -> m ()
toSVarParallel :: State t m a -> SVar t m a -> Stream m a -> m ()
toSVarParallel State t m a
st SVar t m a
sv Stream m a
xs =
    if SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv
    then m ()
forkWithDiag
    else do
        ThreadId
tid <-
                case State t m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State t m a
st of
                    Maybe Count
Nothing -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
work Maybe WorkerInfo
forall a. Maybe a
Nothing)
                                      (SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv)
                                      (SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
                    Just Count
_  -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
workLim Maybe WorkerInfo
forall a. Maybe a
Nothing)
                                      (SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv)
                                      (SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
        SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar t m a
sv ThreadId
tid

    where

    {-# NOINLINE work #-}
    work :: Maybe WorkerInfo -> m ()
work Maybe WorkerInfo
info = Fold m a () -> Stream m a -> m ()
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold (SVar t m a -> Maybe WorkerInfo -> Fold m a ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write SVar t m a
sv Maybe WorkerInfo
info) Stream m a
xs

    {-# NOINLINE workLim #-}
    workLim :: Maybe WorkerInfo -> m ()
workLim Maybe WorkerInfo
info = Fold m a () -> Stream m a -> m ()
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold (SVar t m a -> Maybe WorkerInfo -> Fold m a ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited SVar t m a
sv Maybe WorkerInfo
info) Stream m a
xs

    {-# NOINLINE forkWithDiag #-}
    forkWithDiag :: m ()
forkWithDiag = do
        -- We do not use workerCount in case of ParallelVar but still there is
        -- no harm in maintaining it correctly.
        IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef Int -> (Int -> Int) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (SVar t m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv) ((Int -> Int) -> IO ()) -> (Int -> Int) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
n -> Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
        SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
recordMaxWorkers SVar t m a
sv
        -- This allocation matters when significant number of workers are being
        -- sent. We allocate it only when needed. The overhead increases by 4x.
        Maybe WorkerInfo
winfo <-
            case SVar t m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv of
                Maybe YieldRateInfo
Nothing -> Maybe WorkerInfo -> m (Maybe WorkerInfo)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe WorkerInfo
forall a. Maybe a
Nothing
                Just YieldRateInfo
_ -> IO (Maybe WorkerInfo) -> m (Maybe WorkerInfo)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe WorkerInfo) -> m (Maybe WorkerInfo))
-> IO (Maybe WorkerInfo) -> m (Maybe WorkerInfo)
forall a b. (a -> b) -> a -> b
$ do
                    IORef Count
cntRef <- Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
0
                    AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
                    IORef (Count, AbsTime)
lat <- (Count, AbsTime) -> IO (IORef (Count, AbsTime))
forall a. a -> IO (IORef a)
newIORef (Count
0, AbsTime
t)
                    Maybe WorkerInfo -> IO (Maybe WorkerInfo)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe WorkerInfo -> IO (Maybe WorkerInfo))
-> Maybe WorkerInfo -> IO (Maybe WorkerInfo)
forall a b. (a -> b) -> a -> b
$ WorkerInfo -> Maybe WorkerInfo
forall a. a -> Maybe a
Just WorkerInfo :: Count -> IORef Count -> IORef (Count, AbsTime) -> WorkerInfo
WorkerInfo
                        { workerYieldMax :: Count
workerYieldMax = Count
0
                        , workerYieldCount :: IORef Count
workerYieldCount = IORef Count
cntRef
                        , workerLatencyStart :: IORef (Count, AbsTime)
workerLatencyStart = IORef (Count, AbsTime)
lat
                        }
        ThreadId
tid <-
            case State t m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State t m a
st of
                Maybe Count
Nothing -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
work Maybe WorkerInfo
winfo)
                                  (SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv)
                                  (SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
                Just Count
_  -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
workLim Maybe WorkerInfo
winfo)
                                  (SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv)
                                  (SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
        SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar t m a
sv ThreadId
tid

-------------------------------------------------------------------------------
-- Support for running folds concurrently
-------------------------------------------------------------------------------

-- $concurrentFolds
--
-- To run folds concurrently, we need to decouple the fold execution from the
-- stream production. We use the SVar to do that, we have a single worker
-- pushing the stream elements to the SVar and on the consumer side a fold
-- driver pulls the values and folds them.
--
-- @
--
-- Fold worker <------SVar<------input stream
--     |  exceptions  |
--     --------------->
--
-- @
--
-- We need a channel for pushing exceptions from the fold worker to the stream
-- pusher. The stream may be pushed to multiple folds at the same time. For
-- that we need one SVar per fold:
--
-- @
--
-- Fold worker <------SVar<---
--                    |       |
-- Fold worker <------SVar<------input stream
--                    |       |
-- Fold worker <------SVar<---
--
-- @
--
-- Unlike in case concurrent stream evaluation, the puller does not drive the
-- scheduling and concurrent execution of the stream. The stream is simply
-- pushed by the stream producer at its own rate. The fold worker just pulls it
-- and folds it.
--
-- Note: If the stream pusher terminates due to an exception, we do not
-- actively terminate the fold. It gets cleaned up by the GC.

-------------------------------------------------------------------------------
-- Process events received by a fold consumer from a stream producer
-------------------------------------------------------------------------------

-- | Pull a stream from an SVar to fold it. Like 'fromSVar' except that it does
-- not drive the evaluation of the stream. It just pulls whatever is available
-- on the SVar. Also, when the fold stops it sends a notification to the stream
-- pusher/producer. No exceptions are expected to be propagated from the stream
-- pusher to the fold puller.
--
{-# NOINLINE fromProducer #-}
fromProducer :: forall m a . MonadAsync m => SVar K.Stream m a -> K.Stream m a
fromProducer :: SVar Stream m a -> Stream m a
fromProducer SVar Stream m a
sv = (forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream ((forall r.
  State Stream m a
  -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
 -> Stream m a)
-> (forall r.
    State Stream m a
    -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
    [ChildEvent a]
list <- SVar Stream m a -> m [ChildEvent a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar Stream m a
sv
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Stream m a -> m r) -> Stream m a -> m r
forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> Stream m a
processEvents ([ChildEvent a] -> Stream m a) -> [ChildEvent a] -> Stream m a
forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
reverse [ChildEvent a]
list

    where

    allDone :: m r -> m r
    allDone :: m r -> m r
allDone m r
stp = do
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar Stream m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar Stream m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            AbsTime
t <- IO AbsTime -> m AbsTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AbsTime -> m AbsTime) -> IO AbsTime -> m AbsTime
forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (SVar Stream m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar Stream m a
sv)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> String -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar Stream m a
sv String
"SVar Done"
        SVar Stream m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
sendStopToProducer SVar Stream m a
sv
        m r
stp

    {-# INLINE processEvents #-}
    processEvents :: [ChildEvent a] -> K.Stream m a
    processEvents :: [ChildEvent a] -> Stream m a
processEvents [] = (forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream ((forall r.
  State Stream m a
  -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
 -> Stream m a)
-> (forall r.
    State Stream m a
    -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
        State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Stream m a -> m r) -> Stream m a -> m r
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> Stream m a
fromProducer SVar Stream m a
sv

    processEvents (ChildEvent a
ev : [ChildEvent a]
es) = (forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream ((forall r.
  State Stream m a
  -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
 -> Stream m a)
-> (forall r.
    State Stream m a
    -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
_ a -> Stream m a -> m r
yld a -> m r
_ m r
stp -> do
        let rest :: Stream m a
rest = [ChildEvent a] -> Stream m a
processEvents [ChildEvent a]
es
        case ChildEvent a
ev of
            ChildYield a
a -> a -> Stream m a -> m r
yld a
a Stream m a
rest
            ChildStop ThreadId
tid Maybe SomeException
e -> do
                SVar Stream m a -> ThreadId -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar Stream m a
sv ThreadId
tid
                case Maybe SomeException
e of
                    Maybe SomeException
Nothing -> m r -> m r
forall r. m r -> m r
allDone m r
stp
                    Just SomeException
_ -> String -> m r
forall a. HasCallStack => String -> a
error String
"Bug: fromProducer: received exception"

-- | Create a Fold style SVar that runs a supplied fold function as the
-- consumer.  Any elements sent to the SVar are consumed by the supplied fold
-- function.
--
{-# INLINE newFoldSVar #-}
newFoldSVar :: MonadAsync m
    => State K.Stream m a -> (SerialT m a -> m b) -> m (SVar K.Stream m a)
newFoldSVar :: State Stream m a -> (SerialT m a -> m b) -> m (SVar Stream m a)
newFoldSVar State Stream m a
stt SerialT m a -> m b
f = do
    -- Buffer size for the SVar is derived from the current state
    SVar Stream m a
sv <- SVarStopStyle -> State Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopAny (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
stt)

    -- Add the producer thread-id to the SVar.
    IO ThreadId -> m ThreadId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId m ThreadId -> (ThreadId -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SVar Stream m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar Stream m a
sv

    m ThreadId -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m ThreadId -> m ()) -> m ThreadId -> m ()
forall a b. (a -> b) -> a -> b
$ m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m b -> m ()) -> m b -> m ()
forall a b. (a -> b) -> a -> b
$ SerialT m a -> m b
f (SerialT m a -> m b) -> SerialT m a -> m b
forall a b. (a -> b) -> a -> b
$ Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m a -> SerialT m a) -> Stream m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> Stream m a
fromProducer SVar Stream m a
sv)
                  (SVar Stream m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv)
                  (SVar Stream m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleFoldException SVar Stream m a
sv)
    SVar Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar Stream m a
sv

data FromSVarState t m a =
      FromSVarInit
    | FromSVarRead (SVar t m a)
    | FromSVarLoop (SVar t m a) [ChildEvent a]
    | FromSVarDone (SVar t m a)

-- | Like 'fromProducer' but generates a StreamD style stream instead of
-- StreamK.
--
{-# INLINE_NORMAL fromProducerD #-}
fromProducerD :: (MonadAsync m) => SVar t m a -> D.Stream m a
fromProducerD :: SVar t m a -> Stream m a
fromProducerD SVar t m a
svar = (State Stream m a
 -> FromSVarState t m a -> m (Step (FromSVarState t m a) a))
-> FromSVarState t m a -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a
-> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) p (t :: (* -> *) -> * -> *) a.
MonadIO m =>
p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step (SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
svar)
    where

    {-# INLINE_LATE step #-}
    step :: p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step p
_ (FromSVarRead SVar t m a
sv) = do
        [ChildEvent a]
list <- SVar t m a -> m [ChildEvent a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar t m a
sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (FromSVarState t m a -> Step (FromSVarState t m a) a)
-> FromSVarState t m a -> Step (FromSVarState t m a) a
forall a b. (a -> b) -> a -> b
$ SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv ([ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)

    step p
_ (FromSVarLoop SVar t m a
sv []) = Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (FromSVarState t m a -> Step (FromSVarState t m a) a)
-> FromSVarState t m a -> Step (FromSVarState t m a) a
forall a b. (a -> b) -> a -> b
$ SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv
    step p
_ (FromSVarLoop SVar t m a
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
        case ChildEvent a
ev of
            ChildYield a
a -> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ a -> FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. a -> s -> Step s a
D.Yield a
a (SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
            ChildStop ThreadId
tid Maybe SomeException
e -> do
                SVar t m a -> ThreadId -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar t m a
sv ThreadId
tid
                case Maybe SomeException
e of
                    Maybe SomeException
Nothing -> do
                        SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
sendStopToProducer SVar t m a
sv
                        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv)
                    Just SomeException
_ -> String -> m (Step (FromSVarState t m a) a)
forall a. HasCallStack => String -> a
error String
"Bug: fromProducer: received exception"

    step p
_ (FromSVarDone SVar t m a
sv) = do
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            AbsTime
t <- IO AbsTime -> m AbsTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AbsTime -> m AbsTime) -> IO AbsTime -> m AbsTime
forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> String -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
sv String
"SVar Done"
        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FromSVarState t m a) a
forall s a. Step s a
D.Stop

    step p
_ FromSVarState t m a
FromSVarInit = m (Step (FromSVarState t m a) a)
forall a. HasCallStack => a
undefined

-- | Like 'newFoldSVar' except that it uses a 'Fold' instead of a fold
-- function.
--
{-# INLINE newFoldSVarF #-}
newFoldSVarF :: MonadAsync m => State t m a -> Fold m a b -> m (SVar t m a)
newFoldSVarF :: State t m a -> Fold m a b -> m (SVar t m a)
newFoldSVarF State t m a
stt Fold m a b
f = do
    -- Buffer size for the SVar is derived from the current state
    SVar t m a
sv <- SVarStopStyle -> State t m a -> m (SVar t m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopAny (State t m a -> State t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State t m a
stt)
    -- Add the producer thread-id to the SVar.
    IO ThreadId -> m ThreadId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId m ThreadId -> (ThreadId -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar t m a
sv
    m ThreadId -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m ThreadId -> m ()) -> m ThreadId -> m ()
forall a b. (a -> b) -> a -> b
$ m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (SVar t m a -> m ()
forall (t :: (* -> *) -> * -> *). SVar t m a -> m ()
work SVar t m a
sv) (SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv) (SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleFoldException SVar t m a
sv)
    SVar t m a -> m (SVar t m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar t m a
sv

    where

    {-# NOINLINE work #-}
    work :: SVar t m a -> m ()
work SVar t m a
sv = m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m b -> m ()) -> m b -> m ()
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold Fold m a b
f (Stream m a -> m b) -> Stream m a -> m b
forall a b. (a -> b) -> a -> b
$ SVar t m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
fromProducerD SVar t m a
sv

-------------------------------------------------------------------------------
-- Process events received by the producer thread from the consumer side
-------------------------------------------------------------------------------

-- XXX currently only one event is sent by a fold consumer to the stream
-- producer. But we can potentially have multiple events e.g. the fold step can
-- generate exception more than once and the producer can ignore those
-- exceptions or handle them and still keep driving the fold.
--
-- | Poll for events sent by the fold consumer to the stream pusher. The fold
-- consumer can send a "Stop" event or an exception. When a "Stop" is received
-- this function returns 'True'. If an exception is recieved then it throws the
-- exception.
--
{-# NOINLINE fromConsumer #-}
fromConsumer :: MonadAsync m => SVar K.Stream m a -> m Bool
fromConsumer :: SVar Stream m a -> m Bool
fromConsumer SVar Stream m a
sv = do
    ([ChildEvent a]
list, Int
_) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic (SVar Stream m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueueFromConsumer SVar Stream m a
sv)
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    [ChildEvent a] -> m Bool
forall (m :: * -> *) a. MonadThrow m => [ChildEvent a] -> m Bool
processEvents ([ChildEvent a] -> m Bool) -> [ChildEvent a] -> m Bool
forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
reverse [ChildEvent a]
list

    where

    {-# INLINE processEvents #-}
    processEvents :: [ChildEvent a] -> m Bool
processEvents [] = Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
    processEvents (ChildEvent a
ev : [ChildEvent a]
_) = do
        case ChildEvent a
ev of
            ChildStop ThreadId
_ Maybe SomeException
e -> do
                case Maybe SomeException
e of
                    Maybe SomeException
Nothing -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
                    Just SomeException
ex -> SomeException -> m Bool
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
ex
            ChildYield a
_ -> String -> m Bool
forall a. HasCallStack => String -> a
error String
"Bug: fromConsumer: invalid ChildYield event"

-- | Push values from a stream to a fold worker via an SVar. Before pushing a
-- value to the SVar it polls for events received from the fold consumer.  If a
-- stop event is received then it returns 'True' otherwise false.  Propagates
-- exceptions received from the fold consumer.
--
{-# INLINE pushToFold #-}
pushToFold :: MonadAsync m => SVar K.Stream m a -> a -> m Bool
pushToFold :: SVar Stream m a -> a -> m Bool
pushToFold SVar Stream m a
sv a
a = do
    -- Check for exceptions before decrement so that we do not
    -- block forever if the child already exited with an exception.
    --
    -- We avoid a race between the consumer fold sending an event and we
    -- blocking on decrementBufferLimit by waking up the producer thread in
    -- sendToProducer before any event is sent by the fold to the producer
    -- stream.
    let qref :: IORef ([ChildEvent a], Int)
qref = SVar Stream m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueueFromConsumer SVar Stream m a
sv
    Bool
done <- do
        ([ChildEvent a]
_, Int
n) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef IORef ([ChildEvent a], Int)
qref
        if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
        then SVar Stream m a -> m Bool
forall (m :: * -> *) a. MonadAsync m => SVar Stream m a -> m Bool
fromConsumer SVar Stream m a
sv
        else Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
    if Bool
done
    then Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
    else IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
        SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar Stream m a
sv
        IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Stream m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False

------------------------------------------------------------------------------
-- Clone and distribute a stream in parallel
------------------------------------------------------------------------------

-- XXX this could be written in StreamD style for better efficiency with fusion.
--
-- | Tap a stream and send the elements to the specified SVar in addition to
-- yielding them again. The SVar runs a fold consumer. Elements are tapped and
-- sent to the SVar until the fold finishes. Any exceptions from the fold
-- evaluation are propagated in the current thread.
--
-- @
--
-- ------input stream---------output stream----->
--                    /|\\   |
--         exceptions  |    |  input
--                     |   \\|/
--                     ----SVar
--                          |
--                         Fold
--
-- @
--
{-# INLINE teeToSVar #-}
teeToSVar :: MonadAsync m =>
    SVar K.Stream m a -> SerialT m a -> SerialT m a
teeToSVar :: SVar Stream m a -> SerialT m a -> SerialT m a
teeToSVar SVar Stream m a
svr (SerialT Stream m a
m) = Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT (Stream m a -> SerialT m a) -> Stream m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ (forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream ((forall r.
  State Stream m a
  -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
 -> Stream m a)
-> (forall r.
    State Stream m a
    -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
    State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Bool -> Stream m a -> Stream m a
go Bool
False Stream m a
m)

    where

    go :: Bool -> Stream m a -> Stream m a
go Bool
False Stream m a
m0 = (forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream ((forall r.
  State Stream m a
  -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
 -> Stream m a)
-> (forall r.
    State Stream m a
    -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
_ m r
stp -> do
        let drain :: m ()
drain = do
                -- In general, a Stop event would come equipped with the result
                -- of the fold. It is not used here but it would be useful in
                -- applicative and distribute.
                Bool
done <- SVar Stream m a -> m Bool
forall (m :: * -> *) a. MonadAsync m => SVar Stream m a -> m Bool
fromConsumer SVar Stream m a
svr
                Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
done) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> String -> IO () -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO () -> IO ()
withDiagMVar SVar Stream m a
svr String
"teeToSVar: waiting to drain"
                           (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (SVar Stream m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBellFromConsumer SVar Stream m a
svr)
                    m ()
drain

            stopFold :: m ()
stopFold = do
                IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
svr Maybe WorkerInfo
forall a. Maybe a
Nothing
                -- drain/wait until a stop event arrives from the fold.
                m ()
drain

            stop :: m r
stop       = m ()
stopFold m () -> m r -> m r
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m r
stp
            single :: a -> m r
single a
a   = do
                Bool
done <- SVar Stream m a -> a -> m Bool
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> a -> m Bool
pushToFold SVar Stream m a
svr a
a
                a -> Stream m a -> m r
yld a
a (Bool -> Stream m a -> Stream m a
go Bool
done (m () -> Stream m a
forall (m :: * -> *) b a. Applicative m => m b -> Stream m a
K.nilM m ()
stopFold))
            yieldk :: a -> Stream m a -> m r
yieldk a
a Stream m a
r = SVar Stream m a -> a -> m Bool
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> a -> m Bool
pushToFold SVar Stream m a
svr a
a m Bool -> (Bool -> m r) -> m r
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
done -> a -> Stream m a -> m r
yld a
a (Bool -> Stream m a -> Stream m a
go Bool
done Stream m a
r)
         in State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m r
yieldk a -> m r
single m r
stop Stream m a
m0

    go Bool
True Stream m a
m0 = Stream m a
m0