-- | Tasks: handles to computations started as separate processes, whose
-- outcome ('TaskResult') can be polled, awaited, cancelled or combined.
module Simulation.Aivika.Trans.Task
       (
        -- * Task and its result
        Task,
        TaskResult(..),
        taskId,
        -- * Reading the result
        tryGetTaskResult,
        taskResult,
        taskResultReceived,
        taskProcess,
        -- * Cancellation
        cancelTask,
        taskCancelled,
        -- * Running a task from an event
        runTask,
        runTaskUsingId,
        -- * Spawning a task from a process
        spawnTask,
        spawnTaskUsingId,
        spawnTaskWith,
        spawnTaskUsingIdWith,
        -- * Enqueueing a task
        enqueueTask,
        enqueueTaskUsingId,
        -- * Waiting for the first of two tasks
        taskParallelResult,
        taskParallelProcess) where
import Data.Monoid
import Control.Monad
import Control.Monad.Trans
import Control.Exception
import Simulation.Aivika.Trans.Ref.Base
import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Internal.Specs
import Simulation.Aivika.Trans.Internal.Parameter
import Simulation.Aivika.Trans.Internal.Simulation
import Simulation.Aivika.Trans.Internal.Dynamics
import Simulation.Aivika.Trans.Internal.Event
import Simulation.Aivika.Trans.Internal.Cont
import Simulation.Aivika.Trans.Internal.Process
import Simulation.Aivika.Trans.Signal
-- | A handle to a computation that is run as a process with its own
-- 'ProcessId'; the handle gives access to the eventual 'TaskResult'.
data Task m a =
  Task { taskId :: ProcessId m,
         -- ^ The identifier of the process under which the task's
         -- computation is started.
         taskResultRef :: Ref m (Maybe (TaskResult a)),
         -- ^ Holds 'Nothing' until the task's process has finalized,
         -- and @Just@ the outcome afterwards. Not exported.
         taskResultReceived :: Signal m (TaskResult a)
         -- ^ A signal triggered with the outcome right after it has
         -- been stored in 'taskResultRef'.
       }
-- | The outcome recorded for a task when its process finalizes.
data TaskResult a = TaskCompleted a
                    -- ^ The computation returned this value.
                  | TaskError SomeException
                    -- ^ The computation raised this exception, which was
                    -- caught by the task wrapper.
                  | TaskCancelled
                    -- ^ Neither a value nor an exception was recorded
                    -- before finalization; this is how a cancelled
                    -- task is reported.
-- | Read whatever result is currently stored for the task, without waiting.
--
-- Yields 'Nothing' while the task's result reference is still empty and
-- @Just r@ once a 'TaskResult' has been written to it.
tryGetTaskResult :: MonadDES m => Task m a -> Event m (Maybe (TaskResult a))
{-# INLINABLE tryGetTaskResult #-}
tryGetTaskResult = readRef . taskResultRef
-- | Obtain the task result from within a process.
--
-- If a result is already stored it is returned at once; otherwise the
-- calling process awaits the 'taskResultReceived' signal (through
-- 'processAwait') and returns the value that signal delivers.
taskResult :: MonadDES m => Task m a -> Process m (TaskResult a)
{-# INLINABLE taskResult #-}
taskResult task =
  liftEvent (readRef (taskResultRef task)) >>=
  maybe (processAwait (taskResultReceived task)) return
-- | Cancel the task by requesting cancellation of the process identified
-- by its 'taskId' (delegates to 'cancelProcessWithId').
cancelTask :: MonadDES m => Task m a -> Event m ()
{-# INLINABLE cancelTask #-}
cancelTask = cancelProcessWithId . taskId
-- | Test whether the task's process is reported as cancelled
-- (delegates to 'processCancelled' on the task's 'taskId').
taskCancelled :: MonadDES m => Task m a -> Event m Bool
{-# INLINABLE taskCancelled #-}
taskCancelled = processCancelled . taskId
-- | Build a task handle together with the wrapped process that must be
-- started under the given identifier. Nothing is started here; the caller
-- decides how to launch the returned process.
--
-- The wrapped process keeps a private outcome cell that starts as
-- 'TaskCancelled'. It is overwritten with 'TaskCompleted' when the
-- computation returns, or with 'TaskError' when the computation raises an
-- exception that 'catchProcess' intercepts. The finalizer registered with
-- 'finallyProcess' then copies the outcome into the task's result
-- reference and triggers the result signal with it.
newTaskUsingId :: MonadDES m => ProcessId m -> Process m a -> Event m (Task m a, Process m ())
{-# INLINABLE newTaskUsingId #-}
newTaskUsingId pid p =
  do resultRef <- liftSimulation (newRef Nothing)
     source <- liftSimulation newSignalSource
     let task =
           Task { taskId = pid
                , taskResultRef = resultRef
                , taskResultReceived = publishSignal source
                }
         wrapped =
           do outcome <- liftSimulation (newRef TaskCancelled)
              let remember r = liftEvent (writeRef outcome r)
                  -- run the computation and note its value on success
                  compute = p >>= \a -> remember (TaskCompleted a)
                  -- note the exception instead of propagating it
                  recover e = remember (TaskError e)
                  -- store the final outcome first, then notify listeners
                  publish =
                    liftEvent $
                    do final <- readRef outcome
                       writeRef resultRef (Just final)
                       triggerSignal source final
              finallyProcess (catchProcess compute recover) publish
     return (task, wrapped)
-- | Create a task for the computation and start its wrapped process with
-- 'runProcessUsingId' under the supplied identifier, returning the handle.
runTaskUsingId :: MonadDES m => ProcessId m -> Process m a -> Event m (Task m a)
{-# INLINABLE runTaskUsingId #-}
runTaskUsingId pid p =
  newTaskUsingId pid p >>= \(task, wrapped) ->
  runProcessUsingId pid wrapped >>
  return task
-- | Like 'runTaskUsingId', but with a freshly allocated process identifier
-- obtained from 'newProcessId'.
runTask :: MonadDES m => Process m a -> Event m (Task m a)
{-# INLINABLE runTask #-}
runTask p =
  liftSimulation newProcessId >>= \pid ->
  runTaskUsingId pid p
-- | Create a task for the computation and hand its wrapped process to
-- 'enqueueProcessUsingId' with the given time and identifier, returning
-- the handle. The time argument is passed through unchanged
-- (presumably the modeling time at which the process is to start).
enqueueTaskUsingId :: MonadDES m => Double -> ProcessId m -> Process m a -> Event m (Task m a)
{-# INLINABLE enqueueTaskUsingId #-}
enqueueTaskUsingId time pid p =
  newTaskUsingId pid p >>= \(task, wrapped) ->
  enqueueProcessUsingId time pid wrapped >>
  return task
-- | Like 'enqueueTaskUsingId', but with a freshly allocated process
-- identifier obtained from 'newProcessId'.
enqueueTask :: MonadDES m => Double -> Process m a -> Event m (Task m a)
{-# INLINABLE enqueueTask #-}
enqueueTask time p =
  liftSimulation newProcessId >>= \pid ->
  enqueueTaskUsingId time pid p
-- | Spawn the computation as a task under the given identifier, using the
-- 'CancelTogether' cancellation mode.
spawnTaskUsingId :: MonadDES m => ProcessId m -> Process m a -> Process m (Task m a)
{-# INLINABLE spawnTaskUsingId #-}
spawnTaskUsingId pid p = spawnTaskUsingIdWith CancelTogether pid p
-- | Spawn the computation as a task with a fresh process identifier, using
-- the 'CancelTogether' cancellation mode.
spawnTask :: MonadDES m => Process m a -> Process m (Task m a)
{-# INLINABLE spawnTask #-}
spawnTask p = spawnTaskWith CancelTogether p
-- | Create a task for the computation and start its wrapped process with
-- 'spawnProcessUsingIdWith', passing on the given cancellation mode and
-- process identifier; the handle is returned to the calling process.
spawnTaskUsingIdWith :: MonadDES m => ContCancellation -> ProcessId m -> Process m a -> Process m (Task m a)
{-# INLINABLE spawnTaskUsingIdWith #-}
spawnTaskUsingIdWith cancellation pid p =
  liftEvent (newTaskUsingId pid p) >>= \(task, wrapped) ->
  spawnProcessUsingIdWith cancellation pid wrapped >>
  return task
-- | Like 'spawnTaskUsingIdWith', but with a freshly allocated process
-- identifier obtained from 'newProcessId'.
spawnTaskWith :: MonadDES m => ContCancellation -> Process m a -> Process m (Task m a)
{-# INLINABLE spawnTaskWith #-}
spawnTaskWith cancellation p =
  liftSimulation newProcessId >>= \pid ->
  spawnTaskUsingIdWith cancellation pid p
-- | Turn a task back into a process that yields the task's value.
--
-- The calling process obtains the task result and then mirrors it:
-- a completed task yields its value, a failed task rethrows its exception
-- with 'throwProcess', and a cancelled task makes the caller invoke
-- 'cancelProcess'. The finalizer attached to the wait cancels the task
-- when the calling process itself is found to be cancelled.
taskProcess :: MonadDES m => Task m a -> Process m a
{-# INLINABLE taskProcess #-}
taskProcess t =
  finallyProcess (taskResult t) cancelIfCallerCancelled >>= mirror
  where
    -- propagate cancellation of the waiting process to the task
    cancelIfCallerCancelled =
      do self <- processId
         liftEvent $
           do stopped <- processCancelled self
              when stopped (cancelTask t)
    mirror (TaskCompleted a) = return a
    mirror (TaskError e) = throwProcess e
    mirror TaskCancelled = cancelProcess
-- | Get the result of whichever of the two tasks has one first, paired
-- with the /other/ task.
--
-- Already-stored results are checked first, the first task before the
-- second. If neither has a result, the calling process awaits the merged
-- result signals of both tasks and reports the one that fires.
taskParallelResult :: MonadDES m => Task m a -> Task m a -> Process m (TaskResult a, Task m a)
{-# INLINABLE taskParallelResult #-}
taskParallelResult t1 t2 =
  stored t1 >>= maybe checkSecond (\r -> return (r, t2))
  where
    stored task = liftEvent (readRef (taskResultRef task))
    checkSecond =
      stored t2 >>= maybe awaitEither (\r -> return (r, t1))
    -- tag each signal so we can tell which task produced the result
    awaitEither =
      do let fromFirst = fmap Left (taskResultReceived t1)
             fromSecond = fmap Right (taskResultReceived t2)
         winner <- processAwait (fromFirst <> fromSecond)
         case winner of
           Left r -> return (r, t2)
           Right r -> return (r, t1)
-- | Wait for the first of two tasks and yield its value together with the
-- other task.
--
-- When the first available result is a value, it is returned with the
-- remaining task left untouched. When it is an error or a cancellation,
-- the remaining task is cancelled and the caller respectively rethrows the
-- exception or cancels itself. The finalizer attached to the wait cancels
-- both tasks when the calling process is found to be cancelled.
taskParallelProcess :: MonadDES m => Task m a -> Task m a -> Process m (a, Task m a)
{-# INLINABLE taskParallelProcess #-}
taskParallelProcess t1 t2 =
  do (outcome, other) <-
       finallyProcess (taskParallelResult t1 t2) cancelBothIfCallerCancelled
     case outcome of
       TaskCompleted a -> return (a, other)
       TaskError e -> abandon other >> throwProcess e
       TaskCancelled -> abandon other >> cancelProcess
  where
    -- propagate cancellation of the waiting process to both tasks
    cancelBothIfCallerCancelled =
      do self <- processId
         liftEvent $
           do stopped <- processCancelled self
              when stopped $
                cancelTask t1 >> cancelTask t2
    abandon task = liftEvent (cancelTask task)