{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
module Control.Scheduler.Global
( GlobalScheduler
, globalScheduler
, newGlobalScheduler
, withGlobalScheduler_
) where
import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Monad.ST
import Control.Monad.Primitive
import Control.Scheduler
import Control.Scheduler.Internal
import Control.Scheduler.Types
import Data.IORef
import Data.Maybe
import System.IO.Unsafe (unsafePerformIO)
globalScheduler :: GlobalScheduler
globalScheduler :: GlobalScheduler
globalScheduler = IO GlobalScheduler -> GlobalScheduler
forall a. IO a -> a
unsafePerformIO (Comp -> IO GlobalScheduler
forall (m :: * -> *). MonadIO m => Comp -> m GlobalScheduler
newGlobalScheduler Comp
Par)
{-# NOINLINE globalScheduler #-}
initGlobalScheduler ::
MonadUnliftIO m => Comp -> (Scheduler RealWorld a -> [ThreadId] -> m b) -> m b
initGlobalScheduler :: Comp -> (Scheduler RealWorld a -> [ThreadId] -> m b) -> m b
initGlobalScheduler Comp
comp Scheduler RealWorld a -> [ThreadId] -> m b
action = ((forall a. m a -> IO a) -> IO b) -> m b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO b) -> m b)
-> ((forall a. m a -> IO a) -> IO b) -> m b
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do
(Jobs IO a
jobs, [ThreadId] -> Scheduler RealWorld a
mkScheduler) <- Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> IO (Jobs IO a, [ThreadId] -> Scheduler RealWorld a)
forall a.
Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> IO (Jobs IO a, [ThreadId] -> Scheduler RealWorld a)
initScheduler Comp
comp Jobs IO a -> (WorkerId -> IO a) -> IO ()
forall (m :: * -> *) a b.
MonadIO m =>
Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobs_ (IO [a] -> JQueue IO a -> IO [a]
forall a b. a -> b -> a
const ([a] -> IO [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure []))
IO [ThreadId]
-> ([ThreadId] -> IO ()) -> ([ThreadId] -> IO b) -> IO b
forall (m :: * -> *) a b c.
MonadUnliftIO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
safeBracketOnError (Jobs IO a -> Comp -> IO [ThreadId]
forall (m :: * -> *) a.
MonadUnliftIO m =>
Jobs m a -> Comp -> m [ThreadId]
spawnWorkers Jobs IO a
jobs Comp
comp) [ThreadId] -> IO ()
terminateWorkers (([ThreadId] -> IO b) -> IO b) -> ([ThreadId] -> IO b) -> IO b
forall a b. (a -> b) -> a -> b
$ \[ThreadId]
tids ->
m b -> IO b
forall a. m a -> IO a
run (Scheduler RealWorld a -> [ThreadId] -> m b
action ([ThreadId] -> Scheduler RealWorld a
mkScheduler [ThreadId]
tids) [ThreadId]
tids)
newGlobalScheduler :: MonadIO m => Comp -> m GlobalScheduler
newGlobalScheduler :: Comp -> m GlobalScheduler
newGlobalScheduler Comp
comp =
IO GlobalScheduler -> m GlobalScheduler
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO GlobalScheduler -> m GlobalScheduler)
-> IO GlobalScheduler -> m GlobalScheduler
forall a b. (a -> b) -> a -> b
$ Comp
-> (Scheduler RealWorld () -> [ThreadId] -> IO GlobalScheduler)
-> IO GlobalScheduler
forall (m :: * -> *) a b.
MonadUnliftIO m =>
Comp -> (Scheduler RealWorld a -> [ThreadId] -> m b) -> m b
initGlobalScheduler Comp
comp ((Scheduler RealWorld () -> [ThreadId] -> IO GlobalScheduler)
-> IO GlobalScheduler)
-> (Scheduler RealWorld () -> [ThreadId] -> IO GlobalScheduler)
-> IO GlobalScheduler
forall a b. (a -> b) -> a -> b
$ \Scheduler RealWorld ()
scheduler [ThreadId]
tids -> do
MVar (Scheduler RealWorld ())
mvar <- Scheduler RealWorld () -> IO (MVar (Scheduler RealWorld ()))
forall a. a -> IO (MVar a)
newMVar Scheduler RealWorld ()
scheduler
IORef [ThreadId]
tidsRef <- [ThreadId] -> IO (IORef [ThreadId])
forall a. a -> IO (IORef a)
newIORef [ThreadId]
tids
Weak (MVar (Scheduler RealWorld ()))
_ <- MVar (Scheduler RealWorld ())
-> IO () -> IO (Weak (MVar (Scheduler RealWorld ())))
forall a. MVar a -> IO () -> IO (Weak (MVar a))
mkWeakMVar MVar (Scheduler RealWorld ())
mvar (IORef [ThreadId] -> IO [ThreadId]
forall a. IORef a -> IO a
readIORef IORef [ThreadId]
tidsRef IO [ThreadId] -> ([ThreadId] -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= [ThreadId] -> IO ()
terminateWorkers)
GlobalScheduler -> IO GlobalScheduler
forall (f :: * -> *) a. Applicative f => a -> f a
pure (GlobalScheduler -> IO GlobalScheduler)
-> GlobalScheduler -> IO GlobalScheduler
forall a b. (a -> b) -> a -> b
$
GlobalScheduler :: Comp
-> MVar (Scheduler RealWorld ())
-> IORef [ThreadId]
-> GlobalScheduler
GlobalScheduler
{ globalSchedulerComp :: Comp
globalSchedulerComp = Comp
comp
, globalSchedulerMVar :: MVar (Scheduler RealWorld ())
globalSchedulerMVar = MVar (Scheduler RealWorld ())
mvar
, globalSchedulerThreadIdsRef :: IORef [ThreadId]
globalSchedulerThreadIdsRef = IORef [ThreadId]
tidsRef
}
withGlobalScheduler_ :: MonadUnliftIO m => GlobalScheduler -> (Scheduler RealWorld () -> m a) -> m ()
withGlobalScheduler_ :: GlobalScheduler -> (Scheduler RealWorld () -> m a) -> m ()
withGlobalScheduler_ GlobalScheduler {IORef [ThreadId]
MVar (Scheduler RealWorld ())
Comp
globalSchedulerThreadIdsRef :: IORef [ThreadId]
globalSchedulerMVar :: MVar (Scheduler RealWorld ())
globalSchedulerComp :: Comp
globalSchedulerThreadIdsRef :: GlobalScheduler -> IORef [ThreadId]
globalSchedulerMVar :: GlobalScheduler -> MVar (Scheduler RealWorld ())
globalSchedulerComp :: GlobalScheduler -> Comp
..} Scheduler RealWorld () -> m a
action =
((forall a. m a -> IO a) -> IO ()) -> m ()
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO ()) -> m ())
-> ((forall a. m a -> IO a) -> IO ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do
let initializeNewScheduler :: m ()
initializeNewScheduler = do
Comp -> (Scheduler RealWorld () -> [ThreadId] -> m ()) -> m ()
forall (m :: * -> *) a b.
MonadUnliftIO m =>
Comp -> (Scheduler RealWorld a -> [ThreadId] -> m b) -> m b
initGlobalScheduler Comp
globalSchedulerComp ((Scheduler RealWorld () -> [ThreadId] -> m ()) -> m ())
-> (Scheduler RealWorld () -> [ThreadId] -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Scheduler RealWorld ()
scheduler [ThreadId]
tids ->
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
[ThreadId]
oldTids <- IORef [ThreadId]
-> ([ThreadId] -> ([ThreadId], [ThreadId])) -> IO [ThreadId]
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef [ThreadId]
globalSchedulerThreadIdsRef (([ThreadId] -> ([ThreadId], [ThreadId])) -> IO [ThreadId])
-> ([ThreadId] -> ([ThreadId], [ThreadId])) -> IO [ThreadId]
forall a b. (a -> b) -> a -> b
$ (,) [ThreadId]
tids
[ThreadId] -> IO ()
terminateWorkers [ThreadId]
oldTids
MVar (Scheduler RealWorld ()) -> Scheduler RealWorld () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Scheduler RealWorld ())
globalSchedulerMVar Scheduler RealWorld ()
scheduler
((forall a. IO a -> IO a) -> IO ()) -> IO ()
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask (((forall a. IO a -> IO a) -> IO ()) -> IO ())
-> ((forall a. IO a -> IO a) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore ->
MVar (Scheduler RealWorld ())
-> IO (Maybe (Scheduler RealWorld ()))
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar (Scheduler RealWorld ())
globalSchedulerMVar IO (Maybe (Scheduler RealWorld ()))
-> (Maybe (Scheduler RealWorld ()) -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe (Scheduler RealWorld ())
Nothing -> IO () -> IO ()
forall a. IO a -> IO a
restore (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ m () -> IO ()
forall a. m a -> IO a
run (m () -> IO ()) -> m () -> IO ()
forall a b. (a -> b) -> a -> b
$ Comp -> (Scheduler RealWorld () -> m a) -> m ()
forall (m :: * -> *) a b.
MonadUnliftIO m =>
Comp -> (Scheduler RealWorld a -> m b) -> m ()
withScheduler_ Comp
globalSchedulerComp Scheduler RealWorld () -> m a
action
Just Scheduler RealWorld ()
scheduler -> do
let runScheduler :: IO (Maybe (Results ()))
runScheduler = do
a
_ <- m a -> IO a
forall a. m a -> IO a
run (m a -> IO a) -> m a -> IO a
forall a b. (a -> b) -> a -> b
$ Scheduler RealWorld () -> m a
action Scheduler RealWorld ()
scheduler
Maybe (Results ())
mEarly <- ST (PrimState IO) (Maybe (Results ())) -> IO (Maybe (Results ()))
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (Scheduler RealWorld () -> ST RealWorld (Maybe (Results ()))
forall s a. Scheduler s a -> ST s (Maybe (Results a))
_earlyResults Scheduler RealWorld ()
scheduler)
Maybe (Results ())
mEarly Maybe (Results ()) -> IO () -> IO (Maybe (Results ()))
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe (Results ()) -> Bool
forall a. Maybe a -> Bool
isNothing Maybe (Results ())
mEarly) (IO (Results ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ST (PrimState IO) (Results ()) -> IO (Results ())
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (Scheduler RealWorld () -> ST RealWorld (Results ())
forall s a. Scheduler s a -> ST s (Results a)
_waitForCurrentBatch Scheduler RealWorld ()
scheduler)))
Maybe (Results ())
mEarly <- IO (Maybe (Results ())) -> IO (Maybe (Results ()))
forall a. IO a -> IO a
restore IO (Maybe (Results ()))
runScheduler IO (Maybe (Results ())) -> IO () -> IO (Maybe (Results ()))
forall a b. IO a -> IO b -> IO a
`onException` m () -> IO ()
forall a. m a -> IO a
run m ()
initializeNewScheduler
case Maybe (Results ())
mEarly of
Maybe (Results ())
Nothing -> MVar (Scheduler RealWorld ()) -> Scheduler RealWorld () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Scheduler RealWorld ())
globalSchedulerMVar Scheduler RealWorld ()
scheduler
Just Results ()
_ -> m () -> IO ()
forall a. m a -> IO a
run m ()
initializeNewScheduler