{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# OPTIONS_HADDOCK prune #-}
module Core.Program.Workers
(
newQueue
, writeQueue
, writeQueue'
, finishQueue
, runWorkers_
, mapWorkers
, Queue
, unQueue
, getMachineSize
) where
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue (TQueue, flushTQueue, newTQueueIO, readTQueue, unGetTQueue, writeTQueue)
import Control.Monad
( forM
)
import Core.Program.Context
import Core.Program.Threads
import Core.System.Base
import GHC.Conc (getNumCapabilities)
getMachineSize :: Program τ Int
getMachineSize :: forall τ. Program τ Int
getMachineSize = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
IO Int
getNumCapabilities
newtype Queue α = Queue (TQueue (Maybe α))
newQueue :: Program τ (Queue α)
newQueue :: forall τ α. Program τ (Queue α)
newQueue = do
TQueue (Maybe α)
queue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall a. IO (TQueue a)
newTQueueIO
forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall α. TQueue (Maybe α) -> Queue α
Queue TQueue (Maybe α)
queue)
writeQueue :: Queue α -> α -> Program τ ()
writeQueue :: forall α τ. Queue α -> α -> Program τ ()
writeQueue (Queue TQueue (Maybe α)
queue) α
item = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Maybe α)
queue (forall a. a -> Maybe a
Just α
item)
writeQueue' :: Foldable ω => Queue α -> ω α -> Program τ ()
writeQueue' :: forall (ω :: * -> *) α τ.
Foldable ω =>
Queue α -> ω α -> Program τ ()
writeQueue' (Queue TQueue (Maybe α)
queue) ω α
items = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_
( \α
item ->
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Maybe α)
queue (forall a. a -> Maybe a
Just α
item)
)
ω α
items
finishQueue :: Queue α -> Program τ ()
finishQueue :: forall α τ. Queue α -> Program τ ()
finishQueue (Queue TQueue (Maybe α)
queue) = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Maybe α)
queue forall a. Maybe a
Nothing
unQueue :: Queue α -> TQueue (Maybe α)
unQueue :: forall α. Queue α -> TQueue (Maybe α)
unQueue (Queue TQueue (Maybe α)
queue) = TQueue (Maybe α)
queue
runWorkers_ :: Int -> (α -> Program τ ()) -> Queue α -> Program τ ()
runWorkers_ :: forall α τ. Int -> (α -> Program τ ()) -> Queue α -> Program τ ()
runWorkers_ Int
n α -> Program τ ()
action (Queue TQueue (Maybe α)
queue) = do
forall τ α. Program τ α -> Program τ α
createScope forall a b. (a -> b) -> a -> b
$ do
[Thread ()]
ts <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Int
1 .. Int
n] forall a b. (a -> b) -> a -> b
$ \Int
_ -> do
forall τ α. Program τ α -> Program τ (Thread α)
forkThread forall a b. (a -> b) -> a -> b
$ do
Program τ ()
loop
[Either SomeException ()]
_ <- forall α τ. [Thread α] -> Program τ [Either SomeException α]
waitThreads' [Thread ()]
ts
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
where
loop :: Program τ ()
loop = do
Maybe α
possibleItem <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
forall a. TQueue a -> STM a
readTQueue TQueue (Maybe α)
queue
case Maybe α
possibleItem of
Maybe α
Nothing -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
forall a. TQueue a -> a -> STM ()
unGetTQueue TQueue (Maybe α)
queue forall a. Maybe a
Nothing
Just α
item -> do
α -> Program τ ()
action α
item
Program τ ()
loop
mapWorkers :: Int -> (α -> Program τ β) -> [α] -> Program τ [β]
mapWorkers :: forall α τ β. Int -> (α -> Program τ β) -> [α] -> Program τ [β]
mapWorkers Int
n α -> Program τ β
action [α]
items = do
Queue α
inputs <- forall τ α. Program τ (Queue α)
newQueue
TQueue β
outputs <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall a. IO (TQueue a)
newTQueueIO :: IO (TQueue β)
forall (ω :: * -> *) α τ.
Foldable ω =>
Queue α -> ω α -> Program τ ()
writeQueue' Queue α
inputs [α]
items
forall α τ. Queue α -> Program τ ()
finishQueue Queue α
inputs
forall α τ. Int -> (α -> Program τ ()) -> Queue α -> Program τ ()
runWorkers_
Int
n
( \α
item -> do
β
result <- α -> Program τ β
action α
item
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue β
outputs β
result
)
Queue α
inputs
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
forall a. TQueue a -> STM [a]
flushTQueue TQueue β
outputs