{-# OPTIONS_HADDOCK hide, not-home #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
-- |
-- Module      : Control.Scheduler.Queue
-- Copyright   : (c) Alexey Kuleshevich 2018-2019
-- License     : BSD3
-- Maintainer  : Alexey Kuleshevich <lehins@yandex.ru>
-- Stability   : experimental
-- Portability : non-portable
--
module Control.Scheduler.Queue
  (  -- * Job queue
    Job(Job_)
  , mkJob
  , Queue(..)
  , JQueue(..)
  , WorkerId(..)
  , newJQueue
  , pushJQueue
  , popJQueue
  , clearPendingJQueue
  , readResults
  , blockPopJQueue
  , unblockPopJQueue
  ) where

import Control.Concurrent.MVar
import Control.Monad (join)
import Control.Monad.IO.Unlift
import Data.Atomics (atomicModifyIORefCAS, atomicModifyIORefCAS_)
import Data.Maybe
import Data.IORef

-- | A blocking unbounded queue that hands out jobs in FIFO order and keeps the result
-- references in reverse order of submission (most recently pushed job first).
data Queue m a = Queue
  { qQueue   :: ![Job m a]
    -- ^ Jobs that are ready to be popped, oldest first.
  , qStack   :: ![Job m a]
    -- ^ Recently pushed jobs, newest first. Reversed into 'qQueue' once the latter
    -- runs empty.
  , qResults :: ![IORef (Maybe a)]
    -- ^ Result references of all pushed 'Job's that carry one, newest first.
  , qBaton   :: {-# UNPACK #-}!(MVar ())
    -- ^ Filled by the next push; a consumer that finds the queue empty waits on it.
  }


-- | A unique id for the worker in the `Control.Scheduler.Scheduler` context. It will
-- always be a number from @0@ up to, but not including, the number of workers a scheduler
-- has, which in turn can always be determined with `Control.Scheduler.numWorkers` function.
--
-- @since 1.4.0
newtype WorkerId = WorkerId
  { getWorkerId :: Int
  } deriving (Show, Read, Eq, Ord, Enum, Bounded, Num, Real, Integral)


-- | Take the oldest job out of the queue, if there is one.
--
-- Jobs are served from 'qQueue'. When it is empty, the jobs accumulated in 'qStack'
-- (newest first) are reversed to restore FIFO order and become the new 'qQueue'.
-- Returns 'Nothing' when both are empty.
popQueue :: Queue m a -> Maybe (Job m a, Queue m a)
popQueue queue =
  case qQueue queue of
    x:xs -> Just (x, queue {qQueue = xs})
    [] ->
      case reverse (qStack queue) of
        []   -> Nothing
        y:ys -> Just (y, queue {qQueue = ys, qStack = []})
{-# INLINEABLE popQueue #-}

-- | A unit of work that can be placed on the queue. The action receives the id of
-- the worker that runs it.
data Job m a
  = Job {-# UNPACK #-} !(IORef (Maybe a)) (WorkerId -> m ())
  -- ^ An action together with the reference its result is written to (see 'mkJob').
  | Job_ (WorkerId -> m ())
  -- ^ An action with no result reference attached.


-- | Create a 'Job' whose result is retained.
--
-- The supplied function receives a callback as its first argument; calling the
-- callback with a value stores @'Just' value@ in a freshly allocated 'IORef' that is
-- carried by the resulting 'Job'. The reference holds 'Nothing' until the callback
-- has been invoked.
mkJob :: MonadIO m => ((a -> m ()) -> WorkerId -> m ()) -> m (Job m a)
mkJob action = do
  resRef <- liftIO $ newIORef Nothing
  return $ Job resRef (action (liftIO . writeIORef resRef . Just))
{-# INLINEABLE mkJob #-}

-- | A mutable job queue shared between the code that schedules jobs and the workers
-- that consume them.
data JQueue m a =
  JQueue
    { jqQueueRef :: {-# UNPACK #-}!(IORef (Queue m a))
      -- ^ Current state of the queue; modified atomically with CAS operations.
    , jqLock     :: {-# UNPACK #-}!(MVar ())
      -- ^ Gate for consumers: 'popJQueue' waits while this 'MVar' is empty.
    }

-- | Create an empty job queue.
--
-- The lock starts out empty, so 'popJQueue' will block until 'unblockPopJQueue' has
-- been called, regardless of whether any jobs were pushed in the meantime.
newJQueue :: MonadIO m => m (JQueue m a)
newJQueue =
  liftIO $ do
    newLock <- newEmptyMVar
    newBaton <- newEmptyMVar
    queueRef <- newIORef (Queue [] [] [] newBaton)
    return $ JQueue queueRef newLock

-- | Pushes a job onto the queue and wakes up consumers that are waiting for one.
--
-- If the job carries a result reference, that reference is also recorded in
-- 'qResults', so that 'readResults' can collect its result later.
pushJQueue :: MonadIO m => JQueue m a -> Job m a -> m ()
pushJQueue (JQueue jQueueRef _) job =
  liftIO $ do
    -- Fresh baton for consumers that find the queue empty after this push.
    newBaton <- newEmptyMVar
    -- The modification function only describes the wake-up action; 'join' runs it
    -- once the atomic swap has gone through, so the previous baton is filled only
    -- after the job is visible in the queue.
    join $
      atomicModifyIORefCAS jQueueRef $ \queue@Queue {qStack, qResults, qBaton} ->
        ( queue
            { qResults =
                case job of
                  Job resRef _ -> resRef : qResults
                  _ -> qResults
            , qStack = job : qStack
            , qBaton = newBaton
            }
        , putMVar qBaton ())
{-# INLINEABLE pushJQueue #-}

-- | Pops the oldest job from the queue and returns its action, blocking until one is
-- available.
--
-- Before every attempt the lock is consulted: while it is empty (see
-- 'blockPopJQueue' and 'unblockPopJQueue') no job will be handed out.
popJQueue :: MonadUnliftIO m => JQueue m a -> m (WorkerId -> m ())
popJQueue (JQueue jQueueRef lock) = liftIO inner
  where
    inner = do
      -- Wait until popping is allowed; 'readMVar' leaves the lock in place for others.
      readMVar lock
      join $
        atomicModifyIORefCAS jQueueRef $ \queue@Queue {qBaton} ->
          case popQueue queue of
            -- Nothing to do: wait on the baton observed in this very snapshot, which
            -- the next push will fill, and then try again.
            Nothing -> (queue, readMVar qBaton >> inner)
            Just (job, newQueue) ->
              ( newQueue
              , case job of
                  Job _ action -> return action
                  Job_ action_ -> return action_)
{-# INLINEABLE popJQueue #-}

-- | Allow consumers to pop jobs by filling the queue's lock.
--
-- As with any 'putMVar', this call blocks if the lock is already full.
unblockPopJQueue :: MonadIO m => JQueue m a -> m ()
unblockPopJQueue (JQueue _ lock) = liftIO $ putMVar lock ()
{-# INLINEABLE unblockPopJQueue #-}

-- | Stop consumers from popping further jobs by emptying the queue's lock.
--
-- As with any 'takeMVar', this call blocks if the lock is already empty.
blockPopJQueue :: MonadIO m => JQueue m a -> m ()
blockPopJQueue (JQueue _ lock) = liftIO $ takeMVar lock
{-# INLINEABLE blockPopJQueue #-}

-- | Clears any jobs that haven't been popped from the queue yet. Result references
-- that were already recorded are left untouched.
clearPendingJQueue :: MonadIO m => JQueue m a -> m ()
clearPendingJQueue (JQueue queueRef _) =
  liftIO $ atomicModifyIORefCAS_ queueRef $ \queue -> (queue {qQueue = [], qStack = []})
{-# INLINEABLE clearPendingJQueue #-}


-- | Extracts all results available up to now; the ones that have not been computed
-- yet are discarded. Results come back in reverse order of submission, i.e. the
-- result of the most recently pushed job is first.
--
-- This also has the effect of resetting the queue: all pending jobs and all recorded
-- result references are dropped.
readResults :: MonadIO m => JQueue m a -> m [a]
readResults (JQueue jQueueRef _) =
  liftIO $ do
    results <-
      atomicModifyIORefCAS jQueueRef $ \queue ->
        (queue {qQueue = [], qStack = [], qResults = []}, qResults queue)
    catMaybes <$> mapM readIORef results
{-# INLINEABLE readResults #-}