module Streamly.Internal.Data.SVar
(
module Streamly.Internal.Data.SVar.Type
, module Streamly.Internal.Data.SVar.Worker
, module Streamly.Internal.Data.SVar.Dispatch
, module Streamly.Internal.Data.SVar.Pull
, getYieldRateInfo
, newSVarStats
, newParallelVar
, enqueueAhead
, reEnqueueAhead
, queueEmptyAhead
, dequeueAhead
, HeapDequeueResult(..)
, dequeueFromHeap
, dequeueFromHeapSeq
, requeueOnHeapTop
, updateHeapSeq
, withIORef
, heapIsSane
, newAheadVar
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId, takeMVar)
import Control.Concurrent.MVar (newEmptyMVar, tryPutMVar, tryTakeMVar, newMVar)
import Control.Exception (assert)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Heap (Heap, Entry(..))
import Data.IORef (newIORef, readIORef)
import Data.IORef (IORef, atomicModifyIORef)
import Streamly.Internal.Control.Concurrent
(MonadAsync, askRunInIO, RunInIO)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..))
import qualified Data.Heap as H
import qualified Data.Set as S
import Streamly.Internal.Data.SVar.Dispatch
import Streamly.Internal.Data.SVar.Pull
import Streamly.Internal.Data.SVar.Type
import Streamly.Internal.Data.SVar.Worker
-- | Build the rate-control bookkeeping for a stream from its 'State'.
--
-- The configured rate is looked up with 'getStreamRate'. When there is none
-- the result is 'Nothing'. Otherwise the @low@, @goal@ and @high@ rates are
-- each converted to a latency and a freshly initialised 'YieldRateInfo' is
-- returned.
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo st = do
    -- Convert a rate to a latency in nanoseconds (1e9 / rate); presumably the
    -- rate is expressed per second. A non-positive rate maps to 'maxBound'.
    let rateToLatency r = if r <= 0 then maxBound else round $ 1.0e9 / r
    case getStreamRate st of
        Just (Rate low goal high buf) ->
            -- Latency is inversely related to rate, so the highest rate
            -- gives the minimum latency and the lowest rate the maximum.
            let l    = rateToLatency goal
                minl = rateToLatency high
                maxl = rateToLatency low
            in mkYieldRateInfo l (LatencyRange minl maxl) buf
        Nothing -> return Nothing

    where

    -- Allocate the mutable counters and assemble the record. Everything
    -- starts at zero except the polling interval (1) and the all-time
    -- latency, which is stamped with the current monotonic time.
    mkYieldRateInfo
        :: NanoSecond64 -> LatencyRange -> Int -> IO (Maybe YieldRateInfo)
    mkYieldRateInfo latency latRange buf = do
        measured <- newIORef 0
        wcur     <- newIORef (0,0,0)
        wcol     <- newIORef (0,0,0)
        now      <- getTime Monotonic
        wlong    <- newIORef (0,now)
        period   <- newIORef 1
        gainLoss <- newIORef (Count 0)

        return $ Just YieldRateInfo
            { svarLatencyTarget      = latency
            , svarLatencyRange       = latRange
            , svarRateBuffer         = buf
            , svarGainedLostYields   = gainLoss
            , workerBootstrapLatency = getStreamLatency st
            , workerPollingInterval  = period
            , workerMeasuredLatency  = measured
            , workerPendingLatency   = wcur
            , workerCollectedLatency = wcol
            , svarAllTimeLatency     = wlong
            }
-- | Allocate a fresh 'SVarStats' record.
--
-- Every counter and latency figure starts at zero and the stop time starts
-- as 'Nothing'.
newSVarStats :: IO SVarStats
newSVarStats = do
    disp    <- newIORef 0
    maxWrk  <- newIORef 0
    maxOq   <- newIORef 0
    maxHs   <- newIORef 0
    maxWq   <- newIORef 0
    avgLat  <- newIORef (0, NanoSecond64 0)
    maxLat  <- newIORef (NanoSecond64 0)
    minLat  <- newIORef (NanoSecond64 0)
    stpTime <- newIORef Nothing

    return SVarStats
        { totalDispatches  = disp
        , maxWorkers       = maxWrk
        , maxOutQSize      = maxOq
        , maxHeapSize      = maxHs
        , maxWorkQSize     = maxWq
        , avgWorkerLatency = avgLat
        , minWorkerLatency = minLat
        , maxWorkerLatency = maxLat
        , svarStopTime     = stpTime
        }
-- | Put a work item on the ahead work queue and ring the SVar's doorbell.
--
-- The queue is expected to be empty: the item becomes its only element and
-- the 'Int' stored alongside it is incremented (presumably a sequence
-- number -- confirm against the consumers of the queue). Only the stream
-- half of the @(RunInIO m, t m a)@ pair is stored. If the queue already holds
-- an item the modification function calls 'error'.
{-# INLINE enqueueAhead #-}
enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
enqueueAhead sv q m = do
    atomicModifyIORefCAS_ q $ \ case
        ([], n) -> ([snd m], n + 1)
        _ -> error "enqueueAhead: queue is not empty"
    ringDoorBell sv
-- | Put a work item back on the ahead work queue and ring the SVar's
-- doorbell.
--
-- Unlike 'enqueueAhead' the 'Int' stored next to the queue is left unchanged.
-- The queue is expected to be empty; if it already holds an item the
-- modification function calls 'error'.
{-# INLINE reEnqueueAhead #-}
reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead sv q m = do
    atomicModifyIORefCAS_ q $ \ case
        ([], n) -> ([m], n)
        _ -> error "reEnqueueAhead: queue is not empty"
    ringDoorBell sv
-- | Report whether the ahead work queue currently holds no items.
--
-- This is a plain read of the 'IORef'; the 'Int' component is ignored.
{-# INLINE queueEmptyAhead #-}
queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
queueEmptyAhead q = liftIO $ do
    (xs, _) <- readIORef q
    return $ null xs
-- | Atomically take the single item off the ahead work queue, if any.
--
-- Returns 'Nothing' when the queue is empty. Otherwise returns the item
-- paired with the 'Int' stored next to the queue, and leaves the queue empty
-- with that 'Int' unchanged. The queue is expected to hold at most one item;
-- with more than one the modification function calls 'error'.
{-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m
    => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead q = liftIO $
    atomicModifyIORefCAS q $ \case
        ([], n) -> (([], n), Nothing)
        ([x], n) -> (([], n), Just (x, n))
        _ -> error "more than one item on queue"
-- | Read the current value of an 'IORef' and pass it to an 'IO' action.
--
-- The reference itself is not modified by this function.
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef ref f = readIORef ref >>= f
-- | Atomically apply a pure function to the contents of an 'IORef',
-- discarding any result.
--
-- Built on the lazy 'atomicModifyIORef', so the new value is stored
-- unevaluated until something forces it.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref f =
    atomicModifyIORef ref $ \x -> (f x, ())
-- | Outcome of trying to take the next in-sequence entry off the output heap
-- (see 'dequeueFromHeap' and 'dequeueFromHeapSeq').
data HeapDequeueResult t m a =
      Clearing
      -- ^ Returned by 'dequeueFromHeap' when no expected sequence number is
      -- recorded next to the heap.
    | Waiting Int
      -- ^ The heap is empty or its top entry does not carry the expected
      -- sequence number; the payload is the sequence number being waited for.
    | Ready (Entry Int (AheadHeapEntry t m a))
      -- ^ The top entry carried the expected sequence number and has been
      -- removed from the heap.
-- | Atomically try to remove the entry with the expected sequence number
-- from the output heap.
--
-- The heap is stored together with a @Maybe Int@ holding the expected
-- sequence number:
--
-- * 'Nothing': nothing is changed and 'Clearing' is returned.
-- * @Just n@ and the top entry has sequence number @n@: the entry is removed,
--   the expected number is reset to 'Nothing' and 'Ready' is returned.
-- * @Just n@ otherwise (heap empty, or top entry has a different number):
--   nothing is changed and @Waiting n@ is returned. An 'assert' checks that
--   a mismatching top entry is not older than @n@.
{-# INLINE dequeueFromHeap #-}
dequeueFromHeap
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> IO (HeapDequeueResult t m a)
dequeueFromHeap hpVar =
    atomicModifyIORef hpVar $ \pair@(hp, snum) ->
        case snum of
            Nothing -> (pair, Clearing)
            Just n ->
                case H.uncons hp of
                    Just (ent@(Entry seqNo _ev), hp') ->
                        if seqNo == n
                        then ((hp', Nothing), Ready ent)
                        else assert (seqNo >= n) (pair, Waiting n)
                    Nothing -> (pair, Waiting n)
-- | Atomically try to remove the entry with sequence number @i@ from the
-- output heap.
--
-- This must be called while the expected sequence number stored next to the
-- heap is 'Nothing'; a @Just@ value hits the @error@ branch.
--
-- * If the top entry has sequence number @i@ it is removed, the expected
--   number stays 'Nothing' and 'Ready' is returned.
-- * Otherwise (heap empty, or top entry has a different number) the heap is
--   left as is, the expected number is set to @Just i@ and @Waiting i@ is
--   returned. An 'assert' checks that a mismatching top entry is not older
--   than @i@.
{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq hpVar i =
    atomicModifyIORef hpVar $ \(hp, snum) ->
        case snum of
            Nothing ->
                case H.uncons hp of
                    Just (ent@(Entry seqNo _ev), hp') ->
                        if seqNo == i
                        then ((hp', Nothing), Ready ent)
                        else assert (seqNo >= i) ((hp, Just i), Waiting i)
                    Nothing -> ((hp, Just i), Waiting i)
            Just _ -> error "dequeueFromHeapSeq: unreachable"
-- | Sanity predicate relating the expected sequence number stored with the
-- heap (first argument) to a sequence number about to be recorded.
--
-- With no expected number every value is acceptable; otherwise the new
-- number must not be smaller than the expected one.
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane snum seqNo =
    case snum of
        Nothing -> True
        Just n -> seqNo >= n
-- | Atomically insert an entry back into the output heap and set the
-- expected sequence number to @Just seqNo@.
--
-- An 'assert' checks, via 'heapIsSane', that @seqNo@ is consistent with the
-- previously stored expected sequence number.
{-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Entry Int (AheadHeapEntry t m a)
    -> Int
    -> IO ()
requeueOnHeapTop hpVar ent seqNo =
    atomicModifyIORef_ hpVar $ \(hp, snum) ->
        assert (heapIsSane snum seqNo) (H.insert ent hp, Just seqNo)
-- | Atomically set the expected sequence number stored next to the output
-- heap to @Just seqNo@, leaving the heap contents untouched.
--
-- An 'assert' checks, via 'heapIsSane', that @seqNo@ is consistent with the
-- previously stored expected sequence number.
{-# INLINE updateHeapSeq #-}
updateHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO ()
updateHeapSeq hpVar seqNo =
    atomicModifyIORef_ hpVar $ \(hp, snum) ->
        assert (heapIsSane snum seqNo) (hp, Just seqNo)
getAheadSVar :: MonadAsync m
=> State t m a
-> ( IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ())
-> RunInIO m
-> IO (SVar t m a)
getAheadSVar :: State t m a
-> (IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ())
-> RunInIO m
-> IO (SVar t m a)
getAheadSVar State t m a
st IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ()
f RunInIO m
mrun = do
IORef ([ChildEvent a], Int)
outQ <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH <- (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
forall a. a -> IO (IORef a)
newIORef (Heap (Entry Int (AheadHeapEntry t m a))
forall a. Heap a
H.empty, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
0)
MVar ()
outQMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
IORef Int
active <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
IORef Bool
wfw <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
S.empty
IORef ([t m a], Int)
q <- ([t m a], Int) -> IO (IORef ([t m a], Int))
forall a. a -> IO (IORef a)
newIORef ([], -Int
1)
MVar ()
stopMVar <- () -> IO (MVar ())
forall a. a -> IO (MVar a)
newMVar ()
Maybe (IORef Count)
yl <- case State t m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State t m a
st of
Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
Maybe YieldRateInfo
rateInfo <- State t m a -> IO (Maybe YieldRateInfo)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo State t m a
st
SVarStats
stats <- IO SVarStats
newSVarStats
ThreadId
tid <- IO ThreadId
myThreadId
let getSVar :: SVar t m a
-> (SVar t m a -> m [ChildEvent a])
-> (SVar t m a -> m Bool)
-> SVar t m a
getSVar SVar t m a
sv SVar t m a -> m [ChildEvent a]
readOutput SVar t m a -> m Bool
postProc = SVar :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVarStyle
-> RunInIO m
-> SVarStopStyle
-> IORef ThreadId
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> m [ChildEvent a]
-> m Bool
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Limit
-> Limit
-> IORef Count
-> PushBufferPolicy
-> MVar ()
-> Maybe (IORef Count)
-> Maybe YieldRateInfo
-> ((RunInIO m, t m a) -> IO ())
-> IO Bool
-> IO Bool
-> IORef Bool
-> (Maybe WorkerInfo -> m ())
-> IORef (Set ThreadId)
-> IORef Int
-> (ThreadId -> m ())
-> MVar ()
-> SVarStats
-> Maybe (IORef ())
-> Bool
-> ThreadId
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IORef ([t m a], Int)
-> SVar t m a
SVar
{ outputQueue :: IORef ([ChildEvent a], Int)
outputQueue = IORef ([ChildEvent a], Int)
outQ
, outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
outputQueueFromConsumer = IORef ([ChildEvent a], Int)
forall a. HasCallStack => a
undefined
, remainingWork :: Maybe (IORef Count)
remainingWork = Maybe (IORef Count)
yl
, maxBufferLimit :: Limit
maxBufferLimit = State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State t m a
st
, pushBufferSpace :: IORef Count
pushBufferSpace = IORef Count
forall a. HasCallStack => a
undefined
, pushBufferPolicy :: PushBufferPolicy
pushBufferPolicy = PushBufferPolicy
forall a. HasCallStack => a
undefined
, pushBufferMVar :: MVar ()
pushBufferMVar = MVar ()
forall a. HasCallStack => a
undefined
, maxWorkerLimit :: Limit
maxWorkerLimit = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads State t m a
st) (State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State t m a
st)
, yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo = Maybe YieldRateInfo
rateInfo
, outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMv
, outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = MVar ()
forall a. HasCallStack => a
undefined
, readOutputQ :: m [ChildEvent a]
readOutputQ = SVar t m a -> m [ChildEvent a]
readOutput SVar t m a
sv
, postProcess :: m Bool
postProcess = SVar t m a -> m Bool
postProc SVar t m a
sv
, workerThreads :: IORef (Set ThreadId)
workerThreads = IORef (Set ThreadId)
running
, workLoop :: Maybe WorkerInfo -> m ()
workLoop = IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ()
f IORef ([t m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH State t m a
st{streamVar :: Maybe (SVar t m a)
streamVar = SVar t m a -> Maybe (SVar t m a)
forall a. a -> Maybe a
Just SVar t m a
sv} SVar t m a
sv
, enqueue :: (RunInIO m, t m a) -> IO ()
enqueue = SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
enqueueAhead SVar t m a
sv IORef ([t m a], Int)
q
, isWorkDone :: IO Bool
isWorkDone = SVar t m a
-> IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO Bool
forall (t :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) a a b
a b.
Foldable t =>
SVar t m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead SVar t m a
sv IORef ([t m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH
, isQueueDone :: IO Bool
isQueueDone = SVar t m a -> IORef ([t m a], Int) -> IO Bool
forall (t :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) a a b.
Foldable t =>
SVar t m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead SVar t m a
sv IORef ([t m a], Int)
q
, needDoorBell :: IORef Bool
needDoorBell = IORef Bool
wfw
, svarStyle :: SVarStyle
svarStyle = SVarStyle
AheadVar
, svarStopStyle :: SVarStopStyle
svarStopStyle = SVarStopStyle
StopNone
, svarStopBy :: IORef ThreadId
svarStopBy = IORef ThreadId
forall a. HasCallStack => a
undefined
, svarMrun :: RunInIO m
svarMrun = RunInIO m
mrun
, workerCount :: IORef Int
workerCount = IORef Int
active
, accountThread :: ThreadId -> m ()
accountThread = SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
delThread SVar t m a
sv
, workerStopMVar :: MVar ()
workerStopMVar = MVar ()
stopMVar
, svarRef :: Maybe (IORef ())
svarRef = Maybe (IORef ())
forall a. Maybe a
Nothing
, svarInspectMode :: Bool
svarInspectMode = State t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode State t m a
st
, svarCreator :: ThreadId
svarCreator = ThreadId
tid
, aheadWorkQueue :: IORef ([t m a], Int)
aheadWorkQueue = IORef ([t m a], Int)
q
, outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outputHeap = IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH
, svarStats :: SVarStats
svarStats = SVarStats
stats
}
let sv :: SVar t m a
sv =
case State t m a -> Maybe Rate
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate State t m a
st of
Maybe Rate
Nothing -> SVar t m a
-> (SVar t m a -> m [ChildEvent a])
-> (SVar t m a -> m Bool)
-> SVar t m a
getSVar SVar t m a
sv SVar t m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
Just Rate
_ -> SVar t m a
-> (SVar t m a -> m [ChildEvent a])
-> (SVar t m a -> m Bool)
-> SVar t m a
getSVar SVar t m a
sv SVar t m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
in SVar t m a -> IO (SVar t m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar t m a
sv
where
-- Local helper: reports whether queued work should be treated as finished.
-- The result is True when the work queue @q@ is empty, or when the SVar has a
-- 'remainingWork' counter and its current value is @<= 0@. With no counter
-- configured only the queue emptiness decides.
{-# INLINE isQueueDoneAhead #-}
isQueueDoneAhead sv q = do
    -- The queue is inspected first, then the yield counter (if any).
    queueDone <- checkEmpty q
    yieldsDone <-
        maybe (return False) (fmap (<= 0) . readIORef) (remainingWork sv)
    return (yieldsDone || queueDone)
-- Local helper: all work is done when the heap stored in @ref@ holds no
-- entries (size @<= 0@) and 'isQueueDoneAhead' also reports True for @sv@
-- and @q@.
{-# INLINE isWorkDoneAhead #-}
isWorkDoneAhead sv q ref = do
    -- Read the heap before checking the queue; the second tuple component
    -- is not needed here.
    (hp, _) <- readIORef ref
    queueDone <- isQueueDoneAhead sv q
    return (H.size hp <= 0 && queueDone)
-- Local helper: reads the pair stored in @q@ and reports whether its first
-- component (any 'Foldable' container) is empty. The second component is
-- ignored.
checkEmpty q = readIORef q >>= \(xs, _) -> return (null xs)
-- | Build an ahead-style 'SVar' and hand it to 'sendFirstWorker'.
--
-- The monad's 'RunInIO' runner is captured with 'askRunInIO', the SVar is
-- constructed in 'IO' by 'getAheadSVar' from the state @st@, the worker loop
-- @wloop@ and that runner, and finally the new SVar together with the stream
-- @m@ is passed to 'sendFirstWorker', whose result is returned.
{-# INLINABLE newAheadVar #-}
newAheadVar :: MonadAsync m
    => State t m a
    -> t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> m (SVar t m a)
newAheadVar st m wloop =
    askRunInIO
        >>= liftIO . getAheadSVar st wloop
        >>= \sv -> sendFirstWorker sv m
-- | Allocate the mutable state of a parallel-style ('ParallelVar') 'SVar' and
-- assemble the record.
--
-- @ss@ becomes the SVar's stop style, @st@ supplies the yield limit, rate
-- information, buffer limit and inspect mode, and @mrun@ is stored as the
-- runner ('svarMrun'). The calling thread's id is recorded as 'svarCreator'.
--
-- Several fields are filled with 'undefined'; presumably they are never
-- consulted for this SVar style -- TODO confirm against the users of those
-- fields.
getParallelSVar :: MonadIO m
    => SVarStopStyle -> State t m a -> RunInIO m -> IO (SVar t m a)
getParallelSVar ss st mrun = do
    outQ      <- newIORef ([], 0)
    outQRev   <- newIORef ([], 0)
    outQMv    <- newEmptyMVar
    outQMvRev <- newEmptyMVar
    active    <- newIORef 0
    running   <- newIORef S.empty
    -- A yield-limit counter exists only when the state specifies a limit.
    yl        <- traverse newIORef (getYieldLimit st)
    rateInfo  <- getYieldRateInfo st
    -- The initial buffer space is defined only for a 'Limited' buffer.
    -- 'newIORef' does not force its argument, so the 'undefined' of the
    -- 'Unlimited' case is not evaluated at this point.
    remBuf    <- newIORef $
        case getMaxBuffer st of
            Unlimited -> undefined
            Limited x -> fromIntegral x
    pbMVar    <- newMVar ()
    stats     <- newSVarStats
    tid       <- myThreadId
    -- Only the 'StopBy' style gets a real reference (with placeholder
    -- contents); every other style leaves the reference itself undefined.
    stopBy    <-
        case ss of
            StopBy -> newIORef undefined
            _      -> return undefined

    -- The record refers to itself: the reader, the post-processing action
    -- and the thread accounting are all closed over @sv@.
    let sv = SVar
            { -- identity and configuration
              svarStyle                  = ParallelVar
            , svarStopStyle              = ss
            , svarStopBy                 = stopBy
            , svarMrun                   = mrun
            , svarCreator                = tid
            , svarInspectMode            = getInspectMode st
            , svarRef                    = Nothing
            , svarStats                  = stats
              -- limits and rate control
            , remainingWork              = yl
            , yieldRateInfo              = rateInfo
            , maxWorkerLimit             = Unlimited
            , maxBufferLimit             = getMaxBuffer st
            , pushBufferSpace            = remBuf
            , pushBufferPolicy           = PushBufferBlock
            , pushBufferMVar             = pbMVar
              -- output channels, in both directions
            , outputQueue                = outQ
            , outputDoorBell             = outQMv
            , outputQueueFromConsumer    = outQRev
            , outputDoorBellFromConsumer = outQMvRev
            , readOutputQ                = readOutputQPar sv
            , postProcess                = allThreadsDone sv
              -- worker bookkeeping
            , workerThreads              = running
            , workerCount                = active
            , accountThread              = modifyThread sv
              -- fields left undefined by this constructor
            , workLoop                   = undefined
            , enqueue                    = undefined
            , isWorkDone                 = undefined
            , isQueueDone                = undefined
            , needDoorBell               = undefined
            , workerStopMVar             = undefined
            , aheadWorkQueue             = undefined
            , outputHeap                 = undefined
            }
    return sv

    where

    -- Block on the output doorbell, collect latency statistics when rate
    -- information is configured, then drain the output queue and return the
    -- drained events.
    readOutputQPar sv = liftIO $ do
        withDiagMVar sv "readOutputQPar: doorbell" (takeMVar (outputDoorBell sv))
        mapM_ (\yinfo -> void (collectLatency sv yinfo False)) (yieldRateInfo sv)
        events <- fst <$> readOutputQRaw sv
        -- Empty the push-buffer MVar, reset the buffer limit, issue a write
        -- barrier and refill the MVar. Presumably this releases producers
        -- waiting for buffer space -- TODO confirm against the push side.
        _ <- tryTakeMVar (pushBufferMVar sv)
        resetBufferLimit sv
        writeBarrier
        _ <- tryPutMVar (pushBufferMVar sv) ()
        return events
-- | Create a parallel-style 'SVar' in the current monad.
--
-- Captures the monad's 'RunInIO' runner with 'askRunInIO' and passes it,
-- along with the stop style @ss@ and the state @st@, to 'getParallelSVar',
-- whose 'IO' action is lifted into @m@.
{-# INLINABLE newParallelVar #-}
newParallelVar :: MonadAsync m
    => SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar ss st = askRunInIO >>= liftIO . getParallelSVar ss st