{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MonoLocalBinds #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
module Control.Scheduler
(
Scheduler
, SchedulerWS
, Results(..)
, withScheduler
, withScheduler_
, withSchedulerR
, withSchedulerWS
, withSchedulerWS_
, withSchedulerWSR
, unwrapSchedulerWS
, trivialScheduler_
, withTrivialScheduler
, withTrivialSchedulerR
, scheduleWork
, scheduleWork_
, scheduleWorkId
, scheduleWorkId_
, scheduleWorkState
, scheduleWorkState_
, replicateWork
, replicateWork_
, Batch
, runBatch
, runBatch_
, runBatchR
, cancelBatch
, cancelBatch_
, cancelBatchWith
, hasBatchFinished
, getCurrentBatch
, terminate
, terminate_
, terminateWith
, WorkerId(..)
, WorkerStates
, numWorkers
, workerStatesComp
, initWorkerStates
, Comp(..)
, getCompWorkers
, replicateConcurrently
, replicateConcurrently_
, traverseConcurrently
, traverseConcurrently_
, traverse_
, MutexException(..)
) where
import Control.Monad
import Control.Monad.ST
import Control.Monad.IO.Unlift
import Control.Monad.Primitive
import Control.Scheduler.Computation
import Control.Scheduler.Internal
import Control.Scheduler.Types
import Control.Scheduler.Queue
import qualified Data.Foldable as F (traverse_, toList)
import Data.Primitive.SmallArray
import Data.Traversable
unwrapSchedulerWS :: SchedulerWS ws a -> Scheduler RealWorld a
unwrapSchedulerWS :: SchedulerWS ws a -> Scheduler RealWorld a
unwrapSchedulerWS = SchedulerWS ws a -> Scheduler RealWorld a
forall ws a. SchedulerWS ws a -> Scheduler RealWorld a
_getScheduler
workerStatesComp :: WorkerStates ws -> Comp
workerStatesComp :: WorkerStates ws -> Comp
workerStatesComp = WorkerStates ws -> Comp
forall ws. WorkerStates ws -> Comp
_workerStatesComp
withSchedulerWS ::
MonadUnliftIO m => WorkerStates ws -> (SchedulerWS ws a -> m b) -> m [a]
withSchedulerWS :: WorkerStates ws -> (SchedulerWS ws a -> m b) -> m [a]
withSchedulerWS = (Comp -> (Scheduler RealWorld a -> m b) -> m [a])
-> WorkerStates ws -> (SchedulerWS ws a -> m b) -> m [a]
forall (m :: * -> *) a t b ws.
MonadUnliftIO m =>
(Comp -> (Scheduler RealWorld a -> t) -> m b)
-> WorkerStates ws -> (SchedulerWS ws a -> t) -> m b
withSchedulerWSInternal Comp -> (Scheduler RealWorld a -> m b) -> m [a]
forall (m :: * -> *) a b.
MonadUnliftIO m =>
Comp -> (Scheduler RealWorld a -> m b) -> m [a]
withScheduler
withSchedulerWS_ ::
MonadUnliftIO m => WorkerStates ws -> (SchedulerWS ws () -> m b) -> m ()
withSchedulerWS_ :: WorkerStates ws -> (SchedulerWS ws () -> m b) -> m ()
withSchedulerWS_ = (Comp -> (Scheduler RealWorld () -> m b) -> m ())
-> WorkerStates ws -> (SchedulerWS ws () -> m b) -> m ()
forall (m :: * -> *) a t b ws.
MonadUnliftIO m =>
(Comp -> (Scheduler RealWorld a -> t) -> m b)
-> WorkerStates ws -> (SchedulerWS ws a -> t) -> m b
withSchedulerWSInternal Comp -> (Scheduler RealWorld () -> m b) -> m ()
forall (m :: * -> *) a b.
MonadUnliftIO m =>
Comp -> (Scheduler RealWorld a -> m b) -> m ()
withScheduler_
withSchedulerWSR ::
MonadUnliftIO m
=> WorkerStates ws
-> (SchedulerWS ws a -> m b)
-> m (Results a)
withSchedulerWSR :: WorkerStates ws -> (SchedulerWS ws a -> m b) -> m (Results a)
withSchedulerWSR = (Comp -> (Scheduler RealWorld a -> m b) -> m (Results a))
-> WorkerStates ws -> (SchedulerWS ws a -> m b) -> m (Results a)
forall (m :: * -> *) a t b ws.
MonadUnliftIO m =>
(Comp -> (Scheduler RealWorld a -> t) -> m b)
-> WorkerStates ws -> (SchedulerWS ws a -> t) -> m b
withSchedulerWSInternal Comp -> (Scheduler RealWorld a -> m b) -> m (Results a)
forall (m :: * -> *) a b.
MonadUnliftIO m =>
Comp -> (Scheduler RealWorld a -> m b) -> m (Results a)
withSchedulerR
scheduleWorkState :: MonadPrimBase RealWorld m => SchedulerWS ws a -> (ws -> m a) -> m ()
scheduleWorkState :: SchedulerWS ws a -> (ws -> m a) -> m ()
scheduleWorkState SchedulerWS ws a
schedulerS ws -> m a
withState =
Scheduler RealWorld a -> (WorkerId -> m a) -> m ()
forall s (m :: * -> *) a.
MonadPrimBase s m =>
Scheduler s a -> (WorkerId -> m a) -> m ()
scheduleWorkId (SchedulerWS ws a -> Scheduler RealWorld a
forall ws a. SchedulerWS ws a -> Scheduler RealWorld a
_getScheduler SchedulerWS ws a
schedulerS) ((WorkerId -> m a) -> m ()) -> (WorkerId -> m a) -> m ()
forall a b. (a -> b) -> a -> b
$ \(WorkerId Int
i) ->
ws -> m a
withState (SmallArray ws -> Int -> ws
forall a. SmallArray a -> Int -> a
indexSmallArray (WorkerStates ws -> SmallArray ws
forall ws. WorkerStates ws -> SmallArray ws
_workerStatesArray (SchedulerWS ws a -> WorkerStates ws
forall ws a. SchedulerWS ws a -> WorkerStates ws
_workerStates SchedulerWS ws a
schedulerS)) Int
i)
scheduleWorkState_ :: MonadPrimBase RealWorld m => SchedulerWS ws () -> (ws -> m ()) -> m ()
scheduleWorkState_ :: SchedulerWS ws () -> (ws -> m ()) -> m ()
scheduleWorkState_ SchedulerWS ws ()
schedulerS ws -> m ()
withState =
Scheduler RealWorld () -> (WorkerId -> m ()) -> m ()
forall s (m :: * -> *).
MonadPrimBase s m =>
Scheduler s () -> (WorkerId -> m ()) -> m ()
scheduleWorkId_ (SchedulerWS ws () -> Scheduler RealWorld ()
forall ws a. SchedulerWS ws a -> Scheduler RealWorld a
_getScheduler SchedulerWS ws ()
schedulerS) ((WorkerId -> m ()) -> m ()) -> (WorkerId -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \(WorkerId Int
i) ->
ws -> m ()
withState (SmallArray ws -> Int -> ws
forall a. SmallArray a -> Int -> a
indexSmallArray (WorkerStates ws -> SmallArray ws
forall ws. WorkerStates ws -> SmallArray ws
_workerStatesArray (SchedulerWS ws () -> WorkerStates ws
forall ws a. SchedulerWS ws a -> WorkerStates ws
_workerStates SchedulerWS ws ()
schedulerS)) Int
i)
numWorkers :: Scheduler s a -> Int
numWorkers :: Scheduler s a -> Int
numWorkers = Scheduler s a -> Int
forall s a. Scheduler s a -> Int
_numWorkers
scheduleWorkId :: MonadPrimBase s m => Scheduler s a -> (WorkerId -> m a) -> m ()
scheduleWorkId :: Scheduler s a -> (WorkerId -> m a) -> m ()
scheduleWorkId Scheduler s a
s WorkerId -> m a
f = ST (PrimState m) () -> m ()
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (Scheduler s a -> (WorkerId -> ST s a) -> ST s ()
forall s a. Scheduler s a -> (WorkerId -> ST s a) -> ST s ()
_scheduleWorkId Scheduler s a
s (m a -> ST s a
forall (m1 :: * -> *) (m2 :: * -> *) a.
(PrimBase m1, PrimMonad m2, PrimState m1 ~ PrimState m2) =>
m1 a -> m2 a
primToPrim (m a -> ST s a) -> (WorkerId -> m a) -> WorkerId -> ST s a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. WorkerId -> m a
f))
terminate :: MonadPrim s m => Scheduler s a -> a -> m a
terminate :: Scheduler s a -> a -> m a
terminate Scheduler s a
scheduler a
a = ST (PrimState m) a -> m a
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (Scheduler s a -> Early a -> ST s a
forall s a. Scheduler s a -> Early a -> ST s a
_terminate Scheduler s a
scheduler (a -> Early a
forall a. a -> Early a
Early a
a))
terminateWith :: MonadPrim s m => Scheduler s a -> a -> m a
terminateWith :: Scheduler s a -> a -> m a
terminateWith Scheduler s a
scheduler a
a = ST (PrimState m) a -> m a
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState m) a -> m a) -> ST (PrimState m) a -> m a
forall a b. (a -> b) -> a -> b
$ Scheduler s a -> Early a -> ST s a
forall s a. Scheduler s a -> Early a -> ST s a
_terminate Scheduler s a
scheduler (a -> Early a
forall a. a -> Early a
EarlyWith a
a)
scheduleWork :: MonadPrimBase s m => Scheduler s a -> m a -> m ()
scheduleWork :: Scheduler s a -> m a -> m ()
scheduleWork Scheduler s a
scheduler m a
f = ST (PrimState m) () -> m ()
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState m) () -> m ()) -> ST (PrimState m) () -> m ()
forall a b. (a -> b) -> a -> b
$ Scheduler s a -> (WorkerId -> ST s a) -> ST s ()
forall s a. Scheduler s a -> (WorkerId -> ST s a) -> ST s ()
_scheduleWorkId Scheduler s a
scheduler (ST s a -> WorkerId -> ST s a
forall a b. a -> b -> a
const (m a -> ST s a
forall (m1 :: * -> *) (m2 :: * -> *) a.
(PrimBase m1, PrimMonad m2, PrimState m1 ~ PrimState m2) =>
m1 a -> m2 a
primToPrim m a
f))
scheduleWork_ :: MonadPrimBase s m => Scheduler s () -> m () -> m ()
scheduleWork_ :: Scheduler s () -> m () -> m ()
scheduleWork_ Scheduler s ()
s = ST s () -> m ()
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST s () -> m ()) -> (m () -> ST s ()) -> m () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Scheduler s () -> ST s () -> ST s ()
forall s (m :: * -> *) a.
MonadPrimBase s m =>
Scheduler s a -> m a -> m ()
scheduleWork Scheduler s ()
s (ST s () -> ST s ()) -> (m () -> ST s ()) -> m () -> ST s ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> ST s ()
forall (m1 :: * -> *) (m2 :: * -> *) a.
(PrimBase m1, PrimMonad m2, PrimState m1 ~ PrimState m2) =>
m1 a -> m2 a
primToPrim
scheduleWorkId_ :: MonadPrimBase s m => Scheduler s () -> (WorkerId -> m ()) -> m ()
scheduleWorkId_ :: Scheduler s () -> (WorkerId -> m ()) -> m ()
scheduleWorkId_ Scheduler s ()
scheduler WorkerId -> m ()
f = ST (PrimState m) () -> m ()
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState m) () -> m ()) -> ST (PrimState m) () -> m ()
forall a b. (a -> b) -> a -> b
$ Scheduler s () -> (WorkerId -> ST s ()) -> ST s ()
forall s a. Scheduler s a -> (WorkerId -> ST s a) -> ST s ()
_scheduleWorkId Scheduler s ()
scheduler (m () -> ST s ()
forall (m1 :: * -> *) (m2 :: * -> *) a.
(PrimBase m1, PrimMonad m2, PrimState m1 ~ PrimState m2) =>
m1 a -> m2 a
primToPrim (m () -> ST s ()) -> (WorkerId -> m ()) -> WorkerId -> ST s ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. WorkerId -> m ()
f)
replicateWork :: MonadPrimBase s m => Scheduler s a -> Int -> m a -> m ()
replicateWork :: Scheduler s a -> Int -> m a -> m ()
replicateWork Scheduler s a
scheduler Int
n m a
f = Int -> m ()
go Int
n
where
go :: Int -> m ()
go !Int
k
| Int
k Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
| Bool
otherwise = ST (PrimState m) () -> m ()
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (Scheduler s a -> ST s a -> ST s ()
forall s (m :: * -> *) a.
MonadPrimBase s m =>
Scheduler s a -> m a -> m ()
scheduleWork Scheduler s a
scheduler (m a -> ST s a
forall (m1 :: * -> *) (m2 :: * -> *) a.
(PrimBase m1, PrimMonad m2, PrimState m1 ~ PrimState m2) =>
m1 a -> m2 a
primToPrim m a
f)) m () -> m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Int -> m ()
go (Int
k Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
replicateWork_ :: MonadPrimBase s m => Scheduler s () -> Int -> m a -> m ()
replicateWork_ :: Scheduler s () -> Int -> m a -> m ()
replicateWork_ Scheduler s ()
scheduler Int
n m a
f = Int -> m ()
go Int
n
where
go :: Int -> m ()
go !Int
k
| Int
k Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
| Bool
otherwise = ST (PrimState m) () -> m ()
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (Scheduler s () -> ST s () -> ST s ()
forall s (m :: * -> *).
MonadPrimBase s m =>
Scheduler s () -> m () -> m ()
scheduleWork_ Scheduler s ()
scheduler (m () -> ST s ()
forall (m1 :: * -> *) (m2 :: * -> *) a.
(PrimBase m1, PrimMonad m2, PrimState m1 ~ PrimState m2) =>
m1 a -> m2 a
primToPrim (m a -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void m a
f))) m () -> m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Int -> m ()
go (Int
k Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
terminate_ :: MonadPrim s m => Scheduler s () -> m ()
terminate_ :: Scheduler s () -> m ()
terminate_ Scheduler s ()
s = ST (PrimState m) () -> m ()
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState m) () -> m ()) -> ST (PrimState m) () -> m ()
forall a b. (a -> b) -> a -> b
$ Scheduler s () -> Early () -> ST s ()
forall s a. Scheduler s a -> Early a -> ST s a
_terminate Scheduler s ()
s (() -> Early ()
forall a. a -> Early a
Early ())
withTrivialScheduler :: MonadPrim s m => (Scheduler s a -> m b) -> m [a]
withTrivialScheduler :: (Scheduler s a -> m b) -> m [a]
withTrivialScheduler Scheduler s a -> m b
action = Results a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
F.toList (Results a -> [a]) -> m (Results a) -> m [a]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Scheduler s a -> m b) -> m (Results a)
forall a b (m :: * -> *) s.
MonadPrim s m =>
(Scheduler s a -> m b) -> m (Results a)
withTrivialSchedulerR Scheduler s a -> m b
action
traverseConcurrently :: (MonadUnliftIO m, Traversable t) => Comp -> (a -> m b) -> t a -> m (t b)
traverseConcurrently :: Comp -> (a -> m b) -> t a -> m (t b)
traverseConcurrently Comp
comp a -> m b
f t a
xs = ((forall a. m a -> IO a) -> IO (t b)) -> m (t b)
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO (t b)) -> m (t b))
-> ((forall a. m a -> IO a) -> IO (t b)) -> m (t b)
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do
[b]
ys <- Comp -> (Scheduler RealWorld b -> IO ()) -> IO [b]
forall (m :: * -> *) a b.
MonadUnliftIO m =>
Comp -> (Scheduler RealWorld a -> m b) -> m [a]
withScheduler Comp
comp ((Scheduler RealWorld b -> IO ()) -> IO [b])
-> (Scheduler RealWorld b -> IO ()) -> IO [b]
forall a b. (a -> b) -> a -> b
$ \Scheduler RealWorld b
s -> (a -> IO ()) -> t a -> IO ()
forall (f :: * -> *) (t :: * -> *) a.
(Applicative f, Foldable t) =>
(a -> f ()) -> t a -> f ()
traverse_ (Scheduler RealWorld b -> IO b -> IO ()
forall s (m :: * -> *) a.
MonadPrimBase s m =>
Scheduler s a -> m a -> m ()
scheduleWork Scheduler RealWorld b
s (IO b -> IO ()) -> (a -> IO b) -> a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m b -> IO b
forall a. m a -> IO a
run (m b -> IO b) -> (a -> m b) -> a -> IO b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m b
f) t a
xs
t b -> IO (t b)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (t b -> IO (t b)) -> t b -> IO (t b)
forall a b. (a -> b) -> a -> b
$ [b] -> t a -> t b
forall (t :: * -> *) a b. Traversable t => [a] -> t b -> t a
transList [b]
ys t a
xs
transList :: Traversable t => [a] -> t b -> t a
transList :: [a] -> t b -> t a
transList [a]
xs' = ([a], t a) -> t a
forall a b. (a, b) -> b
snd (([a], t a) -> t a) -> (t b -> ([a], t a)) -> t b -> t a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([a] -> b -> ([a], a)) -> [a] -> t b -> ([a], t a)
forall (t :: * -> *) a b c.
Traversable t =>
(a -> b -> (a, c)) -> a -> t b -> (a, t c)
mapAccumL [a] -> b -> ([a], a)
forall b p. [b] -> p -> ([b], b)
withR [a]
xs'
where
withR :: [b] -> p -> ([b], b)
withR (b
x:[b]
xs) p
_ = ([b]
xs, b
x)
withR [b]
_ p
_ = [Char] -> ([b], b)
forall a. [Char] -> a
errorWithoutStackTrace [Char]
"Impossible<traverseConcurrently> - Mismatched sizes"
traverseConcurrently_ :: (MonadUnliftIO m, Foldable t) => Comp -> (a -> m b) -> t a -> m ()
traverseConcurrently_ :: Comp -> (a -> m b) -> t a -> m ()
traverseConcurrently_ Comp
comp a -> m b
f t a
xs =
((forall a. m a -> IO a) -> IO ()) -> m ()
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO ()) -> m ())
-> ((forall a. m a -> IO a) -> IO ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run ->
Comp -> (Scheduler RealWorld () -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadUnliftIO m =>
Comp -> (Scheduler RealWorld a -> m b) -> m ()
withScheduler_ Comp
comp ((Scheduler RealWorld () -> IO ()) -> IO ())
-> (Scheduler RealWorld () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Scheduler RealWorld ()
s -> Scheduler RealWorld () -> IO () -> IO ()
forall s (m :: * -> *) a.
MonadPrimBase s m =>
Scheduler s a -> m a -> m ()
scheduleWork Scheduler RealWorld ()
s (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (a -> IO ()) -> t a -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
F.traverse_ (Scheduler RealWorld () -> IO () -> IO ()
forall s (m :: * -> *) a.
MonadPrimBase s m =>
Scheduler s a -> m a -> m ()
scheduleWork Scheduler RealWorld ()
s (IO () -> IO ()) -> (a -> IO ()) -> a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO b -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO b -> IO ()) -> (a -> IO b) -> a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m b -> IO b
forall a. m a -> IO a
run (m b -> IO b) -> (a -> m b) -> a -> IO b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m b
f) t a
xs
replicateConcurrently :: MonadUnliftIO m => Comp -> Int -> m a -> m [a]
replicateConcurrently :: Comp -> Int -> m a -> m [a]
replicateConcurrently Comp
comp Int
n m a
f =
((forall a. m a -> IO a) -> IO [a]) -> m [a]
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO [a]) -> m [a])
-> ((forall a. m a -> IO a) -> IO [a]) -> m [a]
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run ->
Comp -> (Scheduler RealWorld a -> IO ()) -> IO [a]
forall (m :: * -> *) a b.
MonadUnliftIO m =>
Comp -> (Scheduler RealWorld a -> m b) -> m [a]
withScheduler Comp
comp ((Scheduler RealWorld a -> IO ()) -> IO [a])
-> (Scheduler RealWorld a -> IO ()) -> IO [a]
forall a b. (a -> b) -> a -> b
$ \Scheduler RealWorld a
s -> Int -> IO () -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
n (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Scheduler RealWorld a -> IO a -> IO ()
forall s (m :: * -> *) a.
MonadPrimBase s m =>
Scheduler s a -> m a -> m ()
scheduleWork Scheduler RealWorld a
s (m a -> IO a
forall a. m a -> IO a
run m a
f)
replicateConcurrently_ :: MonadUnliftIO m => Comp -> Int -> m a -> m ()
replicateConcurrently_ :: Comp -> Int -> m a -> m ()
replicateConcurrently_ Comp
comp Int
n m a
f =
((forall a. m a -> IO a) -> IO ()) -> m ()
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO ()) -> m ())
-> ((forall a. m a -> IO a) -> IO ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do
Comp -> (Scheduler RealWorld () -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadUnliftIO m =>
Comp -> (Scheduler RealWorld a -> m b) -> m ()
withScheduler_ Comp
comp ((Scheduler RealWorld () -> IO ()) -> IO ())
-> (Scheduler RealWorld () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Scheduler RealWorld ()
s -> Scheduler RealWorld () -> IO () -> IO ()
forall s (m :: * -> *) a.
MonadPrimBase s m =>
Scheduler s a -> m a -> m ()
scheduleWork Scheduler RealWorld ()
s (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> IO () -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
n (Scheduler RealWorld () -> IO () -> IO ()
forall s (m :: * -> *) a.
MonadPrimBase s m =>
Scheduler s a -> m a -> m ()
scheduleWork Scheduler RealWorld ()
s (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO a -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO a -> IO ()) -> IO a -> IO ()
forall a b. (a -> b) -> a -> b
$ m a -> IO a
forall a. m a -> IO a
run m a
f)
withScheduler ::
MonadUnliftIO m
=> Comp
-> (Scheduler RealWorld a -> m b)
-> m [a]
withScheduler :: Comp -> (Scheduler RealWorld a -> m b) -> m [a]
withScheduler Comp
Seq Scheduler RealWorld a -> m b
f =
((forall a. m a -> IO a) -> IO [a]) -> m [a]
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO [a]) -> m [a])
-> ((forall a. m a -> IO a) -> IO [a]) -> m [a]
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do
[a] -> [a]
forall a. [a] -> [a]
reverse ([a] -> [a]) -> (Results a -> [a]) -> Results a -> [a]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Results a -> [a]
forall a. Results a -> [a]
resultsToList (Results a -> [a]) -> IO (Results a) -> IO [a]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Scheduler RealWorld a -> IO b) -> IO (Results a)
forall (m :: * -> *) a b.
MonadUnliftIO m =>
(Scheduler RealWorld a -> m b) -> m (Results a)
withTrivialSchedulerRIO (m b -> IO b
forall a. m a -> IO a
run (m b -> IO b)
-> (Scheduler RealWorld a -> m b) -> Scheduler RealWorld a -> IO b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Scheduler RealWorld a -> m b
f)
withScheduler Comp
comp Scheduler RealWorld a -> m b
f =
((forall a. m a -> IO a) -> IO [a]) -> m [a]
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO [a]) -> m [a])
-> ((forall a. m a -> IO a) -> IO [a]) -> m [a]
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do
[a] -> [a]
forall a. [a] -> [a]
reverse ([a] -> [a]) -> (Results a -> [a]) -> Results a -> [a]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Results a -> [a]
forall a. Results a -> [a]
resultsToList (Results a -> [a]) -> IO (Results a) -> IO [a]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> (Scheduler RealWorld a -> IO b)
-> IO (Results a)
forall a b.
Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> (Scheduler RealWorld a -> IO b)
-> IO (Results a)
withSchedulerInternal Comp
comp Jobs IO a -> (WorkerId -> IO a) -> IO ()
forall (m :: * -> *) a.
MonadIO m =>
Jobs m a -> (WorkerId -> m a) -> m ()
scheduleJobs JQueue IO a -> IO [a]
forall (m :: * -> *) a. MonadIO m => JQueue m a -> m [a]
readResults (m b -> IO b
forall a. m a -> IO a
run (m b -> IO b)
-> (Scheduler RealWorld a -> m b) -> Scheduler RealWorld a -> IO b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Scheduler RealWorld a -> m b
f)
{-# INLINE withScheduler #-}
withSchedulerR ::
MonadUnliftIO m
=> Comp
-> (Scheduler RealWorld a -> m b)
-> m (Results a)
withSchedulerR :: Comp -> (Scheduler RealWorld a -> m b) -> m (Results a)
withSchedulerR Comp
Seq Scheduler RealWorld a -> m b
f =
((forall a. m a -> IO a) -> IO (Results a)) -> m (Results a)
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO (Results a)) -> m (Results a))
-> ((forall a. m a -> IO a) -> IO (Results a)) -> m (Results a)
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do
Results a -> Results a
forall a. Results a -> Results a
reverseResults (Results a -> Results a) -> IO (Results a) -> IO (Results a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Scheduler RealWorld a -> IO b) -> IO (Results a)
forall (m :: * -> *) a b.
MonadUnliftIO m =>
(Scheduler RealWorld a -> m b) -> m (Results a)
withTrivialSchedulerRIO (m b -> IO b
forall a. m a -> IO a
run (m b -> IO b)
-> (Scheduler RealWorld a -> m b) -> Scheduler RealWorld a -> IO b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Scheduler RealWorld a -> m b
f)
withSchedulerR Comp
comp Scheduler RealWorld a -> m b
f =
((forall a. m a -> IO a) -> IO (Results a)) -> m (Results a)
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO (Results a)) -> m (Results a))
-> ((forall a. m a -> IO a) -> IO (Results a)) -> m (Results a)
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do
Results a -> Results a
forall a. Results a -> Results a
reverseResults (Results a -> Results a) -> IO (Results a) -> IO (Results a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> (Scheduler RealWorld a -> IO b)
-> IO (Results a)
forall a b.
Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> (Scheduler RealWorld a -> IO b)
-> IO (Results a)
withSchedulerInternal Comp
comp Jobs IO a -> (WorkerId -> IO a) -> IO ()
forall (m :: * -> *) a.
MonadIO m =>
Jobs m a -> (WorkerId -> m a) -> m ()
scheduleJobs JQueue IO a -> IO [a]
forall (m :: * -> *) a. MonadIO m => JQueue m a -> m [a]
readResults (m b -> IO b
forall a. m a -> IO a
run (m b -> IO b)
-> (Scheduler RealWorld a -> m b) -> Scheduler RealWorld a -> IO b
forall b c a. (b -> c) -> (a -> b) -> a -> c
.Scheduler RealWorld a -> m b
f)
{-# INLINE withSchedulerR #-}
withScheduler_ ::
MonadUnliftIO m
=> Comp
-> (Scheduler RealWorld a -> m b)
-> m ()
withScheduler_ :: Comp -> (Scheduler RealWorld a -> m b) -> m ()
withScheduler_ Comp
Seq Scheduler RealWorld a -> m b
f =
((forall a. m a -> IO a) -> IO ()) -> m ()
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO ()) -> m ())
-> ((forall a. m a -> IO a) -> IO ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do
IO (Results a) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Results a) -> IO ()) -> IO (Results a) -> IO ()
forall a b. (a -> b) -> a -> b
$ (Scheduler RealWorld a -> IO b) -> IO (Results a)
forall (m :: * -> *) a b.
MonadUnliftIO m =>
(Scheduler RealWorld a -> m b) -> m (Results a)
withTrivialSchedulerRIO (m b -> IO b
forall a. m a -> IO a
run (m b -> IO b)
-> (Scheduler RealWorld a -> m b) -> Scheduler RealWorld a -> IO b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Scheduler RealWorld a -> m b
f)
withScheduler_ Comp
comp Scheduler RealWorld a -> m b
f =
((forall a. m a -> IO a) -> IO ()) -> m ()
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO ()) -> m ())
-> ((forall a. m a -> IO a) -> IO ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do
IO (Results a) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Results a) -> IO ()) -> IO (Results a) -> IO ()
forall a b. (a -> b) -> a -> b
$ Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> (Scheduler RealWorld a -> IO b)
-> IO (Results a)
forall a b.
Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> (Scheduler RealWorld a -> IO b)
-> IO (Results a)
withSchedulerInternal Comp
comp Jobs IO a -> (WorkerId -> IO a) -> IO ()
forall (m :: * -> *) a b.
MonadIO m =>
Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobs_ (IO [a] -> JQueue IO a -> IO [a]
forall a b. a -> b -> a
const ([a] -> IO [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure [])) (m b -> IO b
forall a. m a -> IO a
run (m b -> IO b)
-> (Scheduler RealWorld a -> m b) -> Scheduler RealWorld a -> IO b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Scheduler RealWorld a -> m b
f)
{-# INLINE withScheduler_ #-}
hasBatchFinished :: MonadPrim s m => Batch s a -> m Bool
hasBatchFinished :: Batch s a -> m Bool
hasBatchFinished = ST s Bool -> m Bool
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST s Bool -> m Bool)
-> (Batch s a -> ST s Bool) -> Batch s a -> m Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Batch s a -> ST s Bool
forall s a. Batch s a -> ST s Bool
batchHasFinished
{-# INLINE hasBatchFinished #-}
cancelBatch :: MonadPrim s m => Batch s a -> a -> m Bool
cancelBatch :: Batch s a -> a -> m Bool
cancelBatch Batch s a
b = ST s Bool -> m Bool
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST s Bool -> m Bool) -> (a -> ST s Bool) -> a -> m Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Batch s a -> a -> ST s Bool
forall s a. Batch s a -> a -> ST s Bool
batchCancel Batch s a
b
{-# INLINE cancelBatch #-}
cancelBatch_ :: MonadPrim s m => Batch s () -> m Bool
cancelBatch_ :: Batch s () -> m Bool
cancelBatch_ Batch s ()
b = ST (PrimState m) Bool -> m Bool
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState m) Bool -> m Bool)
-> ST (PrimState m) Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Batch s () -> () -> ST s Bool
forall s a. Batch s a -> a -> ST s Bool
batchCancel Batch s ()
b ()
{-# INLINE cancelBatch_ #-}
cancelBatchWith :: MonadPrim s m => Batch s a -> a -> m Bool
cancelBatchWith :: Batch s a -> a -> m Bool
cancelBatchWith Batch s a
b = ST s Bool -> m Bool
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST s Bool -> m Bool) -> (a -> ST s Bool) -> a -> m Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Batch s a -> a -> ST s Bool
forall s a. Batch s a -> a -> ST s Bool
batchCancelWith Batch s a
b
{-# INLINE cancelBatchWith #-}
getCurrentBatch ::
MonadPrim s m => Scheduler s a -> m (Batch s a)
getCurrentBatch :: Scheduler s a -> m (Batch s a)
getCurrentBatch Scheduler s a
scheduler = ST (PrimState m) (Batch s a) -> m (Batch s a)
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState m) (Batch s a) -> m (Batch s a))
-> ST (PrimState m) (Batch s a) -> m (Batch s a)
forall a b. (a -> b) -> a -> b
$ do
BatchId
batchId <- Scheduler s a -> ST s BatchId
forall s a. Scheduler s a -> ST s BatchId
_currentBatchId Scheduler s a
scheduler
Batch s a -> ST s (Batch s a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Batch s a -> ST s (Batch s a)) -> Batch s a -> ST s (Batch s a)
forall a b. (a -> b) -> a -> b
$ Batch :: forall s a.
(a -> ST s Bool) -> (a -> ST s Bool) -> ST s Bool -> Batch s a
Batch
{ batchCancel :: a -> ST s Bool
batchCancel = Scheduler s a -> BatchId -> Early a -> ST s Bool
forall s a. Scheduler s a -> BatchId -> Early a -> ST s Bool
_cancelBatch Scheduler s a
scheduler BatchId
batchId (Early a -> ST s Bool) -> (a -> Early a) -> a -> ST s Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Early a
forall a. a -> Early a
Early
, batchCancelWith :: a -> ST s Bool
batchCancelWith = Scheduler s a -> BatchId -> Early a -> ST s Bool
forall s a. Scheduler s a -> BatchId -> Early a -> ST s Bool
_cancelBatch Scheduler s a
scheduler BatchId
batchId (Early a -> ST s Bool) -> (a -> Early a) -> a -> ST s Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Early a
forall a. a -> Early a
EarlyWith
, batchHasFinished :: ST s Bool
batchHasFinished = (BatchId
batchId BatchId -> BatchId -> Bool
forall a. Eq a => a -> a -> Bool
/=) (BatchId -> Bool) -> ST s BatchId -> ST s Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Scheduler s a -> ST s BatchId
forall s a. Scheduler s a -> ST s BatchId
_currentBatchId Scheduler s a
scheduler
}
{-# INLINE getCurrentBatch #-}
runBatch :: MonadPrimBase s m => Scheduler s a -> (Batch s a -> m c) -> m [a]
runBatch :: Scheduler s a -> (Batch s a -> m c) -> m [a]
runBatch Scheduler s a
scheduler Batch s a -> m c
f = ST (PrimState m) [a] -> m [a]
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState m) [a] -> m [a]) -> ST (PrimState m) [a] -> m [a]
forall a b. (a -> b) -> a -> b
$ do
c
_ <- m c -> ST s c
forall (m1 :: * -> *) (m2 :: * -> *) a.
(PrimBase m1, PrimMonad m2, PrimState m1 ~ PrimState m2) =>
m1 a -> m2 a
primToPrim (m c -> ST s c) -> (Batch s a -> m c) -> Batch s a -> ST s c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Batch s a -> m c
f (Batch s a -> ST s c) -> ST s (Batch s a) -> ST s c
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Scheduler s a -> ST s (Batch s a)
forall s (m :: * -> *) a.
MonadPrim s m =>
Scheduler s a -> m (Batch s a)
getCurrentBatch Scheduler s a
scheduler
[a] -> [a]
forall a. [a] -> [a]
reverse ([a] -> [a]) -> (Results a -> [a]) -> Results a -> [a]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Results a -> [a]
forall a. Results a -> [a]
resultsToList (Results a -> [a]) -> ST s (Results a) -> ST s [a]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Scheduler s a -> ST s (Results a)
forall s a. Scheduler s a -> ST s (Results a)
_waitForCurrentBatch Scheduler s a
scheduler
{-# INLINE runBatch #-}
runBatch_ ::
MonadPrimBase s m => Scheduler s () -> (Batch s () -> m c) -> m ()
runBatch_ :: Scheduler s () -> (Batch s () -> m c) -> m ()
runBatch_ Scheduler s ()
scheduler Batch s () -> m c
f = ST (PrimState m) () -> m ()
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState m) () -> m ()) -> ST (PrimState m) () -> m ()
forall a b. (a -> b) -> a -> b
$ do
c
_ <- m c -> ST s c
forall (m1 :: * -> *) (m2 :: * -> *) a.
(PrimBase m1, PrimMonad m2, PrimState m1 ~ PrimState m2) =>
m1 a -> m2 a
primToPrim (m c -> ST s c) -> (Batch s () -> m c) -> Batch s () -> ST s c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Batch s () -> m c
f (Batch s () -> ST s c) -> ST s (Batch s ()) -> ST s c
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Scheduler s () -> ST s (Batch s ())
forall s (m :: * -> *) a.
MonadPrim s m =>
Scheduler s a -> m (Batch s a)
getCurrentBatch Scheduler s ()
scheduler
ST s (Results ()) -> ST s ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Scheduler s () -> ST s (Results ())
forall s a. Scheduler s a -> ST s (Results a)
_waitForCurrentBatch Scheduler s ()
scheduler)
{-# INLINE runBatch_ #-}
runBatchR ::
MonadPrimBase s m => Scheduler s a -> (Batch s a -> m c) -> m (Results a)
runBatchR :: Scheduler s a -> (Batch s a -> m c) -> m (Results a)
runBatchR Scheduler s a
scheduler Batch s a -> m c
f = ST (PrimState m) (Results a) -> m (Results a)
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState m) (Results a) -> m (Results a))
-> ST (PrimState m) (Results a) -> m (Results a)
forall a b. (a -> b) -> a -> b
$ do
c
_ <- m c -> ST s c
forall (m1 :: * -> *) (m2 :: * -> *) a.
(PrimBase m1, PrimMonad m2, PrimState m1 ~ PrimState m2) =>
m1 a -> m2 a
primToPrim (m c -> ST s c) -> (Batch s a -> m c) -> Batch s a -> ST s c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Batch s a -> m c
f (Batch s a -> ST s c) -> ST s (Batch s a) -> ST s c
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Scheduler s a -> ST s (Batch s a)
forall s (m :: * -> *) a.
MonadPrim s m =>
Scheduler s a -> m (Batch s a)
getCurrentBatch Scheduler s a
scheduler
Results a -> Results a
forall a. Results a -> Results a
reverseResults (Results a -> Results a) -> ST s (Results a) -> ST s (Results a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Scheduler s a -> ST s (Results a)
forall s a. Scheduler s a -> ST s (Results a)
_waitForCurrentBatch Scheduler s a
scheduler
{-# INLINE runBatchR #-}