module Streamly.Internal.Data.SVar.Dispatch
(
collectLatency
, withDiagMVar
, dumpSVar
, printSVar
, delThread
, modifyThread
, allThreadsDone
, recordMaxWorkers
, pushWorker
, pushWorkerPar
, dispatchWorker
, dispatchWorkerPaced
, sendWorkerWait
, sendFirstWorker
, sendWorkerDelay
, sendWorkerDelayPaced
)
where
#include "inline.hs"
import Control.Concurrent (takeMVar, tryReadMVar, ThreadId, threadDelay)
import Control.Concurrent.MVar (tryPutMVar)
import Control.Exception
(assert, catches, throwIO, Handler(..), BlockedIndefinitelyOnMVar(..),
BlockedIndefinitelyOnSTM(..))
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Maybe (fromJust, fromMaybe)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef, writeIORef)
#if __GLASGOW_HASKELL__ < 804
import Data.Semigroup ((<>))
#endif
import Streamly.Internal.Control.Concurrent (MonadAsync, askRunInIO)
import Streamly.Internal.Control.ForkLifted (doFork)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier,
storeLoadBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
(AbsTime, NanoSecond64(..), MicroSecond64(..), diffAbsTime64,
fromRelTime64, toRelTime64, showNanoSecond64, showRelTime64)
import System.IO (hPutStrLn, stderr)
import qualified Data.Heap as H
import qualified Data.Set as S
import Streamly.Internal.Data.SVar.Type
import Streamly.Internal.Data.SVar.Worker
updateWorkerPollingInterval :: YieldRateInfo -> NanoSecond64 -> IO ()
updateWorkerPollingInterval :: YieldRateInfo -> NanoSecond64 -> IO ()
updateWorkerPollingInterval YieldRateInfo
yinfo NanoSecond64
latency = do
let periodRef :: IORef Count
periodRef = YieldRateInfo -> IORef Count
workerPollingInterval YieldRateInfo
yinfo
cnt :: NanoSecond64
cnt = NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Ord a => a -> a -> a
max NanoSecond64
1 (NanoSecond64 -> NanoSecond64) -> NanoSecond64 -> NanoSecond64
forall a b. (a -> b) -> a -> b
$ NanoSecond64
minThreadDelay NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Integral a => a -> a -> a
`div` NanoSecond64
latency
period :: NanoSecond64
period = NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Ord a => a -> a -> a
min NanoSecond64
cnt (Word -> NanoSecond64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
magicMaxBuffer)
IORef Count -> Count -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Count
periodRef (NanoSecond64 -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral NanoSecond64
period)
{-# INLINE recordMinMaxLatency #-}
recordMinMaxLatency :: SVar t m a -> NanoSecond64 -> IO ()
recordMinMaxLatency :: SVar t m a -> NanoSecond64 -> IO ()
recordMinMaxLatency SVar t m a
sv NanoSecond64
new = do
let ss :: SVarStats
ss = SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv
NanoSecond64
minLat <- IORef NanoSecond64 -> IO NanoSecond64
forall a. IORef a -> IO a
readIORef (SVarStats -> IORef NanoSecond64
minWorkerLatency SVarStats
ss)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NanoSecond64
new NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
< NanoSecond64
minLat Bool -> Bool -> Bool
|| NanoSecond64
minLat NanoSecond64 -> NanoSecond64 -> Bool
forall a. Eq a => a -> a -> Bool
== NanoSecond64
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
IORef NanoSecond64 -> NanoSecond64 -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef NanoSecond64
minWorkerLatency SVarStats
ss) NanoSecond64
new
NanoSecond64
maxLat <- IORef NanoSecond64 -> IO NanoSecond64
forall a. IORef a -> IO a
readIORef (SVarStats -> IORef NanoSecond64
maxWorkerLatency SVarStats
ss)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NanoSecond64
new NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
> NanoSecond64
maxLat) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef NanoSecond64 -> NanoSecond64 -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef NanoSecond64
maxWorkerLatency SVarStats
ss) NanoSecond64
new
recordAvgLatency :: SVar t m a -> (Count, NanoSecond64) -> IO ()
recordAvgLatency :: SVar t m a -> (Count, NanoSecond64) -> IO ()
recordAvgLatency SVar t m a
sv (Count
count, NanoSecond64
time) = do
let ss :: SVarStats
ss = SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv
IORef (Count, NanoSecond64)
-> ((Count, NanoSecond64) -> (Count, NanoSecond64)) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (SVarStats -> IORef (Count, NanoSecond64)
avgWorkerLatency SVarStats
ss) (((Count, NanoSecond64) -> (Count, NanoSecond64)) -> IO ())
-> ((Count, NanoSecond64) -> (Count, NanoSecond64)) -> IO ()
forall a b. (a -> b) -> a -> b
$
\(Count
cnt, NanoSecond64
t) -> (Count
cnt Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
count, NanoSecond64
t NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Num a => a -> a -> a
+ NanoSecond64
time)
{-# INLINE collectWorkerPendingLatency #-}
collectWorkerPendingLatency
:: IORef (Count, Count, NanoSecond64)
-> IORef (Count, Count, NanoSecond64)
-> IO (Count, Maybe (Count, NanoSecond64))
collectWorkerPendingLatency :: IORef (Count, Count, NanoSecond64)
-> IORef (Count, Count, NanoSecond64)
-> IO (Count, Maybe (Count, NanoSecond64))
collectWorkerPendingLatency IORef (Count, Count, NanoSecond64)
cur IORef (Count, Count, NanoSecond64)
col = do
(Count
fcount, Count
count, NanoSecond64
time) <- IORef (Count, Count, NanoSecond64)
-> ((Count, Count, NanoSecond64)
-> ((Count, Count, NanoSecond64), (Count, Count, NanoSecond64)))
-> IO (Count, Count, NanoSecond64)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef (Count, Count, NanoSecond64)
cur (((Count, Count, NanoSecond64)
-> ((Count, Count, NanoSecond64), (Count, Count, NanoSecond64)))
-> IO (Count, Count, NanoSecond64))
-> ((Count, Count, NanoSecond64)
-> ((Count, Count, NanoSecond64), (Count, Count, NanoSecond64)))
-> IO (Count, Count, NanoSecond64)
forall a b. (a -> b) -> a -> b
$ \(Count, Count, NanoSecond64)
v -> ((Count
0,Count
0,NanoSecond64
0), (Count, Count, NanoSecond64)
v)
(Count
fcnt, Count
cnt, NanoSecond64
t) <- IORef (Count, Count, NanoSecond64)
-> IO (Count, Count, NanoSecond64)
forall a. IORef a -> IO a
readIORef IORef (Count, Count, NanoSecond64)
col
let totalCount :: Count
totalCount = Count
fcnt Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
fcount
latCount :: Count
latCount = Count
cnt Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
count
latTime :: NanoSecond64
latTime = NanoSecond64
t NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Num a => a -> a -> a
+ NanoSecond64
time
IORef (Count, Count, NanoSecond64)
-> (Count, Count, NanoSecond64) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Count, Count, NanoSecond64)
col (Count
totalCount, Count
latCount, NanoSecond64
latTime)
Bool -> IO () -> IO ()
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Count
latCount Count -> Count -> Bool
forall a. Eq a => a -> a -> Bool
== Count
0 Bool -> Bool -> Bool
|| NanoSecond64
latTime NanoSecond64 -> NanoSecond64 -> Bool
forall a. Eq a => a -> a -> Bool
/= NanoSecond64
0) (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
let latPair :: Maybe (Count, NanoSecond64)
latPair =
if Count
latCount Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
0 Bool -> Bool -> Bool
&& NanoSecond64
latTime NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
> NanoSecond64
0
then (Count, NanoSecond64) -> Maybe (Count, NanoSecond64)
forall a. a -> Maybe a
Just (Count
latCount, NanoSecond64
latTime)
else Maybe (Count, NanoSecond64)
forall a. Maybe a
Nothing
(Count, Maybe (Count, NanoSecond64))
-> IO (Count, Maybe (Count, NanoSecond64))
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
totalCount, Maybe (Count, NanoSecond64)
latPair)
{-# INLINE shouldUseCollectedBatch #-}
shouldUseCollectedBatch
:: Count
-> NanoSecond64
-> NanoSecond64
-> NanoSecond64
-> Bool
shouldUseCollectedBatch :: Count -> NanoSecond64 -> NanoSecond64 -> NanoSecond64 -> Bool
shouldUseCollectedBatch Count
collectedYields NanoSecond64
collectedTime NanoSecond64
newLat NanoSecond64
prevLat =
let r :: Double
r = NanoSecond64 -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral NanoSecond64
newLat Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ NanoSecond64 -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral NanoSecond64
prevLat :: Double
in (Count
collectedYields Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Word -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
magicMaxBuffer)
Bool -> Bool -> Bool
|| (NanoSecond64
collectedTime NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
> NanoSecond64
minThreadDelay)
Bool -> Bool -> Bool
|| (NanoSecond64
prevLat NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
> NanoSecond64
0 Bool -> Bool -> Bool
&& (Double
r Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
> Double
2 Bool -> Bool -> Bool
|| Double
r Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
< Double
0.5))
Bool -> Bool -> Bool
|| (NanoSecond64
prevLat NanoSecond64 -> NanoSecond64 -> Bool
forall a. Eq a => a -> a -> Bool
== NanoSecond64
0)
collectLatency :: SVar t m a
-> YieldRateInfo
-> Bool
-> IO (Count, AbsTime, NanoSecond64)
collectLatency :: SVar t m a
-> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64)
collectLatency SVar t m a
sv YieldRateInfo
yinfo Bool
drain = do
let cur :: IORef (Count, Count, NanoSecond64)
cur = YieldRateInfo -> IORef (Count, Count, NanoSecond64)
workerPendingLatency YieldRateInfo
yinfo
col :: IORef (Count, Count, NanoSecond64)
col = YieldRateInfo -> IORef (Count, Count, NanoSecond64)
workerCollectedLatency YieldRateInfo
yinfo
longTerm :: IORef (Count, AbsTime)
longTerm = YieldRateInfo -> IORef (Count, AbsTime)
svarAllTimeLatency YieldRateInfo
yinfo
measured :: IORef NanoSecond64
measured = YieldRateInfo -> IORef NanoSecond64
workerMeasuredLatency YieldRateInfo
yinfo
(Count
newCount, Maybe (Count, NanoSecond64)
newLatPair) <- IORef (Count, Count, NanoSecond64)
-> IORef (Count, Count, NanoSecond64)
-> IO (Count, Maybe (Count, NanoSecond64))
collectWorkerPendingLatency IORef (Count, Count, NanoSecond64)
cur IORef (Count, Count, NanoSecond64)
col
(Count
lcount, AbsTime
ltime) <- IORef (Count, AbsTime) -> IO (Count, AbsTime)
forall a. IORef a -> IO a
readIORef IORef (Count, AbsTime)
longTerm
NanoSecond64
prevLat <- IORef NanoSecond64 -> IO NanoSecond64
forall a. IORef a -> IO a
readIORef IORef NanoSecond64
measured
let newLcount :: Count
newLcount = Count
lcount Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
newCount
retWith :: c -> m (Count, AbsTime, c)
retWith c
lat = (Count, AbsTime, c) -> m (Count, AbsTime, c)
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
newLcount, AbsTime
ltime, c
lat)
case Maybe (Count, NanoSecond64)
newLatPair of
Maybe (Count, NanoSecond64)
Nothing -> NanoSecond64 -> IO (Count, AbsTime, NanoSecond64)
forall (m :: * -> *) c. Monad m => c -> m (Count, AbsTime, c)
retWith NanoSecond64
prevLat
Just (Count
count, NanoSecond64
time) -> do
let newLat :: NanoSecond64
newLat = NanoSecond64
time NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Integral a => a -> a -> a
`div` Count -> NanoSecond64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
count
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> NanoSecond64 -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> NanoSecond64 -> IO ()
recordMinMaxLatency SVar t m a
sv NanoSecond64
newLat
if Count -> NanoSecond64 -> NanoSecond64 -> NanoSecond64 -> Bool
shouldUseCollectedBatch Count
newCount NanoSecond64
time NanoSecond64
newLat NanoSecond64
prevLat Bool -> Bool -> Bool
|| Bool
drain
then do
YieldRateInfo -> NanoSecond64 -> IO ()
updateWorkerPollingInterval YieldRateInfo
yinfo (NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Ord a => a -> a -> a
max NanoSecond64
newLat NanoSecond64
prevLat)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> (Count, NanoSecond64) -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (Count, NanoSecond64) -> IO ()
recordAvgLatency SVar t m a
sv (Count
count, NanoSecond64
time)
IORef (Count, Count, NanoSecond64)
-> (Count, Count, NanoSecond64) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Count, Count, NanoSecond64)
col (Count
0, Count
0, NanoSecond64
0)
IORef NanoSecond64 -> NanoSecond64 -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef NanoSecond64
measured ((NanoSecond64
prevLat NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Num a => a -> a -> a
+ NanoSecond64
newLat) NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Integral a => a -> a -> a
`div` NanoSecond64
2)
IORef (Count, AbsTime)
-> ((Count, AbsTime) -> (Count, AbsTime)) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef (Count, AbsTime)
longTerm (((Count, AbsTime) -> (Count, AbsTime)) -> IO ())
-> ((Count, AbsTime) -> (Count, AbsTime)) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Count
_, AbsTime
t) -> (Count
newLcount, AbsTime
t)
NanoSecond64 -> IO (Count, AbsTime, NanoSecond64)
forall (m :: * -> *) c. Monad m => c -> m (Count, AbsTime, c)
retWith NanoSecond64
newLat
else NanoSecond64 -> IO (Count, AbsTime, NanoSecond64)
forall (m :: * -> *) c. Monad m => c -> m (Count, AbsTime, c)
retWith NanoSecond64
prevLat
dumpSVarStats :: SVar t m a -> SVarStats -> SVarStyle -> IO String
dumpSVarStats :: SVar t m a -> SVarStats -> SVarStyle -> IO String
dumpSVarStats SVar t m a
sv SVarStats
ss SVarStyle
style = do
case SVar t m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv of
Maybe YieldRateInfo
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just YieldRateInfo
yinfo -> do
(Count, AbsTime, NanoSecond64)
_ <- IO (Count, AbsTime, NanoSecond64)
-> IO (Count, AbsTime, NanoSecond64)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Count, AbsTime, NanoSecond64)
-> IO (Count, AbsTime, NanoSecond64))
-> IO (Count, AbsTime, NanoSecond64)
-> IO (Count, AbsTime, NanoSecond64)
forall a b. (a -> b) -> a -> b
$ SVar t m a
-> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64)
collectLatency SVar t m a
sv YieldRateInfo
yinfo Bool
True
() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Int
dispatches <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (IORef Int -> IO Int) -> IORef Int -> IO Int
forall a b. (a -> b) -> a -> b
$ SVarStats -> IORef Int
totalDispatches SVarStats
ss
Int
maxWrk <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (IORef Int -> IO Int) -> IORef Int -> IO Int
forall a b. (a -> b) -> a -> b
$ SVarStats -> IORef Int
maxWorkers SVarStats
ss
Int
maxOq <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (IORef Int -> IO Int) -> IORef Int -> IO Int
forall a b. (a -> b) -> a -> b
$ SVarStats -> IORef Int
maxOutQSize SVarStats
ss
Int
maxHp <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (IORef Int -> IO Int) -> IORef Int -> IO Int
forall a b. (a -> b) -> a -> b
$ SVarStats -> IORef Int
maxHeapSize SVarStats
ss
NanoSecond64
minLat <- IORef NanoSecond64 -> IO NanoSecond64
forall a. IORef a -> IO a
readIORef (IORef NanoSecond64 -> IO NanoSecond64)
-> IORef NanoSecond64 -> IO NanoSecond64
forall a b. (a -> b) -> a -> b
$ SVarStats -> IORef NanoSecond64
minWorkerLatency SVarStats
ss
NanoSecond64
maxLat <- IORef NanoSecond64 -> IO NanoSecond64
forall a. IORef a -> IO a
readIORef (IORef NanoSecond64 -> IO NanoSecond64)
-> IORef NanoSecond64 -> IO NanoSecond64
forall a b. (a -> b) -> a -> b
$ SVarStats -> IORef NanoSecond64
maxWorkerLatency SVarStats
ss
(Count
avgCnt, NanoSecond64
avgTime) <- IORef (Count, NanoSecond64) -> IO (Count, NanoSecond64)
forall a. IORef a -> IO a
readIORef (IORef (Count, NanoSecond64) -> IO (Count, NanoSecond64))
-> IORef (Count, NanoSecond64) -> IO (Count, NanoSecond64)
forall a b. (a -> b) -> a -> b
$ SVarStats -> IORef (Count, NanoSecond64)
avgWorkerLatency SVarStats
ss
(Count
svarCnt, Count
svarGainLossCnt, RelTime64
svarLat) <- case SVar t m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv of
Maybe YieldRateInfo
Nothing -> (Count, Count, RelTime64) -> IO (Count, Count, RelTime64)
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
0, Count
0, RelTime64
0)
Just YieldRateInfo
yinfo -> do
(Count
cnt, AbsTime
startTime) <- IORef (Count, AbsTime) -> IO (Count, AbsTime)
forall a. IORef a -> IO a
readIORef (IORef (Count, AbsTime) -> IO (Count, AbsTime))
-> IORef (Count, AbsTime) -> IO (Count, AbsTime)
forall a b. (a -> b) -> a -> b
$ YieldRateInfo -> IORef (Count, AbsTime)
svarAllTimeLatency YieldRateInfo
yinfo
if Count
cnt Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
0
then do
Maybe AbsTime
t <- IORef (Maybe AbsTime) -> IO (Maybe AbsTime)
forall a. IORef a -> IO a
readIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime SVarStats
ss)
Count
gl <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef (YieldRateInfo -> IORef Count
svarGainedLostYields YieldRateInfo
yinfo)
case Maybe AbsTime
t of
Maybe AbsTime
Nothing -> do
AbsTime
now <- Clock -> IO AbsTime
getTime Clock
Monotonic
let interval :: RelTime64
interval = AbsTime -> AbsTime -> RelTime64
diffAbsTime64 AbsTime
now AbsTime
startTime
(Count, Count, RelTime64) -> IO (Count, Count, RelTime64)
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
cnt, Count
gl, RelTime64
interval RelTime64 -> RelTime64 -> RelTime64
forall a. Integral a => a -> a -> a
`div` Count -> RelTime64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
cnt)
Just AbsTime
stopTime -> do
let interval :: RelTime64
interval = AbsTime -> AbsTime -> RelTime64
diffAbsTime64 AbsTime
stopTime AbsTime
startTime
(Count, Count, RelTime64) -> IO (Count, Count, RelTime64)
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
cnt, Count
gl, RelTime64
interval RelTime64 -> RelTime64 -> RelTime64
forall a. Integral a => a -> a -> a
`div` Count -> RelTime64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
cnt)
else (Count, Count, RelTime64) -> IO (Count, Count, RelTime64)
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
0, Count
0, RelTime64
0)
String -> IO String
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ [String] -> String
unlines
[ String
"total dispatches = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
dispatches
, String
"max workers = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
maxWrk
, String
"max outQSize = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
maxOq
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (if SVarStyle
style SVarStyle -> SVarStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStyle
AheadVar
then String
"\nheap max size = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
maxHp
else String
"")
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (if NanoSecond64
minLat NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
> NanoSecond64
0
then String
"\nmin worker latency = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> NanoSecond64 -> String
showNanoSecond64 NanoSecond64
minLat
else String
"")
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (if NanoSecond64
maxLat NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
> NanoSecond64
0
then String
"\nmax worker latency = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> NanoSecond64 -> String
showNanoSecond64 NanoSecond64
maxLat
else String
"")
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (if Count
avgCnt Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
0
then let lat :: NanoSecond64
lat = NanoSecond64
avgTime NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Integral a => a -> a -> a
`div` Count -> NanoSecond64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
avgCnt
in String
"\navg worker latency = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> NanoSecond64 -> String
showNanoSecond64 NanoSecond64
lat
else String
"")
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (if RelTime64
svarLat RelTime64 -> RelTime64 -> Bool
forall a. Ord a => a -> a -> Bool
> RelTime64
0
then String
"\nSVar latency = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> RelTime64 -> String
showRelTime64 RelTime64
svarLat
else String
"")
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (if Count
svarCnt Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
0
then String
"\nSVar yield count = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Count -> String
forall a. Show a => a -> String
show Count
svarCnt
else String
"")
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (if Count
svarGainLossCnt Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
0
then String
"\nSVar gain/loss yield count = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Count -> String
forall a. Show a => a -> String
show Count
svarGainLossCnt
else String
"")
]
{-# NOINLINE dumpSVar #-}
dumpSVar :: SVar t m a -> IO String
dumpSVar :: SVar t m a -> IO String
dumpSVar SVar t m a
sv = do
([ChildEvent a]
oqList, Int
oqLen) <- IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int))
-> IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueue SVar t m a
sv
Maybe ()
db <- MVar () -> IO (Maybe ())
forall a. MVar a -> IO (Maybe a)
tryReadMVar (MVar () -> IO (Maybe ())) -> MVar () -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$ SVar t m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBell SVar t m a
sv
String
aheadDump <-
if SVar t m a -> SVarStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar t m a
sv SVarStyle -> SVarStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStyle
AheadVar
then do
(Heap (Entry Int (AheadHeapEntry t m a))
oheap, Maybe Int
oheapSeq) <- IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
forall a. IORef a -> IO a
readIORef (IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
forall a b. (a -> b) -> a -> b
$ SVar t m a
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outputHeap SVar t m a
sv
([t m a]
wq, Int
wqSeq) <- IORef ([t m a], Int) -> IO ([t m a], Int)
forall a. IORef a -> IO a
readIORef (IORef ([t m a], Int) -> IO ([t m a], Int))
-> IORef ([t m a], Int) -> IO ([t m a], Int)
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IORef ([t m a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int)
aheadWorkQueue SVar t m a
sv
String -> IO String
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ [String] -> String
unlines
[ String
"heap length = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show (Heap (Entry Int (AheadHeapEntry t m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry t m a))
oheap)
, String
"heap seqeunce = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Maybe Int -> String
forall a. Show a => a -> String
show Maybe Int
oheapSeq
, String
"work queue length = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show ([t m a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [t m a]
wq)
, String
"work queue sequence = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
wqSeq
]
else String -> IO String
forall (m :: * -> *) a. Monad m => a -> m a
return []
let style :: SVarStyle
style = SVar t m a -> SVarStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar t m a
sv
Bool
waiting <-
if SVarStyle
style SVarStyle -> SVarStyle -> Bool
forall a. Eq a => a -> a -> Bool
/= SVarStyle
ParallelVar
then IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef (IORef Bool -> IO Bool) -> IORef Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IORef Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Bool
needDoorBell SVar t m a
sv
else Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Set ThreadId
rthread <- IORef (Set ThreadId) -> IO (Set ThreadId)
forall a. IORef a -> IO a
readIORef (IORef (Set ThreadId) -> IO (Set ThreadId))
-> IORef (Set ThreadId) -> IO (Set ThreadId)
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IORef (Set ThreadId)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads SVar t m a
sv
Int
workers <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (IORef Int -> IO Int) -> IORef Int -> IO Int
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv
String
stats <- SVar t m a -> SVarStats -> SVarStyle -> IO String
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats -> SVarStyle -> IO String
dumpSVarStats SVar t m a
sv (SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv) (SVar t m a -> SVarStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar t m a
sv)
String -> IO String
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ [String] -> String
unlines
[
String
"Creator tid = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> ThreadId -> String
forall a. Show a => a -> String
show (SVar t m a -> ThreadId
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId
svarCreator SVar t m a
sv),
String
"style = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> SVarStyle -> String
forall a. Show a => a -> String
show (SVar t m a -> SVarStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar t m a
sv)
, String
"---------CURRENT STATE-----------"
, String
"outputQueue length computed = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show ([ChildEvent a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [ChildEvent a]
oqList)
, String
"outputQueue length maintained = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
oqLen
, String
"outputDoorBell = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Maybe () -> String
forall a. Show a => a -> String
show Maybe ()
db
]
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
aheadDump
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> [String] -> String
unlines
[ String
"needDoorBell = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Bool -> String
forall a. Show a => a -> String
show Bool
waiting
, String
"running threads = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Set ThreadId -> String
forall a. Show a => a -> String
show Set ThreadId
rthread
, String
"running thread count = " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
workers
]
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"---------STATS-----------\n"
String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
stats
{-# NOINLINE mvarExcHandler #-}
mvarExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler SVar t m a
sv String
label e :: BlockedIndefinitelyOnMVar
e@BlockedIndefinitelyOnMVar
BlockedIndefinitelyOnMVar = do
String
svInfo <- SVar t m a -> IO String
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO String
dumpSVar SVar t m a
sv
Handle -> String -> IO ()
hPutStrLn Handle
stderr (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
label String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"BlockedIndefinitelyOnMVar\n" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
svInfo
BlockedIndefinitelyOnMVar -> IO ()
forall e a. Exception e => e -> IO a
throwIO BlockedIndefinitelyOnMVar
e
{-# NOINLINE stmExcHandler #-}
stmExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler SVar t m a
sv String
label e :: BlockedIndefinitelyOnSTM
e@BlockedIndefinitelyOnSTM
BlockedIndefinitelyOnSTM = do
String
svInfo <- SVar t m a -> IO String
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO String
dumpSVar SVar t m a
sv
Handle -> String -> IO ()
hPutStrLn Handle
stderr (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
label String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"BlockedIndefinitelyOnSTM\n" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
svInfo
BlockedIndefinitelyOnSTM -> IO ()
forall e a. Exception e => e -> IO a
throwIO BlockedIndefinitelyOnSTM
e
withDiagMVar :: SVar t m a -> String -> IO () -> IO ()
withDiagMVar :: SVar t m a -> String -> IO () -> IO ()
withDiagMVar SVar t m a
sv String
label IO ()
action =
if SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv
then
IO ()
action IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
`catches` [ (BlockedIndefinitelyOnMVar -> IO ()) -> Handler ()
forall a e. Exception e => (e -> IO a) -> Handler a
Handler (SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler SVar t m a
sv String
label)
, (BlockedIndefinitelyOnSTM -> IO ()) -> Handler ()
forall a e. Exception e => (e -> IO a) -> Handler a
Handler (SVar t m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler SVar t m a
sv String
label)
]
else IO ()
action
printSVar :: SVar t m a -> String -> IO ()
printSVar :: SVar t m a -> String -> IO ()
printSVar SVar t m a
sv String
how = do
String
svInfo <- SVar t m a -> IO String
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO String
dumpSVar SVar t m a
sv
Handle -> String -> IO ()
hPutStrLn Handle
stderr (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"\n" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
how String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"\n" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
svInfo
{-# NOINLINE addThread #-}
addThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
addThread :: SVar t m a -> ThreadId -> m ()
addThread SVar t m a
sv ThreadId
tid =
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Set ThreadId) -> (Set ThreadId -> Set ThreadId) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (SVar t m a -> IORef (Set ThreadId)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads SVar t m a
sv) (ThreadId -> Set ThreadId -> Set ThreadId
forall a. Ord a => a -> Set a -> Set a
S.insert ThreadId
tid)
{-# INLINE delThread #-}
delThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
delThread :: SVar t m a -> ThreadId -> m ()
delThread SVar t m a
sv ThreadId
tid =
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Set ThreadId) -> (Set ThreadId -> Set ThreadId) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (SVar t m a -> IORef (Set ThreadId)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads SVar t m a
sv) (ThreadId -> Set ThreadId -> Set ThreadId
forall a. Ord a => a -> Set a -> Set a
S.delete ThreadId
tid)
{-# INLINE modifyThread #-}
modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
modifyThread :: SVar t m a -> ThreadId -> m ()
modifyThread SVar t m a
sv ThreadId
tid = do
Set ThreadId
changed <- IO (Set ThreadId) -> m (Set ThreadId)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Set ThreadId) -> m (Set ThreadId))
-> IO (Set ThreadId) -> m (Set ThreadId)
forall a b. (a -> b) -> a -> b
$ IORef (Set ThreadId)
-> (Set ThreadId -> (Set ThreadId, Set ThreadId))
-> IO (Set ThreadId)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS (SVar t m a -> IORef (Set ThreadId)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads SVar t m a
sv) ((Set ThreadId -> (Set ThreadId, Set ThreadId))
-> IO (Set ThreadId))
-> (Set ThreadId -> (Set ThreadId, Set ThreadId))
-> IO (Set ThreadId)
forall a b. (a -> b) -> a -> b
$ \Set ThreadId
old ->
if ThreadId -> Set ThreadId -> Bool
forall a. Ord a => a -> Set a -> Bool
S.member ThreadId
tid Set ThreadId
old
then let new :: Set ThreadId
new = ThreadId -> Set ThreadId -> Set ThreadId
forall a. Ord a => a -> Set a -> Set a
S.delete ThreadId
tid Set ThreadId
old in (Set ThreadId
new, Set ThreadId
new)
else let new :: Set ThreadId
new = ThreadId -> Set ThreadId -> Set ThreadId
forall a. Ord a => a -> Set a -> Set a
S.insert ThreadId
tid Set ThreadId
old in (Set ThreadId
new, Set ThreadId
old)
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Set ThreadId -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null Set ThreadId
changed) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IO ()
writeBarrier
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (SVar t m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBell SVar t m a
sv) ()
{-# INLINE allThreadsDone #-}
allThreadsDone :: MonadIO m => SVar t m a -> m Bool
allThreadsDone :: SVar t m a -> m Bool
allThreadsDone SVar t m a
sv = IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Set ThreadId -> Bool
forall a. Set a -> Bool
S.null (Set ThreadId -> Bool) -> IO (Set ThreadId) -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef (Set ThreadId) -> IO (Set ThreadId)
forall a. IORef a -> IO a
readIORef (SVar t m a -> IORef (Set ThreadId)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads SVar t m a
sv)
{-# NOINLINE recordMaxWorkers #-}
recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
recordMaxWorkers :: SVar t m a -> m ()
recordMaxWorkers SVar t m a
sv = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Int
active <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (SVar t m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv)
Int
maxWrk <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (SVarStats -> IORef Int
maxWorkers (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
active Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
maxWrk) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef Int -> Int -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef Int
maxWorkers (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv) Int
active
IORef Int -> (Int -> Int) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (SVarStats -> IORef Int
totalDispatches (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv) (Int -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1)
{-# NOINLINE pushWorker #-}
pushWorker :: MonadAsync m => Count -> SVar t m a -> m ()
pushWorker :: Count -> SVar t m a -> m ()
pushWorker Count
yieldMax SVar t m a
sv = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef Int -> (Int -> Int) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
atomicModifyIORefCAS_ (SVar t m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv) ((Int -> Int) -> IO ()) -> (Int -> Int) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
n -> Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
recordMaxWorkers SVar t m a
sv
Maybe WorkerInfo
winfo <-
case SVar t m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv of
Maybe YieldRateInfo
Nothing -> Maybe WorkerInfo -> m (Maybe WorkerInfo)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe WorkerInfo
forall a. Maybe a
Nothing
Just YieldRateInfo
_ -> IO (Maybe WorkerInfo) -> m (Maybe WorkerInfo)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe WorkerInfo) -> m (Maybe WorkerInfo))
-> IO (Maybe WorkerInfo) -> m (Maybe WorkerInfo)
forall a b. (a -> b) -> a -> b
$ do
IORef Count
cntRef <- Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
0
AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
IORef (Count, AbsTime)
lat <- (Count, AbsTime) -> IO (IORef (Count, AbsTime))
forall a. a -> IO (IORef a)
newIORef (Count
0, AbsTime
t)
Maybe WorkerInfo -> IO (Maybe WorkerInfo)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe WorkerInfo -> IO (Maybe WorkerInfo))
-> Maybe WorkerInfo -> IO (Maybe WorkerInfo)
forall a b. (a -> b) -> a -> b
$ WorkerInfo -> Maybe WorkerInfo
forall a. a -> Maybe a
Just WorkerInfo :: Count -> IORef Count -> IORef (Count, AbsTime) -> WorkerInfo
WorkerInfo
{ workerYieldMax :: Count
workerYieldMax = Count
yieldMax
, workerYieldCount :: IORef Count
workerYieldCount = IORef Count
cntRef
, workerLatencyStart :: IORef (Count, AbsTime)
workerLatencyStart = IORef (Count, AbsTime)
lat
}
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (SVar t m a -> Maybe WorkerInfo -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> m ()
workLoop SVar t m a
sv Maybe WorkerInfo
winfo) (SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv) (SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
m ThreadId -> (ThreadId -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
addThread SVar t m a
sv
{-# INLINE pushWorkerPar #-}
pushWorkerPar
:: MonadAsync m
=> SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar :: SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar t m a
sv Maybe WorkerInfo -> m ()
wloop =
if SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv
then m ()
forkWithDiag
else m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
wloop Maybe WorkerInfo
forall a. Maybe a
Nothing) (SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv) (SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
m ThreadId -> (ThreadId -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar t m a
sv
where
{-# NOINLINE forkWithDiag #-}
forkWithDiag :: m ()
forkWithDiag = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef Int -> (Int -> Int) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
atomicModifyIORefCAS_ (SVar t m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv) ((Int -> Int) -> IO ()) -> (Int -> Int) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
n -> Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
recordMaxWorkers SVar t m a
sv
Maybe WorkerInfo
winfo <-
case SVar t m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv of
Maybe YieldRateInfo
Nothing -> Maybe WorkerInfo -> m (Maybe WorkerInfo)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe WorkerInfo
forall a. Maybe a
Nothing
Just YieldRateInfo
_ -> IO (Maybe WorkerInfo) -> m (Maybe WorkerInfo)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe WorkerInfo) -> m (Maybe WorkerInfo))
-> IO (Maybe WorkerInfo) -> m (Maybe WorkerInfo)
forall a b. (a -> b) -> a -> b
$ do
IORef Count
cntRef <- Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
0
AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
IORef (Count, AbsTime)
lat <- (Count, AbsTime) -> IO (IORef (Count, AbsTime))
forall a. a -> IO (IORef a)
newIORef (Count
0, AbsTime
t)
Maybe WorkerInfo -> IO (Maybe WorkerInfo)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe WorkerInfo -> IO (Maybe WorkerInfo))
-> Maybe WorkerInfo -> IO (Maybe WorkerInfo)
forall a b. (a -> b) -> a -> b
$ WorkerInfo -> Maybe WorkerInfo
forall a. a -> Maybe a
Just WorkerInfo :: Count -> IORef Count -> IORef (Count, AbsTime) -> WorkerInfo
WorkerInfo
{ workerYieldMax :: Count
workerYieldMax = Count
0
, workerYieldCount :: IORef Count
workerYieldCount = IORef Count
cntRef
, workerLatencyStart :: IORef (Count, AbsTime)
workerLatencyStart = IORef (Count, AbsTime)
lat
}
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
wloop Maybe WorkerInfo
winfo) (SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv) (SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
m ThreadId -> (ThreadId -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar t m a
sv
dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool
dispatchWorker :: Count -> SVar t m a -> m Bool
dispatchWorker Count
yieldCount SVar t m a
sv = do
let workerLimit :: Limit
workerLimit = SVar t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxWorkerLimit SVar t m a
sv
Bool
done <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkDone SVar t m a
sv
if Bool -> Bool
not Bool
done
then do
Bool
qDone <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isQueueDone SVar t m a
sv
Int
active <- IO Int -> m Int
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int -> m Int) -> IO Int -> m Int
forall a b. (a -> b) -> a -> b
$ IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (IORef Int -> IO Int) -> IORef Int -> IO Int
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv
if Bool -> Bool
not Bool
qDone
then do
Limit
limit <- case SVar t m a -> Maybe (IORef Count)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef Count)
remainingWork SVar t m a
sv of
Maybe (IORef Count)
Nothing -> Limit -> m Limit
forall (m :: * -> *) a. Monad m => a -> m a
return Limit
workerLimit
Just IORef Count
ref -> do
Count
n <- IO Count -> m Count
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Count -> m Count) -> IO Count -> m Count
forall a b. (a -> b) -> a -> b
$ IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
ref
case SVar t m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv of
Just YieldRateInfo
_ -> Limit -> m Limit
forall (m :: * -> *) a. Monad m => a -> m a
return Limit
workerLimit
Maybe YieldRateInfo
Nothing ->
Limit -> m Limit
forall (m :: * -> *) a. Monad m => a -> m a
return (Limit -> m Limit) -> Limit -> m Limit
forall a b. (a -> b) -> a -> b
$
case Limit
workerLimit of
Limit
Unlimited -> Word -> Limit
Limited (Count -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
n)
Limited Word
lim -> Word -> Limit
Limited (Word -> Limit) -> Word -> Limit
forall a b. (a -> b) -> a -> b
$ Word -> Word -> Word
forall a. Ord a => a -> a -> a
min Word
lim (Count -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
n)
let dispatch :: m Bool
dispatch = Count -> SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Count -> SVar t m a -> m ()
pushWorker Count
yieldCount SVar t m a
sv m () -> m Bool -> m Bool
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
in case Limit
limit of
Limit
Unlimited -> m Bool
dispatch
Limited Word
lim | Word
lim Word -> Word -> Bool
forall a. Ord a => a -> a -> Bool
> Int -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
active -> m Bool
dispatch
Limit
_ -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
else do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
active Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Count -> SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Count -> SVar t m a -> m ()
pushWorker Count
0 SVar t m a
sv
Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
else Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool
dispatchWorkerPaced :: SVar t m a -> m Bool
dispatchWorkerPaced SVar t m a
sv = do
let yinfo :: YieldRateInfo
yinfo = Maybe YieldRateInfo -> YieldRateInfo
forall a. (?callStack::CallStack) => Maybe a -> a
fromJust (Maybe YieldRateInfo -> YieldRateInfo)
-> Maybe YieldRateInfo -> YieldRateInfo
forall a b. (a -> b) -> a -> b
$ SVar t m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv
(Count
svarYields, NanoSecond64
svarElapsed, NanoSecond64
wLatency) <- do
AbsTime
now <- IO AbsTime -> m AbsTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AbsTime -> m AbsTime) -> IO AbsTime -> m AbsTime
forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
(Count
yieldCount, AbsTime
baseTime, NanoSecond64
lat) <-
IO (Count, AbsTime, NanoSecond64)
-> m (Count, AbsTime, NanoSecond64)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Count, AbsTime, NanoSecond64)
-> m (Count, AbsTime, NanoSecond64))
-> IO (Count, AbsTime, NanoSecond64)
-> m (Count, AbsTime, NanoSecond64)
forall a b. (a -> b) -> a -> b
$ SVar t m a
-> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64)
collectLatency SVar t m a
sv YieldRateInfo
yinfo Bool
False
let elapsed :: NanoSecond64
elapsed = RelTime64 -> NanoSecond64
forall a. TimeUnit64 a => RelTime64 -> a
fromRelTime64 (RelTime64 -> NanoSecond64) -> RelTime64 -> NanoSecond64
forall a b. (a -> b) -> a -> b
$ AbsTime -> AbsTime -> RelTime64
diffAbsTime64 AbsTime
now AbsTime
baseTime
let latency :: NanoSecond64
latency =
if NanoSecond64
lat NanoSecond64 -> NanoSecond64 -> Bool
forall a. Eq a => a -> a -> Bool
== NanoSecond64
0
then NanoSecond64 -> Maybe NanoSecond64 -> NanoSecond64
forall a. a -> Maybe a -> a
fromMaybe NanoSecond64
lat (YieldRateInfo -> Maybe NanoSecond64
workerBootstrapLatency YieldRateInfo
yinfo)
else NanoSecond64
lat
(Count, NanoSecond64, NanoSecond64)
-> m (Count, NanoSecond64, NanoSecond64)
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
yieldCount, NanoSecond64
elapsed, NanoSecond64
latency)
if NanoSecond64
wLatency NanoSecond64 -> NanoSecond64 -> Bool
forall a. Eq a => a -> a -> Bool
== NanoSecond64
0
then Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
else do
let workerLimit :: Limit
workerLimit = SVar t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxWorkerLimit SVar t m a
sv
let targetLat :: NanoSecond64
targetLat = YieldRateInfo -> NanoSecond64
svarLatencyTarget YieldRateInfo
yinfo
let range :: LatencyRange
range = YieldRateInfo -> LatencyRange
svarLatencyRange YieldRateInfo
yinfo
Count
gainLoss <- IO Count -> m Count
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Count -> m Count) -> IO Count -> m Count
forall a b. (a -> b) -> a -> b
$ IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef (YieldRateInfo -> IORef Count
svarGainedLostYields YieldRateInfo
yinfo)
let work :: Work
work = Limit
-> Count
-> Count
-> NanoSecond64
-> NanoSecond64
-> NanoSecond64
-> LatencyRange
-> Work
estimateWorkers Limit
workerLimit Count
svarYields Count
gainLoss NanoSecond64
svarElapsed
NanoSecond64
wLatency NanoSecond64
targetLat LatencyRange
range
case Work
work of
BlockWait NanoSecond64
s -> do
Bool -> m () -> m ()
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (NanoSecond64
s NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
>= NanoSecond64
0) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
Bool
done <- SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m Bool
allThreadsDone SVar t m a
sv
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
done (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ do
let us :: MicroSecond64
us = RelTime64 -> MicroSecond64
forall a. TimeUnit64 a => RelTime64 -> a
fromRelTime64 (NanoSecond64 -> RelTime64
forall a. TimeUnit64 a => a -> RelTime64
toRelTime64 NanoSecond64
s) :: MicroSecond64
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (MicroSecond64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral MicroSecond64
us)
Count -> SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Count -> SVar t m a -> m Bool
dispatchWorker Count
1 SVar t m a
sv
Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
PartialWorker Count
yields -> do
Bool -> m () -> m ()
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Count
yields Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
0) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
YieldRateInfo -> Count -> m ()
forall (f :: * -> *). MonadIO f => YieldRateInfo -> Count -> f ()
updateGainedLostYields YieldRateInfo
yinfo Count
yields
Bool
done <- SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m Bool
allThreadsDone SVar t m a
sv
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
done (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ Count -> SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Count -> SVar t m a -> m Bool
dispatchWorker Count
yields SVar t m a
sv
Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
ManyWorkers Int
netWorkers Count
yields -> do
Bool -> m () -> m ()
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Int
netWorkers Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
1) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
Bool -> m () -> m ()
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Count
yields Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
>= Count
0) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
YieldRateInfo -> Count -> m ()
forall (f :: * -> *). MonadIO f => YieldRateInfo -> Count -> f ()
updateGainedLostYields YieldRateInfo
yinfo Count
yields
let periodRef :: IORef Count
periodRef = YieldRateInfo -> IORef Count
workerPollingInterval YieldRateInfo
yinfo
ycnt :: Count
ycnt = Count -> Count -> Count
forall a. Ord a => a -> a -> a
max Count
1 (Count -> Count) -> Count -> Count
forall a b. (a -> b) -> a -> b
$ Count
yields Count -> Count -> Count
forall a. Integral a => a -> a -> a
`div` Int -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
netWorkers
period :: Count
period = Count -> Count -> Count
forall a. Ord a => a -> a -> a
min Count
ycnt (Word -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
magicMaxBuffer)
Count
old <- IO Count -> m Count
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Count -> m Count) -> IO Count -> m Count
forall a b. (a -> b) -> a -> b
$ IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
periodRef
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Count
period Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
< Count
old) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef Count -> Count -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Count
periodRef Count
period
Int
cnt <- IO Int -> m Int
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int -> m Int) -> IO Int -> m Int
forall a b. (a -> b) -> a -> b
$ IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (IORef Int -> IO Int) -> IORef Int -> IO Int
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv
if Int
cnt Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
netWorkers
then do
let total :: Int
total = Int
netWorkers Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
cnt
batch :: Int
batch = Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
1 (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ NanoSecond64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (NanoSecond64 -> Int) -> NanoSecond64 -> Int
forall a b. (a -> b) -> a -> b
$
NanoSecond64
minThreadDelay NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Integral a => a -> a -> a
`div` NanoSecond64
targetLat
Int -> m Bool
forall t. (Eq t, Num t) => t -> m Bool
dispatchN (Int -> Int -> Int
forall a. Ord a => a -> a -> a
min Int
total Int
batch)
else Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
where
updateGainedLostYields :: YieldRateInfo -> Count -> f ()
updateGainedLostYields YieldRateInfo
yinfo Count
yields = do
let buf :: Count
buf = Int -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Count) -> Int -> Count
forall a b. (a -> b) -> a -> b
$ YieldRateInfo -> Int
svarRateBuffer YieldRateInfo
yinfo
Bool -> f () -> f ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Count
yields Count -> Count -> Bool
forall a. Eq a => a -> a -> Bool
/= Count
0 Bool -> Bool -> Bool
&& Count -> Count
forall a. Num a => a -> a
abs Count
yields Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
buf) (f () -> f ()) -> f () -> f ()
forall a b. (a -> b) -> a -> b
$ do
let delta :: Count
delta =
if Count
yields Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
0
then Count
yields Count -> Count -> Count
forall a. Num a => a -> a -> a
- Count
buf
else Count
yields Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
buf
IO () -> f ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> f ()) -> IO () -> f ()
forall a b. (a -> b) -> a -> b
$ IORef Count -> (Count -> Count) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (YieldRateInfo -> IORef Count
svarGainedLostYields YieldRateInfo
yinfo) (Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
delta)
dispatchN :: t -> m Bool
dispatchN t
n =
if t
n t -> t -> Bool
forall a. Eq a => a -> a -> Bool
== t
0
then Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
else do
Bool
r <- Count -> SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Count -> SVar t m a -> m Bool
dispatchWorker Count
0 SVar t m a
sv
if Bool
r
then t -> m Bool
dispatchN (t
n t -> t -> t
forall a. Num a => a -> a -> a
- t
1)
else Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
{-# NOINLINE sendWorkerWait #-}
sendWorkerWait
:: MonadAsync m
=> (SVar t m a -> IO ())
-> (SVar t m a -> m Bool)
-> SVar t m a
-> m ()
sendWorkerWait :: (SVar t m a -> IO ())
-> (SVar t m a -> m Bool) -> SVar t m a -> m ()
sendWorkerWait SVar t m a -> IO ()
delay SVar t m a -> m Bool
dispatch SVar t m a
sv = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IO ()
delay SVar t m a
sv
([ChildEvent a]
_, Int
n) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (SVar t m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueue SVar t m a
sv)
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef Bool -> (Bool -> Bool) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
atomicModifyIORefCAS_ (SVar t m a -> IORef Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Bool
needDoorBell SVar t m a
sv) ((Bool -> Bool) -> IO ()) -> (Bool -> Bool) -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool -> Bool -> Bool
forall a b. a -> b -> a
const Bool
True
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ()
storeLoadBarrier
Bool
canDoMore <- SVar t m a -> m Bool
dispatch SVar t m a
sv
if Bool
canDoMore
then (SVar t m a -> IO ())
-> (SVar t m a -> m Bool) -> SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
(SVar t m a -> IO ())
-> (SVar t m a -> m Bool) -> SVar t m a -> m ()
sendWorkerWait SVar t m a -> IO ()
delay SVar t m a -> m Bool
dispatch SVar t m a
sv
else do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> String -> IO () -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO () -> IO ()
withDiagMVar SVar t m a
sv String
"sendWorkerWait: nothing to do"
(IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (SVar t m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBell SVar t m a
sv)
([ChildEvent a]
_, Int
len) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (SVar t m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueue SVar t m a
sv)
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ (SVar t m a -> IO ())
-> (SVar t m a -> m Bool) -> SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
(SVar t m a -> IO ())
-> (SVar t m a -> m Bool) -> SVar t m a -> m ()
sendWorkerWait SVar t m a -> IO ()
delay SVar t m a -> m Bool
dispatch SVar t m a
sv
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker :: SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker SVar t m a
sv t m a
m = do
RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> (RunInIO m, t m a) -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> (RunInIO m, t m a) -> IO ()
enqueue SVar t m a
sv (RunInIO m
runIn, t m a
m)
case SVar t m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv of
Maybe YieldRateInfo
Nothing -> Count -> SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Count -> SVar t m a -> m ()
pushWorker Count
0 SVar t m a
sv
Just YieldRateInfo
yinfo ->
if YieldRateInfo -> NanoSecond64
svarLatencyTarget YieldRateInfo
yinfo NanoSecond64 -> NanoSecond64 -> Bool
forall a. Eq a => a -> a -> Bool
== NanoSecond64
forall a. Bounded a => a
maxBound
then IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
forall a. Bounded a => a
maxBound
else Count -> SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Count -> SVar t m a -> m ()
pushWorker Count
1 SVar t m a
sv
SVar t m a -> m (SVar t m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar t m a
sv
sendWorkerDelayPaced :: SVar t m a -> IO ()
sendWorkerDelayPaced :: SVar t m a -> IO ()
sendWorkerDelayPaced SVar t m a
_ = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
sendWorkerDelay :: SVar t m a -> IO ()
sendWorkerDelay :: SVar t m a -> IO ()
sendWorkerDelay SVar t m a
_sv =
() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()