{-# LANGUAGE CPP #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UnboxedTuples #-}
module Streamly.SVar
(
MonadAsync
, SVarStyle (..)
, SVar (..)
, Limit (..)
, State (streamVar)
, defState
, adaptState
, getMaxThreads
, setMaxThreads
, getMaxBuffer
, setMaxBuffer
, getStreamRate
, setStreamRate
, setStreamLatency
, getYieldLimit
, setYieldLimit
, getInspectMode
, setInspectMode
, cleanupSVar
, cleanupSVarFromWorker
, newAheadVar
, newParallelVar
, captureMonadState
, RunInIO (..)
, atomicModifyIORefCAS
, WorkerInfo (..)
, YieldRateInfo (..)
, ThreadAbort (..)
, ChildEvent (..)
, AheadHeapEntry (..)
, send
, sendYield
, sendStop
, enqueueLIFO
, enqueueFIFO
, enqueueAhead
, reEnqueueAhead
, pushWorkerPar
, queueEmptyAhead
, dequeueAhead
, HeapDequeueResult(..)
, dequeueFromHeap
, dequeueFromHeapSeq
, requeueOnHeapTop
, updateHeapSeq
, withIORef
, heapIsSane
, Rate (..)
, getYieldRateInfo
, newSVarStats
, collectLatency
, workerUpdateLatency
, isBeyondMaxRate
, workerRateControl
, updateYieldCount
, decrementYieldLimit
, decrementYieldLimitPost
, incrementYieldLimit
, postProcessBounded
, postProcessPaced
, readOutputQBounded
, readOutputQPaced
, dispatchWorkerPaced
, sendFirstWorker
, delThread
, toStreamVar
, SVarStats (..)
, NanoSecs (..)
, dumpSVar
)
where
import Control.Concurrent
(ThreadId, myThreadId, threadDelay, throwTo)
import Control.Concurrent.MVar
(MVar, newEmptyMVar, tryPutMVar, takeMVar, newMVar, tryReadMVar)
import Control.Exception
(SomeException(..), catch, mask, assert, Exception, catches,
throwIO, Handler(..), BlockedIndefinitelyOnMVar(..),
BlockedIndefinitelyOnSTM(..))
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control (MonadBaseControl, control, StM)
import Data.Atomics
(casIORef, readForCAS, peekTicket, atomicModifyIORefCAS_,
writeBarrier, storeLoadBarrier)
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, pushL)
import Data.Functor (void)
import Data.Heap (Heap, Entry(..))
import Data.Int (Int64)
import Data.IORef
(IORef, modifyIORef, newIORef, readIORef, writeIORef, atomicModifyIORef)
import Data.List ((\\))
import Data.Maybe (fromJust)
import Data.Semigroup ((<>))
import Data.Set (Set)
import GHC.Conc (ThreadId(..))
import GHC.Exts
import GHC.IO (IO(..))
import System.Clock (TimeSpec, Clock(Monotonic), getTime, toNanoSecs)
import System.IO (hPutStrLn, stderr)
import Text.Printf (printf)
import qualified Data.Heap as H
import qualified Data.Set as S
-- | A quantity of nanoseconds stored as a signed 64-bit integer. The numeric
-- classes are derived from the underlying 'Int64' (the module enables
-- @GeneralizedNewtypeDeriving@), so ordinary arithmetic and 'fromIntegral'
-- work on it directly.
newtype NanoSecs = NanoSecs Int64
    deriving ( Eq
             , Read
             , Show
             , Enum
             , Bounded
             , Num
             , Real
             , Integral
             , Ord
             )
-- | A signed 64-bit counter. In this module it counts yields (for example
-- 'workerYieldCount', 'workerYieldMax' and the yield limit held in
-- 'remainingWork'). The numeric classes are derived from the underlying
-- 'Int64'.
newtype Count = Count Int64
    deriving ( Eq
             , Read
             , Show
             , Enum
             , Bounded
             , Num
             , Real
             , Integral
             , Ord
             )
-- | Exception thrown at worker threads by 'cleanupSVar' and
-- 'cleanupSVarFromWorker'.
data ThreadAbort = ThreadAbort deriving Show

-- Uses the default 'Exception' methods.
instance Exception ThreadAbort
-- | Events that are pushed on an SVar's 'outputQueue' by 'send'.
data ChildEvent a =
      -- Carries one value of the stream's element type.
      ChildYield a
      -- A thread is reporting that it stopped. 'sendStop' posts it with
      -- 'Nothing'; 'handleChildException' posts it with the exception that
      -- terminated the thread.
    | ChildStop ThreadId (Maybe SomeException)
-- | Payload of the entries stored in an SVar's 'outputHeap' (each entry is
-- keyed by an 'Int' sequence number, see 'dequeueFromHeap').
--
-- NOTE(review): the producers/consumers of these constructors are outside
-- this chunk; presumably 'AheadEntryNull' is an empty placeholder,
-- 'AheadEntryPure' an already computed value and 'AheadEntryStream' a stream
-- still to be evaluated -- confirm against the Ahead implementation.
data AheadHeapEntry (t :: (* -> *) -> * -> *) m a =
      AheadEntryNull
    | AheadEntryPure a
    | AheadEntryStream (t m a)
-- | Tag identifying the kind of an SVar. Within this module it is consulted
-- by 'dumpSVar' and 'dumpSVarStats': 'AheadVar' adds heap/work-queue
-- information to the dump and 'ParallelVar' skips reading 'needDoorBell'.
data SVarStyle =
      AsyncVar
    | WAsyncVar
    | ParallelVar
    | AheadVar
    deriving (Eq, Show)
-- | Per-worker bookkeeping used for rate control. A record is created by
-- 'pushWorker' / 'pushWorkerPar' only when the SVar has a 'yieldRateInfo'.
data WorkerInfo = WorkerInfo
    { -- Maximum number of yields for this worker; 0 means "no maximum"
      -- (see 'isBeyondMaxYield').
      workerYieldMax :: Count
      -- Number of yields made so far; incremented by 'updateYieldCount'.
    , workerYieldCount :: IORef Count
      -- Yield count and timestamp recorded at the last latency collection
      -- (see 'workerCollectLatency').
    , workerLatencyStart :: IORef (Count, TimeSpec)
    }
-- | Requested stream rate. 'getYieldRateInfo' converts each rate @r@ to a
-- latency of @1.0e9 / r@ nanoseconds, i.e. rates are expressed per second.
data Rate = Rate
    { -- Lower rate bound; becomes the maximum latency of the latency range.
      rateLow :: Double
      -- Desired rate; becomes the target latency.
    , rateGoal :: Double
      -- Upper rate bound; becomes the minimum latency of the latency range.
    , rateHigh :: Double
      -- Stored as 'svarRateBuffer' in the resulting 'YieldRateInfo'.
    , rateBuffer :: Int
    }
-- | Inclusive bounds, in nanoseconds, that 'estimateWorkers' uses to clamp
-- the per-yield latency it aims for.
data LatencyRange = LatencyRange
    { minLatency :: NanoSecs
    , maxLatency :: NanoSecs
    } deriving Show
-- | State used to pace an SVar to a requested rate.
data YieldRateInfo = YieldRateInfo
    { -- Desired latency per yield, passed to 'estimateWorkers'.
      svarLatencyTarget :: NanoSecs
      -- Bounds used by 'estimateWorkers' to clamp the adjusted latency.
    , svarLatencyRange :: LatencyRange
      -- Threshold used by 'dispatchWorkerPaced': a yield deficit is only
      -- recorded in 'svarGainedLostYields' by the amount exceeding it.
    , svarRateBuffer :: Int
      -- Correction added to the observed yield count by 'estimateWorkers'.
    , svarGainedLostYields :: IORef Count
      -- Long-term yield count together with the base timestamp from which
      -- elapsed time is measured.
    , svarAllTimeLatency :: IORef (Count, TimeSpec)
      -- Latency assumed by 'dispatchWorkerPaced' while the measured latency
      -- is still 0.
    , workerBootstrapLatency :: Maybe NanoSecs
      -- Number of yields between latency updates/rate checks performed by a
      -- worker ('checkRatePeriodic'); 0 disables the periodic check.
    , workerPollingInterval :: IORef Count
      -- (yield count, time) accumulated by workers through
      -- 'workerUpdateLatency' and drained by 'collectLatency'.
    , workerPendingLatency :: IORef (Count, NanoSecs)
      -- (yield count, time) carried over by 'collectLatency' until the batch
      -- is considered significant.
    , workerCollectedLatency :: IORef (Count, NanoSecs)
      -- Most recently computed per-yield worker latency.
    , workerMeasuredLatency :: IORef NanoSecs
    }
-- | Diagnostic counters of an SVar, reported by 'dumpSVarStats'. In this
-- module they are only updated when 'svarInspectMode' is on.
data SVarStats = SVarStats {
      -- Incremented by 'recordMaxWorkers' on every worker dispatch.
      totalDispatches :: IORef Int
      -- Highest 'workerCount' observed by 'recordMaxWorkers'.
    , maxWorkers :: IORef Int
      -- Largest output queue length observed by 'readOutputQRaw'.
    , maxOutQSize :: IORef Int
      -- Reported as "heap max size" for 'AheadVar'; not written in this
      -- chunk.
    , maxHeapSize :: IORef Int
      -- Neither read nor written in this chunk.
    , maxWorkQSize :: IORef Int
      -- Running sums (yield count, time) used to compute the average worker
      -- latency.
    , avgWorkerLatency :: IORef (Count, NanoSecs)
      -- Smallest per-batch latency seen so far (0 means "not yet set").
    , minWorkerLatency :: IORef NanoSecs
      -- Largest per-batch latency seen so far.
    , maxWorkerLatency :: IORef NanoSecs
      -- When set, used instead of the current time as the end of the
      -- measurement interval in 'dumpSVarStats'.
    , svarStopTime :: IORef (Maybe TimeSpec)
    }
-- | A bound on threads or buffer size: either no bound at all or an explicit
-- non-negative maximum.
data Limit = Unlimited | Limited Word deriving Show
-- | Shared state through which worker threads and the consumer of a
-- concurrent stream communicate. Field notes below describe only what the
-- code in this module does with each field.
data SVar t m a = SVar
    {
      -- Kind of SVar; consulted when dumping diagnostics.
      svarStyle :: SVarStyle
      -- Runner handed to 'doFork' to execute a worker's monadic action.
    , svarMrun :: RunInIO m
      -- Pending events (most recent first, see 'send') and their count.
    , outputQueue :: IORef ([ChildEvent a], Int)
      -- Filled by 'send' when the queue was empty and by 'ringDoorBell';
      -- the consumer blocks on it in 'sendWorkerWait'.
    , outputDoorBell :: MVar ()
      -- NOTE(review): not called in this chunk; presumably set to
      -- 'readOutputQBounded' or 'readOutputQPaced' -- confirm.
    , readOutputQ :: m [ChildEvent a]
      -- NOTE(review): not called in this chunk; presumably set to
      -- 'postProcessBounded' or 'postProcessPaced' -- confirm.
    , postProcess :: m Bool
      -- Worker limit consulted by 'dispatchWorker' and 'estimateWorkers'.
    , maxWorkerLimit :: Limit
      -- Buffer limit consulted by 'send'.
    , maxBufferLimit :: Limit
      -- Remaining yield budget, if a yield limit is in force.
    , remainingWork :: Maybe (IORef Count)
      -- Rate-control state, present only when pacing is requested.
    , yieldRateInfo :: Maybe YieldRateInfo
      -- Not called in this chunk.
    , enqueue :: t m a -> IO ()
      -- Queried by the dispatch and post-processing functions.
    , isWorkDone :: IO Bool
      -- Queried by 'dispatchWorker'.
    , isQueueDone :: IO Bool
      -- Set by 'sendWorkerWait' before blocking; cleared by 'ringDoorBell'.
    , needDoorBell :: IORef Bool
      -- Body run by every worker forked through 'pushWorker'.
    , workLoop :: Maybe WorkerInfo -> m ()
      -- Thread ids of the workers currently tracked.
    , workerThreads :: IORef (Set ThreadId)
      -- Incremented by 'pushWorker', decremented by 'sendStop'.
    , workerCount :: IORef Int
      -- Not called in this chunk.
    , accountThread :: ThreadId -> m ()
      -- Not used in this chunk.
    , workerStopMVar :: MVar ()
      -- Diagnostic counters.
    , svarStats :: SVarStats
      -- Not used in this chunk.
    , svarRef :: Maybe (IORef ())
      -- Enables statistics collection and diagnostics on blocked MVars.
    , svarInspectMode :: Bool
      -- Printed as "Creator tid" by 'dumpSVar'.
    , svarCreator :: ThreadId
      -- Heap of sequence-numbered entries plus the sequence number that is
      -- expected next (see 'dequeueFromHeap').
    , outputHeap :: IORef ( Heap (Entry Int (AheadHeapEntry t m a))
                          , Maybe Int)
      -- Work queue holding at most one item, plus a sequence number
      -- ('enqueueAhead' fails when the queue is not empty).
    , aheadWorkQueue :: IORef ([t m a], Int)
    }
-- | Configuration threaded through stream evaluation. The underscore
-- prefixed fields are accessed through the @get*@ / @set*@ functions of this
-- module.
data State t m a = State
    {
      -- SVar to use, if any; reset to 'Nothing' by 'adaptState'.
      streamVar :: Maybe (SVar t m a)
      -- Yield limit; reset to 'Nothing' by 'adaptState'.
    , _yieldLimit :: Maybe Count
      -- Worker thread limit.
    , _threadsHigh :: Limit
      -- Buffer limit.
    , _bufferHigh :: Limit
      -- Configured latency in nanoseconds (see 'setStreamLatency').
    , _streamLatency :: Maybe NanoSecs
      -- Requested rate, if any.
    , _maxStreamRate :: Maybe Rate
      -- Whether inspection (diagnostics) mode is on.
    , _inspectMode :: Bool
    }
-- | Numeric bound behind both default limits. It is also used in this module
-- as a cap for worker polling intervals and latency batch sizes.
magicMaxBuffer :: Word
magicMaxBuffer = 1500

-- | Thread limit selected by 'defState' and by @setMaxThreads 0@.
defaultMaxThreads :: Limit
defaultMaxThreads = Limited magicMaxBuffer

-- | Buffer limit selected by 'defState' and by @setMaxBuffer 0@.
defaultMaxBuffer :: Limit
defaultMaxBuffer = Limited magicMaxBuffer
-- | The initial configuration: no SVar, no yield limit, default thread and
-- buffer limits, no latency or rate setting, inspection mode off.
defState :: State t m a
defState = State
    { streamVar      = Nothing
    , _yieldLimit    = Nothing
    , _threadsHigh   = defaultMaxThreads
    , _bufferHigh    = defaultMaxBuffer
    , _streamLatency = Nothing
    , _maxStreamRate = Nothing
    , _inspectMode   = False
    }
-- | Re-type a 'State' for a stream with a different element type. The SVar
-- and the yield limit are dropped; every other setting is carried over.
adaptState :: State t m a -> State t m b
adaptState st =
    st { _yieldLimit = Nothing
       , streamVar = Nothing
       }
-- | Set the yield limit. 'Nothing' removes the limit; a non-positive limit is
-- stored as 0.
setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
setYieldLimit lim st = st { _yieldLimit = fmap clamp lim }
  where
    clamp :: Int64 -> Count
    clamp n
        | n <= 0 = 0
        | otherwise = fromIntegral n

-- | Read the yield limit.
getYieldLimit :: State t m a -> Maybe Count
getYieldLimit st = _yieldLimit st
-- | Set the worker thread limit: a negative value means unlimited, zero
-- selects 'defaultMaxThreads', a positive value is used as the limit.
setMaxThreads :: Int -> State t m a -> State t m a
setMaxThreads n st = st { _threadsHigh = threadLimit }
  where
    threadLimit =
        case compare n 0 of
            LT -> Unlimited
            EQ -> defaultMaxThreads
            GT -> Limited (fromIntegral n)

-- | Read the worker thread limit.
getMaxThreads :: State t m a -> Limit
getMaxThreads st = _threadsHigh st
-- | Set the buffer limit: a negative value means unlimited, zero selects
-- 'defaultMaxBuffer', a positive value is used as the limit.
setMaxBuffer :: Int -> State t m a -> State t m a
setMaxBuffer n st = st { _bufferHigh = bufferLimit }
  where
    bufferLimit =
        case compare n 0 of
            LT -> Unlimited
            EQ -> defaultMaxBuffer
            GT -> Limited (fromIntegral n)

-- | Read the buffer limit.
getMaxBuffer :: State t m a -> Limit
getMaxBuffer st = _bufferHigh st
-- | Replace the requested stream rate ('Nothing' removes it).
setStreamRate :: Maybe Rate -> State t m a -> State t m a
setStreamRate rate state = state { _maxStreamRate = rate }

-- | Read the requested stream rate.
getStreamRate :: State t m a -> Maybe Rate
getStreamRate state = _maxStreamRate state
-- | Set the stream latency in nanoseconds. A non-positive value clears the
-- setting.
setStreamLatency :: Int -> State t m a -> State t m a
setStreamLatency n st = st { _streamLatency = latency }
  where
    latency
        | n <= 0 = Nothing
        | otherwise = Just (fromIntegral n)

-- | Read the stream latency setting.
getStreamLatency :: State t m a -> Maybe NanoSecs
getStreamLatency st = _streamLatency st
-- | Turn inspection mode on.
setInspectMode :: State t m a -> State t m a
setInspectMode state = state { _inspectMode = True }

-- | Tell whether inspection mode is on.
getInspectMode :: State t m a -> Bool
getInspectMode state = _inspectMode state
-- | Throw 'ThreadAbort' to every thread currently recorded in
-- 'workerThreads'.
cleanupSVar :: SVar t m a -> IO ()
cleanupSVar sv =
    readIORef (workerThreads sv) >>= mapM_ abort . S.toList
  where
    abort tid = throwTo tid ThreadAbort
-- | Like 'cleanupSVar', but meant to be called from a worker: 'ThreadAbort'
-- is thrown to every recorded worker thread except the calling thread.
cleanupSVarFromWorker :: SVar t m a -> IO ()
cleanupSVarFromWorker sv = do
    tracked <- readIORef (workerThreads sv)
    me <- myThreadId
    let others = S.toList tracked \\ [me]
    mapM_ abort others
  where
    abort tid = throwTo tid ThreadAbort
-- | Render a duration given in seconds with a unit chosen by magnitude
-- (s, ms, microseconds, ns, ps, fs, as). Negative durations are rendered
-- with a leading minus sign; anything below 1e-18 is printed with @%g@.
secs :: Double -> String
secs k
    | k < 0      = '-' : secs (negate k)
    | k >= 1     = scaled k "s"
    | k >= 1e-3  = scaled (k*1e3) "ms"
    | k >= 1e-6  = scaled (k*1e6) microUnit
    | k >= 1e-9  = scaled (k*1e9) "ns"
    | k >= 1e-12 = scaled (k*1e12) "ps"
    | k >= 1e-15 = scaled (k*1e15) "fs"
    | k >= 1e-18 = scaled (k*1e18) "as"
    | otherwise  = printf "%g s" k
  where
    -- The micro sign is avoided on Windows.
    microUnit :: String
#ifdef mingw32_HOST_OS
    microUnit = "us"
#else
    microUnit = "μs"
#endif

    -- Fewer decimals are shown the larger the scaled value is.
    scaled :: Double -> String -> String
    scaled t u
        | t >= 1e9  = printf "%.4g %s" t u
        | t >= 1e3  = printf "%.0f %s" t u
        | t >= 1e2  = printf "%.1f %s" t u
        | t >= 1e1  = printf "%.2f %s" t u
        | otherwise = printf "%.3f %s" t u
-- | Unconditionally fold all pending and carried-over worker latency samples
-- into the measured latency and the long-term yield count.
--
-- Returns the long-term yield count (including the samples just drained),
-- the base timestamp, and the latency: the freshly computed one when there
-- were samples, the previously measured one otherwise. In inspection mode
-- the min/max/average latency statistics are updated as well.
drainLatency :: SVar t m a -> YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
drainLatency sv yinfo = do
    -- Take whatever the workers have accumulated and reset their counter.
    (freshCount, freshTime) <-
        atomicModifyIORefCAS (workerPendingLatency yinfo) $ \v -> ((0,0), v)
    (carriedCount, carriedTime) <- readIORef collectedRef
    (totalCount, baseTime) <- readIORef allTimeRef
    lastMeasured <- readIORef measuredRef

    let batchCount = carriedCount + freshCount
        batchTime = carriedTime + freshTime
        totalCount' = totalCount + batchCount

    if batchCount <= 0
    then return (totalCount', baseTime, lastMeasured)
    else do
        let latency = batchTime `div` fromIntegral batchCount
        when (svarInspectMode sv) $
            recordStats latency batchCount batchTime
        writeIORef collectedRef (0, 0)
        writeIORef measuredRef latency
        modifyIORef allTimeRef $ \(_, t) -> (totalCount', t)
        return (totalCount', baseTime, latency)
  where
    collectedRef = workerCollectedLatency yinfo
    allTimeRef = svarAllTimeLatency yinfo
    measuredRef = workerMeasuredLatency yinfo

    -- Update min, max and the running sums used for the average latency.
    recordStats latency cnt time = do
        let stats = svarStats sv
        lowest <- readIORef (minWorkerLatency stats)
        when (latency < lowest || lowest == 0) $
            writeIORef (minWorkerLatency stats) latency
        highest <- readIORef (maxWorkerLatency stats)
        when (latency > highest) $
            writeIORef (maxWorkerLatency stats) latency
        modifyIORef (avgWorkerLatency stats) $
            \(n, t) -> (n + cnt, t + time)
-- | Render the statistics of an SVar as text, one item per line.
--
-- Pending worker latency samples are drained first (when rate control is
-- active) so that the latency figures are up to date. Optional items are
-- only included when they are positive; the heap size only for 'AheadVar'.
dumpSVarStats :: SVar t m a -> SVarStats -> SVarStyle -> IO String
dumpSVarStats sv ss style = do
    -- Fold outstanding latency samples into the statistics; the result
    -- itself is not needed here.
    case yieldRateInfo sv of
        Nothing -> return ()
        Just yinfo -> do
            _ <- liftIO $ drainLatency sv yinfo
            return ()
    dispatches <- readIORef $ totalDispatches ss
    maxWrk <- readIORef $ maxWorkers ss
    maxOq <- readIORef $ maxOutQSize ss
    maxHp <- readIORef $ maxHeapSize ss
    minLat <- readIORef $ minWorkerLatency ss
    maxLat <- readIORef $ maxWorkerLatency ss
    (avgCnt, avgTime) <- readIORef $ avgWorkerLatency ss
    -- Overall yield count, gain/loss correction and average time per yield
    -- of the SVar. The interval ends at the recorded stop time when there is
    -- one, otherwise at the current time.
    (svarCnt, svarGainLossCnt, svarLat) <- case yieldRateInfo sv of
        Nothing -> return (0, 0, 0)
        Just yinfo -> do
            (cnt, startTime) <- readIORef $ svarAllTimeLatency yinfo
            if cnt > 0
            then do
                t <- readIORef (svarStopTime ss)
                gl <- readIORef (svarGainedLostYields yinfo)
                case t of
                    Nothing -> do
                        now <- getTime Monotonic
                        let interval = toNanoSecs (now - startTime)
                        return (cnt, gl, interval `div` fromIntegral cnt)
                    Just stopTime -> do
                        let interval = toNanoSecs (stopTime - startTime)
                        return (cnt, gl, interval `div` fromIntegral cnt)
            else return (0, 0, 0)
    -- Latencies are kept in nanoseconds; 'secs' expects seconds, hence the
    -- multiplication by 1e-9.
    return $ unlines
        [ "total dispatches = " <> show dispatches
        , "max workers = " <> show maxWrk
        , "max outQSize = " <> show maxOq
            <> (if style == AheadVar
                then "\nheap max size = " <> show maxHp
                else "")
            <> (if minLat > 0
                then "\nmin worker latency = "
                    <> secs (fromIntegral minLat * 1e-9)
                else "")
            <> (if maxLat > 0
                then "\nmax worker latency = "
                    <> secs (fromIntegral maxLat * 1e-9)
                else "")
            <> (if avgCnt > 0
                then let lat = avgTime `div` fromIntegral avgCnt
                     in "\navg worker latency = "
                        <> secs (fromIntegral lat * 1e-9)
                else "")
            <> (if svarLat > 0
                then "\nSVar latency = "
                    <> secs (fromIntegral svarLat * 1e-9)
                else "")
            <> (if svarCnt > 0
                then "\nSVar yield count = " <> show svarCnt
                else "")
            <> (if svarGainLossCnt > 0
                then "\nSVar gain/loss yield count = " <> show svarGainLossCnt
                else "")
        ]
-- | Render a textual snapshot of an SVar for diagnostics: creator thread,
-- style, output queue (computed and maintained length), door bell state,
-- heap and work queue information for 'AheadVar', the tracked worker threads
-- and finally the statistics produced by 'dumpSVarStats'.
--
-- The individual fields are read one after another without any
-- synchronization, so the snapshot is not guaranteed to be consistent.
{-# NOINLINE dumpSVar #-}
dumpSVar :: SVar t m a -> IO String
dumpSVar sv = do
    let style = svarStyle sv
    (oqList, oqLen) <- readIORef $ outputQueue sv
    db <- tryReadMVar $ outputDoorBell sv
    -- Heap and work queue are only reported for the Ahead style.
    aheadDump <-
        if style == AheadVar
        then do
            (oheap, oheapSeq) <- readIORef $ outputHeap sv
            (wq, wqSeq) <- readIORef $ aheadWorkQueue sv
            return $ unlines
                [ "heap length = " <> show (H.size oheap)
                , "heap sequence = " <> show oheapSeq
                , "work queue length = " <> show (length wq)
                , "work queue sequence = " <> show wqSeq
                ]
        else return ""
    -- 'needDoorBell' is not read for the Parallel style.
    waiting <-
        if style /= ParallelVar
        then readIORef $ needDoorBell sv
        else return False
    rthread <- readIORef $ workerThreads sv
    workers <- readIORef $ workerCount sv
    stats <- dumpSVarStats sv (svarStats sv) style
    return $ unlines
        [ "Creator tid = " <> show (svarCreator sv)
        , "style = " <> show style
        , "---------CURRENT STATE-----------"
        , "outputQueue length computed = " <> show (length oqList)
        , "outputQueue length maintained = " <> show oqLen
        , "outputDoorBell = " <> show db
        ]
        <> aheadDump
        <> unlines
        [ "needDoorBell = " <> show waiting
        , "running threads = " <> show rthread
        , "running thread count = " <> show workers
        ]
        <> "---------STATS-----------\n"
        <> stats
-- | Handler for 'BlockedIndefinitelyOnMVar': prints the label and a dump of
-- the SVar on stderr and then rethrows the exception.
{-# NOINLINE mvarExcHandler #-}
mvarExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler sv label e@BlockedIndefinitelyOnMVar =
    dumpSVar sv >>= report >> throwIO e
  where
    report svInfo =
        hPutStrLn stderr
            (label <> " " <> "BlockedIndefinitelyOnMVar\n" <> svInfo)
-- | Handler for 'BlockedIndefinitelyOnSTM': prints the label and a dump of
-- the SVar on stderr and then rethrows the exception.
{-# NOINLINE stmExcHandler #-}
stmExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler sv label e@BlockedIndefinitelyOnSTM =
    dumpSVar sv >>= report >> throwIO e
  where
    report svInfo =
        hPutStrLn stderr
            (label <> " " <> "BlockedIndefinitelyOnSTM\n" <> svInfo)
-- | Run an action; in inspection mode, "blocked indefinitely" exceptions
-- raised by it are reported (with the given label and an SVar dump) before
-- being rethrown. Outside inspection mode the action runs unchanged.
withDiagMVar :: SVar t m a -> String -> IO () -> IO ()
withDiagMVar sv label action
    | svarInspectMode sv = action `catches` diagHandlers
    | otherwise = action
  where
    diagHandlers =
        [ Handler (mvarExcHandler sv label)
        , Handler (stmExcHandler sv label)
        ]
-- | Atomically apply a function to the contents of an 'IORef', returning the
-- second component of its result. The update is first attempted with
-- compare-and-swap; if the CAS keeps failing, the function falls back to
-- 'atomicModifyIORef'.
{-# INLINE atomicModifyIORefCAS #-}
atomicModifyIORefCAS :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORefCAS ref fn = do
    tkt <- readForCAS ref
    loop tkt retries
  where
    -- Number of CAS attempts made before falling back.
    retries = 25 :: Int
    -- No attempts left: let 'atomicModifyIORef' perform the update.
    loop _ 0 = atomicModifyIORef ref fn
    loop old tries = do
        -- Compute the new value from the value observed with the ticket.
        let (new, result) = fn $ peekTicket old
        (success, tkt) <- casIORef ref old new
        if success
        then return result
        -- The ref changed in the meantime; retry with the ticket returned
        -- by the failed CAS.
        else loop tkt (tries - 1)
-- | Wake up the consumer if it has asked for it: when 'needDoorBell' is set,
-- clear it and fill 'outputDoorBell' (a no-op if the MVar is already full).
-- A store-load barrier is issued before the flag is read.
{-# INLINE ringDoorBell #-}
ringDoorBell :: SVar t m a -> IO ()
ringDoorBell sv = do
    storeLoadBarrier
    requested <- readIORef bellFlag
    when requested $ do
        atomicModifyIORefCAS_ bellFlag (const False)
        void $ tryPutMVar (outputDoorBell sv) ()
  where
    bellFlag = needDoorBell sv
-- | Constraints required of a monad in which concurrent streams run: it must
-- support IO, control operations over IO, and throwing exceptions.
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)

-- | A function that runs an @m@ action in 'IO', producing the monadic state
-- ('StM') of the result.
newtype RunInIO m = RunInIO { runInIO :: forall b. m b -> IO (StM m b) }

-- | Capture the runner that 'control' provides at the point of the call, so
-- that actions can later be run in 'IO' (this is what 'doFork' receives).
captureMonadState :: MonadBaseControl IO m => m (RunInIO m)
captureMonadState = control $ \run -> run (return $ RunInIO run)
-- | Fork a thread by calling the @fork#@ primitive directly and wrap the
-- resulting primitive thread id in a 'ThreadId'. No exception handler is
-- installed around the action here; 'doFork' supplies one.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
    case fork# action s of (# s1, tid #) -> (# s1, ThreadId tid #)
-- | Fork a thread that runs the given monadic action through the supplied
-- runner. Exceptions escaping the action are passed to the given handler.
-- Returns the id of the new thread.
{-# INLINE doFork #-}
doFork :: MonadBaseControl IO m
    => m ()
    -> RunInIO m
    -> (SomeException -> IO ())
    -> m ThreadId
doFork action (RunInIO mrun) exHandler =
    control $ \run ->
        -- The thread is forked with asynchronous exceptions masked; the
        -- action itself runs with the caller's masking state restored, while
        -- the 'catch' wrapper is set up inside the mask.
        mask $ \restore -> do
            tid <- rawForkIO $ catch (restore $ void $ mrun action)
                                     exHandler
            run (return tid)
-- | Consume one unit of the yield budget. Returns 'True' when there is no
-- yield limit, or when the budget was at least 1 before the decrement. The
-- counter is decremented even when the result is 'False'.
{-# INLINE decrementYieldLimit #-}
decrementYieldLimit :: SVar t m a -> IO Bool
decrementYieldLimit sv = maybe (return True) consume (remainingWork sv)
  where
    consume ref =
        (>= 1) <$> atomicModifyIORefCAS ref (\before -> (before - 1, before))
-- | Consume one unit of the yield budget. Returns 'True' when there is no
-- yield limit, or when the budget was greater than 1 before the decrement,
-- i.e. when something is still left after it.
{-# INLINE decrementYieldLimitPost #-}
decrementYieldLimitPost :: SVar t m a -> IO Bool
decrementYieldLimitPost sv = maybe (return True) consume (remainingWork sv)
  where
    consume ref =
        (> 1) <$> atomicModifyIORefCAS ref (\before -> (before - 1, before))
-- | Give one unit back to the yield budget; does nothing when there is no
-- yield limit.
{-# INLINE incrementYieldLimit #-}
incrementYieldLimit :: SVar t m a -> IO ()
incrementYieldLimit sv = maybe (return ()) giveBack (remainingWork sv)
  where
    giveBack ref = atomicModifyIORefCAS_ ref (+ 1)
-- | Push an event on the output queue and ring the door bell if the queue
-- was empty before the push.
--
-- The result tells whether there is still room in the buffer: always 'True'
-- for an unlimited buffer, otherwise 'True' when the queue length before the
-- push is below the buffer limit minus the number of active workers.
send :: SVar t m a -> ChildEvent a -> IO Bool
send sv msg = do
    lenBefore <- atomicModifyIORefCAS (outputQueue sv) $ \(events, n) ->
        ((msg : events, n + 1), n)
    when (lenBefore <= 0) $ do
        writeBarrier
        void $ tryPutMVar (outputDoorBell sv) ()
    case maxBufferLimit sv of
        Unlimited -> return True
        Limited lim ->
            (\active -> lenBefore < (fromIntegral lim - active))
                <$> readIORef (workerCount sv)
-- | Measure what a worker did since its previous measurement. When it has
-- yielded since then, the measurement start is moved to "now" and the number
-- of yields together with the elapsed time is returned; otherwise nothing is
-- changed and 'Nothing' is returned.
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecs))
workerCollectLatency winfo = do
    (startCount, startTime) <- readIORef startRef
    currentCount <- readIORef (workerYieldCount winfo)
    let yielded = currentCount - startCount
    if yielded <= 0
    then return Nothing
    else do
        now <- getTime Monotonic
        let elapsed = fromInteger $ toNanoSecs (now - startTime)
        writeIORef startRef (currentCount, now)
        return $ Just (yielded, elapsed)
  where
    startRef = workerLatencyStart winfo
-- | Add a worker's latest measurement (if it has one) to the shared pending
-- latency accumulator.
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency yinfo winfo =
    workerCollectLatency winfo >>= maybe (return ()) accumulate
  where
    accumulate (cnt, period) =
        atomicModifyIORefCAS_ (workerPendingLatency yinfo) $
            \(n, t) -> (n + cnt, t + period)
-- | Increment the worker's yield counter and return the new value. The
-- counter is updated with a plain read followed by a write.
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount winfo = do
    let counter = workerYieldCount winfo
    next <- (+ 1) <$> readIORef counter
    writeIORef counter next
    return next
-- | Tell whether the given yield count has reached the worker's maximum.
-- A maximum of 0 means that there is no maximum.
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield cnt winfo = yieldCap /= 0 && cnt >= yieldCap
  where
    yieldCap = workerYieldMax winfo
-- | Every 'workerPollingInterval' yields, publish the worker's latency and
-- check whether the SVar is running beyond its maximum rate. Returns 'False'
-- without doing anything when the interval is 0 or the yield count is not a
-- multiple of it.
{-# NOINLINE checkRatePeriodic #-}
checkRatePeriodic :: SVar t m a
                  -> YieldRateInfo
                  -> WorkerInfo
                  -> Count
                  -> IO Bool
checkRatePeriodic sv yinfo winfo ycnt = do
    interval <- readIORef (workerPollingInterval yinfo)
    let due = interval /= 0 && (ycnt `mod` interval) == 0
    if due
    then workerUpdateLatency yinfo winfo >> isBeyondMaxRate sv yinfo
    else return False
-- | Account for one yield of a worker and decide whether it may continue.
-- Returns 'False' when the worker has reached its yield maximum or when the
-- periodic rate check finds the SVar beyond its maximum rate.
{-# NOINLINE workerRateControl #-}
workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl sv yinfo winfo = do
    yields <- updateYieldCount winfo
    overRate <- checkRatePeriodic sv yinfo winfo yields
    return $ not (isBeyondMaxYield yields winfo) && not overRate
-- | Send an event to the consumer and report whether the worker may go on:
-- the buffer must have room (see 'send') and, when both worker info and rate
-- control are present, 'workerRateControl' must allow it.
{-# INLINE sendYield #-}
sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield sv mwinfo msg = do
    bufferOk <- send sv msg
    rateOk <-
        case (mwinfo, yieldRateInfo sv) of
            (Just winfo, Just yinfo) -> workerRateControl sv yinfo winfo
            _ -> return True
    return $ bufferOk && rateOk
-- | Publish a stopping worker's final latency measurement, unless the
-- polling interval is 0.
{-# INLINE workerStopUpdate #-}
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate winfo info =
    readIORef (workerPollingInterval info) >>= \interval ->
        when (interval /= 0) $ workerUpdateLatency info winfo
-- | Called by a worker that is about to finish: decrement the worker count,
-- publish the final latency measurement when rate control is active, and
-- post a 'ChildStop' event (without exception) for the calling thread.
{-# INLINABLE sendStop #-}
sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop sv mwinfo = do
    atomicModifyIORefCAS_ (workerCount sv) (subtract 1)
    case mwinfo of
        Just winfo
            | Just info <- yieldRateInfo sv -> workerStopUpdate winfo info
        _ -> return ()
    tid <- myThreadId
    void $ send sv (ChildStop tid Nothing)
-- | Push a stream on the front of the list-based work queue and ring the
-- door bell.
{-# INLINE enqueueLIFO #-}
enqueueLIFO :: SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO sv q m = atomicModifyIORefCAS_ q (m :) >> ringDoorBell sv
-- | Push a stream on the linked work queue with 'pushL' and ring the door
-- bell.
{-# INLINE enqueueFIFO #-}
enqueueFIFO :: SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO sv q m = pushL q m >> ringDoorBell sv
-- | Put a stream on the (single slot) Ahead work queue, advancing the
-- sequence number by one, and ring the door bell. The queue must be empty;
-- otherwise the update fails with @error "not empty"@.
{-# INLINE enqueueAhead #-}
enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
enqueueAhead sv q m = do
    atomicModifyIORefCAS_ q place
    ringDoorBell sv
  where
    place ([], n) = ([m], n + 1)
    place _ = error "not empty"
-- | Put a stream back on the (single slot) Ahead work queue, keeping the
-- sequence number unchanged, and ring the door bell. The queue must be
-- empty; otherwise the update fails with @error "not empty"@.
{-# INLINE reEnqueueAhead #-}
reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead sv q m = do
    atomicModifyIORefCAS_ q place
    ringDoorBell sv
  where
    place ([], n) = ([m], n)
    place _ = error "not empty"
-- | Tell whether the Ahead work queue currently holds no stream.
{-# INLINE queueEmptyAhead #-}
queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
queueEmptyAhead q = liftIO $ null . fst <$> readIORef q
-- | Take the stream off the Ahead work queue, if there is one, together with
-- the queue's sequence number. The sequence number stays in the queue. More
-- than one queued item is treated as a programming error.
{-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m
    => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead q = liftIO $ atomicModifyIORefCAS q takeItem
  where
    takeItem ([], n) = (([], n), Nothing)
    takeItem ([x], n) = (([], n), Just (x, n))
    takeItem _ = error "more than one item on queue"
-- | Read an 'IORef' and pass its current contents to the given action.
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef ref f = do
    current <- readIORef ref
    f current
-- | Atomically replace the contents of an 'IORef' with the result of
-- applying the function to them; no result is returned.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref f = atomicModifyIORef ref update
  where
    update old = (f old, ())
-- | Outcome of trying to take the next entry off the output heap (see
-- 'dequeueFromHeap' and 'dequeueFromHeapSeq').
data HeapDequeueResult t m a =
      -- The heap had no expected sequence number set.
      Clearing
      -- The entry with the given (expected) sequence number is not at the
      -- top of the heap yet.
    | Waiting Int
      -- The entry with the expected sequence number; it has been removed
      -- from the heap.
    | Ready (Entry Int (AheadHeapEntry t m a))
-- | Try to take the entry with the expected sequence number off the heap.
--
-- * no expected sequence number set: 'Clearing', heap unchanged;
-- * the top entry carries the expected number: it is removed, the expected
--   number is cleared and the entry is returned as 'Ready';
-- * otherwise (empty heap or a later entry on top): 'Waiting' with the
--   expected number, heap unchanged.
{-# INLINE dequeueFromHeap #-}
dequeueFromHeap
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> IO (HeapDequeueResult t m a)
dequeueFromHeap hpVar = atomicModifyIORef hpVar step
  where
    step pair@(_, Nothing) = (pair, Clearing)
    step pair@(hp, Just n) =
        case H.uncons hp of
            Nothing -> (pair, Waiting n)
            Just (ent@(Entry seqNo _ev), hp')
                | seqNo == n -> ((hp', Nothing), Ready ent)
                | otherwise -> assert (seqNo >= n) (pair, Waiting n)
-- | Try to take the entry with the given sequence number off the heap. Must
-- be called while no expected sequence number is set (anything else is a
-- programming error).
--
-- * the top entry carries the given number: it is removed and returned as
--   'Ready'; the expected number stays unset;
-- * otherwise (empty heap or a later entry on top): the given number is
--   recorded as the expected one and 'Waiting' is returned.
{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq hpVar i = atomicModifyIORef hpVar step
  where
    step (_, Just _) = error "dequeueFromHeapSeq: unreachable"
    step (hp, Nothing) =
        case H.uncons hp of
            Nothing -> ((hp, Just i), Waiting i)
            Just (ent@(Entry seqNo _ev), hp')
                | seqNo == i -> ((hp', Nothing), Ready ent)
                | otherwise ->
                    assert (seqNo >= i) ((hp, Just i), Waiting i)
-- | Sanity condition used in assertions: a sequence number is acceptable
-- when no expected number is set, or when it is not smaller than the
-- expected one.
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane snum seqNo = maybe True (seqNo >=) snum
-- | Insert an entry into the heap and set the expected sequence number to
-- the given one.
{-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Entry Int (AheadHeapEntry t m a)
    -> Int
    -> IO ()
requeueOnHeapTop hpVar ent seqNo = atomicModifyIORef_ hpVar reinsert
  where
    reinsert (hp, snum) =
        assert (heapIsSane snum seqNo) (H.insert ent hp, Just seqNo)
-- | Set the expected sequence number of the heap, leaving its entries
-- untouched.
{-# INLINE updateHeapSeq #-}
updateHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO ()
updateHeapSeq hpVar seqNo = atomicModifyIORef_ hpVar setSeq
  where
    setSeq (hp, snum) = assert (heapIsSane snum seqNo) (hp, Just seqNo)
-- | Record a thread id in the SVar's set of worker threads. The set is
-- updated with a plain (non-atomic) 'modifyIORef'.
{-# NOINLINE addThread #-}
addThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
addThread sv tid = liftIO (modifyIORef (workerThreads sv) register)
  where
    register = S.insert tid

-- | Remove a thread id from the SVar's set of worker threads. The set is
-- updated with a plain (non-atomic) 'modifyIORef'.
{-# INLINE delThread #-}
delThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
delThread sv tid = liftIO (modifyIORef (workerThreads sv) forget)
  where
    forget = S.delete tid
-- | Atomically toggle the membership of a thread id in the worker set: it is
-- removed when present and inserted when absent.
--
-- The door bell is rung when the set examined afterwards is empty; that set
-- is the new one after a removal and the previous one after an insertion.
{-# INLINE modifyThread #-}
modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
modifyThread sv tid = do
    examined <- liftIO $ atomicModifyIORefCAS (workerThreads sv) toggle
    when (null examined) $
        liftIO $ do
            writeBarrier
            void $ tryPutMVar (outputDoorBell sv) ()
  where
    toggle old
        | S.member tid old = let new = S.delete tid old in (new, new)
        | otherwise = (S.insert tid old, old)
-- | Tell whether the set of tracked worker threads is empty.
{-# INLINE allThreadsDone #-}
allThreadsDone :: MonadIO m => SVar t m a -> m Bool
allThreadsDone sv = liftIO (fmap S.null (readIORef (workerThreads sv)))
-- | Exception handler installed on worker threads: reports the exception to
-- the consumer as a 'ChildStop' event for the calling thread.
{-# NOINLINE handleChildException #-}
handleChildException :: SVar t m a -> SomeException -> IO ()
handleChildException sv e =
    myThreadId >>= \tid -> void (send sv (ChildStop tid (Just e)))
-- | Update dispatch statistics: raise the recorded maximum number of workers
-- if the current worker count exceeds it, and count one more dispatch.
{-# NOINLINE recordMaxWorkers #-}
recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
recordMaxWorkers sv = liftIO $ do
    let stats = svarStats sv
        peakRef = maxWorkers stats
    active <- readIORef (workerCount sv)
    peak <- readIORef peakRef
    when (active > peak) $ writeIORef peakRef active
    modifyIORef (totalDispatches stats) (+1)
-- | Unconditionally fork one worker running the SVar's 'workLoop'.
--
-- The worker count is incremented before the fork and the new thread id is
-- recorded with 'addThread' afterwards. When rate control is active the
-- worker gets a fresh 'WorkerInfo' with the given yield maximum. Exceptions
-- of the worker are reported through 'handleChildException'.
{-# NOINLINE pushWorker #-}
pushWorker :: MonadAsync m => Count -> SVar t m a -> m ()
pushWorker yieldMax sv = do
    liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
    when (svarInspectMode sv) $ recordMaxWorkers sv
    winfo <-
        liftIO $ maybe (return Nothing) (const newWorkerInfo) (yieldRateInfo sv)
    tid <- doFork (workLoop sv winfo) (svarMrun sv) (handleChildException sv)
    addThread sv tid
  where
    -- Yield counter at 0 and latency measurement starting now.
    newWorkerInfo = do
        yieldCounter <- newIORef 0
        now <- getTime Monotonic
        latencyStart <- newIORef (0, now)
        return $ Just WorkerInfo
            { workerYieldMax = yieldMax
            , workerYieldCount = yieldCounter
            , workerLatencyStart = latencyStart
            }
-- | Fork a worker running the given work loop and register it with
-- 'modifyThread'.
--
-- Outside inspection mode the loop is started without 'WorkerInfo' and the
-- worker count is not touched. In inspection mode the worker count is
-- incremented, dispatch statistics are recorded and, when rate control is
-- active, the loop receives a 'WorkerInfo' without yield maximum.
{-# INLINE pushWorkerPar #-}
pushWorkerPar
    :: MonadAsync m
    => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar sv wloop
    | svarInspectMode sv = forkWithDiag
    | otherwise = forkWith Nothing
  where
    forkWith winfo =
        doFork (wloop winfo) (svarMrun sv) (handleChildException sv)
            >>= modifyThread sv

    {-# NOINLINE forkWithDiag #-}
    forkWithDiag = do
        liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
        recordMaxWorkers sv
        winfo <-
            case yieldRateInfo sv of
                Nothing -> return Nothing
                Just _ -> liftIO $ do
                    yieldCounter <- newIORef 0
                    now <- getTime Monotonic
                    latencyStart <- newIORef (0, now)
                    return $ Just WorkerInfo
                        { workerYieldMax = 0
                        , workerYieldCount = yieldCounter
                        , workerLatencyStart = latencyStart
                        }
        forkWith winfo
-- | Try to dispatch one more worker, respecting the worker limit.
--
-- The first argument is the yield maximum handed to the new worker. The
-- result is 'True' only when a worker was dispatched because there is queued
-- work and the limit allows it:
--
-- * work is done: nothing is dispatched, result 'False';
-- * the work queue is done but the work is not: a worker is pushed only when
--   no worker is active, result 'False';
-- * otherwise a worker is pushed when the effective limit is greater than
--   the number of active workers. The effective limit is the worker limit,
--   further capped by the remaining yield budget when a yield limit exists.
--
-- The worker count and the remaining work are read without synchronization
-- with the workers, so the decision is only approximate.
dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool
dispatchWorker yieldCount sv = do
    let workerLimit = maxWorkerLimit sv
    done <- liftIO $ isWorkDone sv
    if not done
    then do
        qDone <- liftIO $ isQueueDone sv
        active <- liftIO $ readIORef $ workerCount sv
        if not qDone
        then do
            -- There is no point in running more workers than there are
            -- yields left.
            limit <- case remainingWork sv of
                Nothing -> return workerLimit
                Just ref -> do
                    n <- liftIO $ readIORef ref
                    return $
                        case workerLimit of
                            Unlimited -> Limited (fromIntegral n)
                            Limited lim -> Limited $ min lim (fromIntegral n)
            let dispatch = pushWorker yieldCount sv >> return True
            case limit of
                Unlimited -> dispatch
                -- Dispatch only while the number of active workers is below
                -- the limit; comparing against 0 would never enforce it.
                Limited lim | lim > fromIntegral active -> dispatch
                _ -> return False
        else do
            -- Nothing is queued, but the work is not finished: make sure at
            -- least one worker is running.
            when (active <= 0) $ pushWorker 0 sv
            return False
    else return False
-- | One millisecond, in nanoseconds. Used in this module as the time span
-- for sizing worker batches and polling intervals, and as the threshold at
-- which collected latency samples are considered significant.
minThreadDelay :: NanoSecs
minThreadDelay = NanoSecs 1000000

-- | Time span (one millisecond, in nanoseconds) over which
-- 'estimateWorkers' aims to recover a yield deficit.
rateRecoveryTime :: NanoSecs
rateRecoveryTime = NanoSecs 1000000
-- | Convert nanoseconds to whole microseconds (integer division by 1000,
-- performed after conversion to 'Int').
nanoToMicroSecs :: NanoSecs -> Int
nanoToMicroSecs (NanoSecs ns) = fromIntegral ns `div` 1000
-- | What 'estimateWorkers' recommends doing next.
data Work
      -- The stream is ahead of its target rate: wait for the given time.
    = BlockWait NanoSecs
      -- One worker is enough to produce the given number of missing yields.
    | PartialWorker Count
      -- The given number of workers should be running; the 'Count' is the
      -- number of missing yields.
    | ManyWorkers Int Count
    deriving Show
-- | Decide how much work is needed to bring the stream to its target rate.
--
-- Arguments, in order: the worker limit, the yields made so far, the
-- gain/loss correction, the time elapsed since the base timestamp, the
-- measured worker latency, the target latency per yield and the allowed
-- latency range (times in nanoseconds).
--
-- When yields are missing, the result is 'PartialWorker' if a single worker
-- is fast enough and 'ManyWorkers' otherwise; when the stream is on or ahead
-- of target the result is 'BlockWait', or @ManyWorkers 1 0@ if no positive
-- sleep time is available.
estimateWorkers
    :: Limit
    -> Count
    -> Count
    -> NanoSecs
    -> NanoSecs
    -> NanoSecs
    -> LatencyRange
    -> Work
estimateWorkers workerLimit svarYields gainLossYields
                svarElapsed wLatency targetLat range =
    let
        -- Yields that should have been made by the time one more worker
        -- latency has passed, rounded up.
        targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat
        effectiveYields = svarYields + gainLossYields
        -- Positive when behind the target, zero or negative otherwise.
        deltaYields = fromIntegral targetYields - effectiveYields
    in  if deltaYields > 0
        then
            -- Rates are in yields per nanosecond: the base rate plus the
            -- extra rate needed to make up the deficit within
            -- 'rateRecoveryTime'.
            let deltaYieldsFreq :: Double
                deltaYieldsFreq =
                    fromIntegral deltaYields /
                        fromIntegral rateRecoveryTime
                yieldsFreq = 1.0 / fromIntegral targetLat
                totalYieldsFreq = yieldsFreq + deltaYieldsFreq
                requiredLat = NanoSecs $ round $ 1.0 / totalYieldsFreq
                -- Keep the latency to aim for within the configured range.
                adjustedLat = min (max requiredLat (minLatency range))
                                  (maxLatency range)
            in  assert (adjustedLat > 0) $
                if wLatency <= adjustedLat
                then PartialWorker deltaYields
                -- Workers needed to reach the adjusted latency, capped by
                -- the worker limit and by the number of missing yields.
                else let workers = withLimit $ wLatency `div` adjustedLat
                         limited = min workers (fromIntegral deltaYields)
                     in ManyWorkers (fromIntegral limited) deltaYields
        else
            -- Time until the yields made so far match the target rate, but
            -- never more than the maximum latency minus the worker latency.
            let expectedDuration = fromIntegral effectiveYields * targetLat
                sleepTime = expectedDuration - svarElapsed
                maxSleepTime = maxLatency range - wLatency
                s = min sleepTime maxSleepTime
            in assert (sleepTime >= 0) $
                if s > 0 then BlockWait s else ManyWorkers 1 (Count 0)
  where
    withLimit n =
        case workerLimit of
            Unlimited -> n
            Limited x -> min n (fromIntegral x)
-- | Estimate the current worker latency without modifying any state.
--
-- Returns the long-term yield count including all samples not yet folded
-- in, the base timestamp, and the latency estimate: the mean of the latency
-- of the outstanding samples and the previously measured latency, or just
-- the latter when there are no outstanding samples.
getWorkerLatency :: YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
getWorkerLatency yinfo = do
    (freshCount, freshTime) <- readIORef (workerPendingLatency yinfo)
    (carriedCount, carriedTime) <- readIORef (workerCollectedLatency yinfo)
    (totalCount, baseTime) <- readIORef (svarAllTimeLatency yinfo)
    lastMeasured <- readIORef (workerMeasuredLatency yinfo)
    let batchCount = carriedCount + freshCount
        batchTime = carriedTime + freshTime
        estimate
            | batchCount > 0 =
                (batchTime `div` fromIntegral batchCount + lastMeasured)
                    `div` 2
            | otherwise = lastMeasured
    return (totalCount + batchCount, baseTime, estimate)
-- | Tell whether more workers are running than the current rate estimate
-- calls for: more than one when a partial worker suffices, more than the
-- recommended number when many workers are needed, and always when the
-- stream should wait.
isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate sv yinfo = do
    (yields, baseTime, wLatency) <- getWorkerLatency yinfo
    now <- getTime Monotonic
    gainLoss <- readIORef (svarGainedLostYields yinfo)
    active <- readIORef $ workerCount sv
    let elapsed = fromInteger $ toNanoSecs $ now - baseTime
        work = estimateWorkers (maxWorkerLimit sv) yields gainLoss elapsed
                               wLatency (svarLatencyTarget yinfo)
                               (svarLatencyRange yinfo)
    return $ case work of
        BlockWait _ -> True
        PartialWorker _ -> active > 1
        ManyWorkers wanted _ -> active > wanted
-- | Recompute the worker polling interval from the given worker latency.
--
-- The interval is the number of yields that fit into 'minThreadDelay' at
-- that latency, at least 1 and at most 'magicMaxBuffer'.
--
-- A latency of 0 can be passed by 'collectLatency' (when both the new and
-- the previous measurement are 0); it is treated as "faster than can be
-- measured" and selects the largest interval instead of dividing by zero.
updateWorkerPollingInterval :: YieldRateInfo -> NanoSecs -> IO ()
updateWorkerPollingInterval yinfo latency = do
    let periodRef = workerPollingInterval yinfo
        maxPeriod = fromIntegral magicMaxBuffer :: NanoSecs
        cnt | latency == 0 = maxPeriod
            | otherwise = max 1 $ minThreadDelay `div` latency
        period = min cnt maxPeriod
    writeIORef periodRef (fromIntegral period)
-- | Collect the latency samples published by the workers and, once the
-- collected batch is significant, fold it into the measured latency.
--
-- Returns the long-term yield count (including all samples collected so
-- far), the base timestamp and the worker latency: the newly computed one
-- when the batch was folded in, the previously measured one otherwise.
collectLatency :: SVar t m a -> YieldRateInfo -> IO (Count, TimeSpec, NanoSecs)
collectLatency sv yinfo = do
    let cur = workerPendingLatency yinfo
        col = workerCollectedLatency yinfo
        longTerm = svarAllTimeLatency yinfo
        measured = workerMeasuredLatency yinfo
    -- Take the pending samples and reset the workers' accumulator.
    (count, time) <- atomicModifyIORefCAS cur $ \v -> ((0,0), v)
    (colCount, colTime) <- readIORef col
    (lcount, ltime) <- readIORef longTerm
    prev <- readIORef measured
    let pendingCount = colCount + count
        pendingTime = colTime + time
        lcount' = lcount + pendingCount
        tripleWith lat = (lcount', ltime, lat)
    if pendingCount > 0
    then do
        let new = pendingTime `div` (fromIntegral pendingCount)
        when (svarInspectMode sv) $ do
            let ss = svarStats sv
            -- A minimum of 0 means that no latency was recorded yet.
            minLat <- readIORef (minWorkerLatency ss)
            when (new < minLat || minLat == 0) $
                writeIORef (minWorkerLatency ss) new
            maxLat <- readIORef (maxWorkerLatency ss)
            when (new > maxLat) $ writeIORef (maxWorkerLatency ss) new
        -- The batch is significant when it is large enough in count or in
        -- time, when the latency changed by more than a factor of two, or
        -- when there is no previous measurement.
        if (pendingCount > fromIntegral magicMaxBuffer)
            || (pendingTime > minThreadDelay)
            || (let r = fromIntegral new / fromIntegral prev :: Double
                 in prev > 0 && (r > 2 || r < 0.5))
            || (prev == 0)
        then do
            when (svarInspectMode sv) $ do
                let ss = svarStats sv
                modifyIORef (avgWorkerLatency ss) $
                    \(cnt, t) -> (cnt + pendingCount, t + pendingTime)
            updateWorkerPollingInterval yinfo (max new prev)
            writeIORef col (0, 0)
            -- The stored measurement is the mean of the old and the new
            -- latency, while the new latency itself is returned.
            writeIORef measured ((prev + new) `div` 2)
            modifyIORef longTerm $ \(_, t) -> (lcount', t)
            return $ tripleWith new
        else do
            -- Not significant yet: carry the samples over to the next call.
            writeIORef col (pendingCount, pendingTime)
            return $ tripleWith prev
    else return $ tripleWith prev
-- | Rate-controlled worker dispatch. Collects the latest worker latency,
-- asks 'estimateWorkers' what is needed to meet the target rate and acts on
-- the answer.
--
-- The result is 'True' only when, in the 'ManyWorkers' case, the whole batch
-- of workers attempted in this call was dispatched; in every other case it
-- is 'False'.
--
-- The SVar must have a 'yieldRateInfo'; calling this function on an SVar
-- without one fails in 'fromJust'.
dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool
dispatchWorkerPaced sv = do
    let yinfo = fromJust $ yieldRateInfo sv
    (svarYields, svarElapsed, wLatency) <- do
        now <- liftIO $ getTime Monotonic
        (yieldCount, baseTime, lat) <-
            liftIO $ collectLatency sv yinfo
        let elapsed = fromInteger $ toNanoSecs $ now - baseTime
        -- Without a measurement, fall back to the bootstrap latency if one
        -- is configured.
        let latency =
                if lat == 0
                then
                    case workerBootstrapLatency yinfo of
                        Nothing -> lat
                        Just t -> t
                else lat
        return (yieldCount, elapsed, latency)
    -- With no latency information at all nothing is dispatched.
    if wLatency == 0
    then return False
    else do
        let workerLimit = maxWorkerLimit sv
        let targetLat = svarLatencyTarget yinfo
        let range = svarLatencyRange yinfo
        gainLoss <- liftIO $ readIORef (svarGainedLostYields yinfo)
        let work = estimateWorkers workerLimit svarYields gainLoss svarElapsed
                                   wLatency targetLat range
        case work of
            BlockWait s -> do
                assert (s >= 0) (return ())
                -- Sleep only when no worker thread is tracked; afterwards
                -- try to dispatch a worker limited to one yield.
                done <- allThreadsDone sv
                when done $ void $ do
                    liftIO $ threadDelay $ nanoToMicroSecs s
                    dispatchWorker 1 sv
                return False
            PartialWorker yields -> do
                assert (yields > 0) (return ())
                updateGainedLostYields yinfo yields
                -- A single worker, limited to the missing yields, is tried
                -- only when no worker thread is tracked.
                done <- allThreadsDone sv
                when done $ void $ dispatchWorker yields sv
                return False
            ManyWorkers netWorkers yields -> do
                assert (netWorkers >= 1) (return ())
                assert (yields >= 0) (return ())
                updateGainedLostYields yinfo yields
                -- Lower the polling interval if the yields per worker call
                -- for a shorter one; it is never raised here.
                let periodRef = workerPollingInterval yinfo
                    ycnt = max 1 $ yields `div` fromIntegral netWorkers
                    period = min ycnt (fromIntegral magicMaxBuffer)
                old <- liftIO $ readIORef periodRef
                when (period < old) $
                    liftIO $ writeIORef periodRef period
                cnt <- liftIO $ readIORef $ workerCount sv
                if cnt < netWorkers
                then do
                    -- Dispatch the missing workers, but at most one batch
                    -- (the yields fitting into 'minThreadDelay') per call.
                    let total = netWorkers - cnt
                        batch = max 1 $ fromIntegral $
                                    minThreadDelay `div` targetLat
                    dispatchN (min total batch)
                else return False
  where
    -- Record the part of the yield deficit that exceeds the rate buffer in
    -- the gain/loss counter.
    updateGainedLostYields yinfo yields = do
        let buf = fromIntegral $ svarRateBuffer yinfo
        when (yields /= 0 && abs yields > buf) $ do
            let delta =
                    if yields > 0
                    then yields - buf
                    else yields + buf
            liftIO $ modifyIORef (svarGainedLostYields yinfo) (+ delta)
    -- Dispatch up to n workers, stopping at the first refusal. 'True' means
    -- that all n were dispatched.
    dispatchN n =
        if n == 0
        then return True
        else do
            r <- dispatchWorker 0 sv
            if r
            then dispatchN (n - 1)
            else return False
-- | Delay hook passed to 'sendWorkerWait' by the paced reader; currently
-- does nothing.
sendWorkerDelayPaced :: SVar t m a -> IO ()
sendWorkerDelayPaced = const (return ())

-- | Delay hook passed to 'sendWorkerWait' by the bounded reader; currently
-- does nothing.
sendWorkerDelay :: SVar t m a -> IO ()
sendWorkerDelay = const (return ())
-- | Keep dispatching workers and waiting until the output queue has
-- something in it.
--
-- The first argument is a delay hook run at the start of every round, the
-- second the dispatch function; its result tells whether another dispatch
-- round should be attempted right away.
{-# NOINLINE sendWorkerWait #-}
sendWorkerWait
    :: MonadAsync m
    => (SVar t m a -> IO ())
    -> (SVar t m a -> m Bool)
    -> SVar t m a
    -> m ()
sendWorkerWait delay dispatch sv = do
    liftIO $ delay sv
    (_, n) <- liftIO $ readIORef (outputQueue sv)
    when (n <= 0) $ do
        -- Ask to be woken up before dispatching; the barrier follows the
        -- flag update and precedes the dispatch.
        liftIO $ atomicModifyIORefCAS_ (needDoorBell sv) $ const True
        liftIO storeLoadBarrier
        canDoMore <- dispatch sv
        if canDoMore
        then sendWorkerWait delay dispatch sv
        else do
            -- Nothing more can be dispatched: block until the door bell is
            -- rung. In inspection mode a blocked-indefinitely exception is
            -- reported with an SVar dump.
            liftIO $ withDiagMVar sv "sendWorkerWait: nothing to do"
                   $ takeMVar (outputDoorBell sv)
            -- The bell may have been rung without any output being queued;
            -- start over in that case.
            (_, len) <- liftIO $ readIORef (outputQueue sv)
            when (len <= 0) $ sendWorkerWait delay dispatch sv
-- | Atomically take everything off the output queue, leaving it empty, and
-- return the events together with their count. In inspection mode the
-- largest queue length seen so far is recorded.
{-# INLINE readOutputQRaw #-}
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int)
readOutputQRaw sv = do
    drained@(_, len) <-
        atomicModifyIORefCAS (outputQueue sv) $ \queued -> (([],0), queued)
    when (svarInspectMode sv) $ do
        let peakRef = maxOutQSize $ svarStats sv
        peak <- readIORef peakRef
        when (len > peak) $ writeIORef peakRef len
    return drained
-- Read the pending output events of an SVar that has no rate control.
--
-- If the queue already holds output it is returned right away; before
-- returning, a worker is pushed when the worker count is @<= 0@ and
-- 'isWorkDone' reports that work remains. If the queue is empty, the call
-- goes through 'sendWorkerWait' and then reads the queue again.
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQBounded sv = do
    (events, count) <- liftIO $ readOutputQRaw sv
    if count > 0
    then ensureWorker >> return events
    else waitAndRead

    where

    -- Push a worker only when none is counted and the work is not done.
    ensureWorker = do
        active <- liftIO $ readIORef (workerCount sv)
        when (active <= 0) $ do
            finished <- liftIO $ isWorkDone sv
            when (not finished) $ pushWorker 0 sv

    {-# INLINE waitAndRead #-}
    waitAndRead = do
        sendWorkerWait sendWorkerDelay (dispatchWorker 0) sv
        liftIO $ fmap fst (readOutputQRaw sv)
-- Read the pending output events of a rate-controlled SVar.
--
-- If the queue already holds output, 'dispatchWorkerPaced' is invoked (its
-- result is discarded) and the output is returned. If the queue is empty, the
-- call goes through 'sendWorkerWait' using the paced delay and dispatch
-- functions, and then reads the queue again.
readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQPaced sv = do
    (events, count) <- liftIO $ readOutputQRaw sv
    if count > 0
    then do
        _ <- dispatchWorkerPaced sv
        return events
    else waitAndRead

    where

    {-# INLINE waitAndRead #-}
    waitAndRead = do
        sendWorkerWait sendWorkerDelayPaced dispatchWorkerPaced sv
        liftIO $ fmap fst (readOutputQRaw sv)
-- Post-processing step for an SVar without rate control.
--
-- Returns 'False' while 'allThreadsDone' is false. Once all threads are done
-- it returns the result of 'isWorkDone'; if work is still pending a worker is
-- pushed before returning.
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
postProcessBounded sv = do
    idle <- allThreadsDone sv
    if not idle
    then return False
    else do
        finished <- liftIO $ isWorkDone sv
        when (not finished) $ pushWorker 0 sv
        return finished
-- Post-processing step for a rate-controlled SVar.
--
-- Returns 'False' while 'allThreadsDone' is false. Once all threads are done
-- it returns the result of 'isWorkDone'. If work is still pending it first
-- calls 'dispatchWorkerPaced'; should there still be no thread running after
-- that, a worker is pushed explicitly.
postProcessPaced :: MonadAsync m => SVar t m a -> m Bool
postProcessPaced sv = do
    idle <- allThreadsDone sv
    if not idle
    then return False
    else do
        finished <- liftIO $ isWorkDone sv
        when (not finished) $ do
            _ <- dispatchWorkerPaced sv
            -- The paced dispatch may not have started anything.
            stillIdle <- allThreadsDone sv
            when stillIdle $ pushWorker 0 sv
        return finished
-- Build the rate-control bookkeeping for a stream 'State'.
--
-- Returns 'Nothing' when the state carries no stream rate. Otherwise each of
-- the goal, high and low rates is converted to a latency as @1e9 / rate@
-- (rounded), with a rate @<= 0@ mapping to 'maxBound'. The high rate gives the
-- lower bound of the latency range and the low rate the upper bound. All
-- counters start from zero, the polling interval starts at 1, and the
-- all-time latency record is stamped with the current monotonic time.
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo st =
    case getStreamRate st of
        Nothing -> return Nothing
        Just (Rate low goal high buf) ->
            Just <$> mkYieldRateInfo
                (rateToLatency goal)
                (LatencyRange (rateToLatency high) (rateToLatency low))
                buf

    where

    rateToLatency r
        | r <= 0 = maxBound
        | otherwise = round $ 1.0e9 / r

    mkYieldRateInfo latency latRange buf = do
        measuredRef <- newIORef 0
        pendingRef <- newIORef (0,0)
        collectedRef <- newIORef (0,0)
        startTime <- getTime Monotonic
        allTimeRef <- newIORef (0,startTime)
        pollRef <- newIORef 1
        gainLossRef <- newIORef (Count 0)
        return YieldRateInfo
            { svarLatencyTarget = latency
            , svarLatencyRange = latRange
            , svarRateBuffer = buf
            , svarGainedLostYields = gainLossRef
            , svarAllTimeLatency = allTimeRef
            , workerBootstrapLatency = getStreamLatency st
            , workerPollingInterval = pollRef
            , workerMeasuredLatency = measuredRef
            , workerPendingLatency = pendingRef
            , workerCollectedLatency = collectedRef
            }
-- Allocate a fresh 'SVarStats' record. Every counter starts at zero, the
-- latency statistics start at @NanoSecs 0@ and no stop time is recorded.
newSVarStats :: IO SVarStats
newSVarStats = do
    dispatchesRef <- newIORef 0
    maxWorkersRef <- newIORef 0
    maxOutQRef <- newIORef 0
    maxHeapRef <- newIORef 0
    maxWorkQRef <- newIORef 0
    avgLatencyRef <- newIORef (0, NanoSecs 0)
    maxLatencyRef <- newIORef (NanoSecs 0)
    minLatencyRef <- newIORef (NanoSecs 0)
    stopTimeRef <- newIORef Nothing
    return SVarStats
        { totalDispatches = dispatchesRef
        , maxWorkers = maxWorkersRef
        , maxOutQSize = maxOutQRef
        , maxHeapSize = maxHeapRef
        , maxWorkQSize = maxWorkQRef
        , avgWorkerLatency = avgLatencyRef
        , maxWorkerLatency = maxLatencyRef
        , minWorkerLatency = minLatencyRef
        , svarStopTime = stopTimeRef
        }
-- Allocate an SVar of style 'AheadVar' from a stream 'State'.
--
-- Arguments:
--
-- * @st@: supplies the yield limit, rate, buffer/thread limits and inspect
--   mode.
-- * @f@: the worker loop; it is partially applied to the work queue, the
--   output heap, the state (with 'streamVar' pointing at the new SVar) and the
--   SVar itself.
-- * @mrun@: stored in 'svarMrun'.
--
-- The SVar refers to itself in several fields, so it is built lazily through
-- a recursive binding. When the state carries a stream rate the paced
-- read/post-process functions are installed, otherwise the bounded ones.
getAheadSVar :: MonadAsync m
    => State t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> RunInIO m
    -> IO (SVar t m a)
getAheadSVar st f mrun = do
    outputQ <- newIORef ([], 0)
    heapRef <- newIORef (H.empty, Just 0)
    doorBell <- newEmptyMVar
    activeCount <- newIORef 0
    wantDoorBell <- newIORef False
    threadSet <- newIORef S.empty
    workQ <- newIORef ([], -1)
    stopLock <- newMVar ()
    -- A yield limit, when present, is tracked in a mutable counter.
    yieldLimitRef <-
        maybe (return Nothing) (fmap Just . newIORef) (getYieldLimit st)
    rateInfo <- getYieldRateInfo st
    stats <- newSVarStats
    creator <- myThreadId

    let mkSVar sv readOutput postProc = SVar
            { outputQueue = outputQ
            , outputDoorBell = doorBell
            , needDoorBell = wantDoorBell
            , outputHeap = heapRef
            , aheadWorkQueue = workQ
            , remainingWork = yieldLimitRef
            , yieldRateInfo = rateInfo
            , maxBufferLimit = getMaxBuffer st
            , maxWorkerLimit = getMaxThreads st
            , readOutputQ = readOutput sv
            , postProcess = postProc sv
            , workLoop = f workQ heapRef st{streamVar = Just sv} sv
            , enqueue = enqueueAhead sv workQ
            , isWorkDone = isWorkDoneAhead sv workQ heapRef
            , isQueueDone = isQueueDoneAhead sv workQ
            , workerThreads = threadSet
            , workerCount = activeCount
            , accountThread = delThread sv
            , workerStopMVar = stopLock
            , svarStyle = AheadVar
            , svarMrun = mrun
            , svarRef = Nothing
            , svarInspectMode = getInspectMode st
            , svarCreator = creator
            , svarStats = stats
            }

    -- Recursive binding: the record fields close over the SVar itself.
    let sv = case getStreamRate st of
            Nothing -> mkSVar sv readOutputQBounded postProcessBounded
            Just _ -> mkSVar sv readOutputQPaced postProcessPaced
    return sv

    where

    -- Done when the work queue is empty or the yield limit (if any) has
    -- dropped to zero or below.
    {-# INLINE isQueueDoneAhead #-}
    isQueueDoneAhead sv q = do
        queueDone <- checkEmpty q
        yieldsDone <-
            case remainingWork sv of
                Nothing -> return False
                Just yref -> (<= 0) `fmap` readIORef yref
        return $ yieldsDone || queueDone

    -- Done when the heap is empty and the queue is done as well.
    {-# INLINE isWorkDoneAhead #-}
    isWorkDoneAhead sv q ref = do
        (hp, _) <- readIORef ref
        queueDone <- isQueueDoneAhead sv q
        return $ H.size hp <= 0 && queueDone

    checkEmpty q = do
        (items, _) <- readIORef q
        return (null items)
-- Allocate an SVar of style 'ParallelVar' from a stream 'State'.
--
-- The buffer and worker limits are always 'Unlimited' here, whatever the
-- state says. The yield limit, rate information and inspect mode are taken
-- from @st@, and @mrun@ is stored in 'svarMrun'.
--
-- Several record fields are not initialised for this style. They are filled
-- with a descriptive 'error' rather than a bare 'undefined', so that an
-- accidental use reports which field was touched.
getParallelSVar :: MonadIO m => State t m a -> RunInIO m -> IO (SVar t m a)
getParallelSVar st mrun = do
    outQ <- newIORef ([], 0)
    outQMv <- newEmptyMVar
    active <- newIORef 0
    running <- newIORef S.empty
    yl <- case getYieldLimit st of
            Nothing -> return Nothing
            Just x -> Just <$> newIORef x
    rateInfo <- getYieldRateInfo st
    stats <- newSVarStats
    tid <- myThreadId

    -- Recursive binding: some fields close over the SVar itself.
    let sv =
            SVar { outputQueue = outQ
                 , remainingWork = yl
                 , maxBufferLimit = Unlimited
                 , maxWorkerLimit = Unlimited
                 , yieldRateInfo = rateInfo
                 , outputDoorBell = outQMv
                 , readOutputQ = readOutputQPar sv
                 , postProcess = allThreadsDone sv
                 , workerThreads = running
                 , workLoop = unused "workLoop"
                 , enqueue = unused "enqueue"
                 , isWorkDone = unused "isWorkDone"
                 , isQueueDone = unused "isQueueDone"
                 , needDoorBell = unused "needDoorBell"
                 , svarStyle = ParallelVar
                 , svarMrun = mrun
                 , workerCount = active
                 , accountThread = modifyThread sv
                 , workerStopMVar = unused "workerStopMVar"
                 , svarRef = Nothing
                 , svarInspectMode = getInspectMode st
                 , svarCreator = tid
                 , aheadWorkQueue = unused "aheadWorkQueue"
                 , outputHeap = unused "outputHeap"
                 , svarStats = stats
                 }
     in return sv

    where

    -- Placeholder for a field left uninitialised for this SVar style; forcing
    -- it raises an error naming the field.
    unused :: String -> b
    unused field = error $
        "getParallelSVar: field '" ++ field
            ++ "' is not initialised for ParallelVar"

    -- Block on the doorbell, collect latency statistics when rate control is
    -- configured, then drain the output queue.
    readOutputQPar sv = liftIO $ do
        withDiagMVar sv "readOutputQPar: doorbell"
            $ takeMVar (outputDoorBell sv)
        case yieldRateInfo sv of
            Nothing -> return ()
            Just yinfo -> void $ collectLatency sv yinfo
        fst `fmap` readOutputQRaw sv
-- Enqueue the stream @m@ on the SVar and then start things off, returning the
-- same SVar.
--
-- Without rate control a worker is pushed with argument 0. With rate control
-- a worker is pushed with argument 1, unless the latency target is
-- 'maxBound', in which case no worker is pushed and the calling thread sleeps
-- for @threadDelay maxBound@ instead.
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker sv m = do
    -- The work is queued before any worker is started.
    liftIO $ enqueue sv m
    startWorker (yieldRateInfo sv)
    return sv

    where

    startWorker Nothing = pushWorker 0 sv
    startWorker (Just yinfo)
        | svarLatencyTarget yinfo == maxBound = liftIO $ threadDelay maxBound
        | otherwise = pushWorker 1 sv
-- Create an 'AheadVar' style SVar for the stream @m@: capture the monad
-- state, allocate the SVar with the given worker loop, then hand the stream
-- to 'sendFirstWorker'.
{-# INLINABLE newAheadVar #-}
newAheadVar :: MonadAsync m
    => State t m a
    -> t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> m (SVar t m a)
newAheadVar st m wloop = do
    sv <- captureMonadState >>= liftIO . getAheadSVar st wloop
    sendFirstWorker sv m
-- Create a 'ParallelVar' style SVar: capture the monad state and allocate the
-- SVar from the given 'State'. No worker is started here.
{-# INLINABLE newParallelVar #-}
newParallelVar :: MonadAsync m => State t m a -> m (SVar t m a)
newParallelVar st = captureMonadState >>= liftIO . getParallelSVar st
-- Enqueue the stream @m@ on an existing SVar. If 'allThreadsDone' reports
-- that no thread is running afterwards, push a worker: with argument 0 when
-- there is no rate control, with argument 1 otherwise.
toStreamVar :: MonadAsync m => SVar t m a -> t m a -> m ()
toStreamVar sv m = do
    liftIO $ enqueue sv m
    idle <- allThreadsDone sv
    let firstArg = maybe 0 (const 1) (yieldRateInfo sv)
    when idle $ pushWorker firstArg sv