module Simulation.Aivika.Internal.Process
(
ProcessId,
Process(..),
ProcessLift(..),
invokeProcess,
runProcess,
runProcessUsingId,
runProcessInStartTime,
runProcessInStartTimeUsingId,
runProcessInStopTime,
runProcessInStopTimeUsingId,
spawnProcess,
spawnProcessUsingId,
spawnProcessWith,
spawnProcessUsingIdWith,
enqueueProcess,
enqueueProcessUsingId,
newProcessId,
processId,
processUsingId,
holdProcess,
interruptProcess,
processInterrupted,
processInterruptionTime,
passivateProcess,
passivateProcessBefore,
processPassive,
reactivateProcess,
reactivateProcessImmediately,
cancelProcessWithId,
cancelProcess,
processCancelled,
processCancelling,
whenCancellingProcess,
processAwait,
processPreemptionBegin,
processPreemptionEnd,
processPreemptionBeginning,
processPreemptionEnding,
processYield,
timeoutProcess,
timeoutProcessUsingId,
processParallel,
processParallelUsingIds,
processParallel_,
processParallelUsingIds_,
catchProcess,
finallyProcess,
throwProcess,
zipProcessParallel,
zip3ProcessParallel,
unzipProcess,
memoProcess,
neverProcess,
retryProcess,
transferProcess,
traceProcess) where
import Data.Maybe
import Data.IORef
import Control.Exception
import Control.Monad
import Control.Monad.Trans
import Control.Monad.Fail
import qualified Control.Monad.Catch as MC
import Control.Applicative
import Simulation.Aivika.Internal.Specs
import Simulation.Aivika.Internal.Parameter
import Simulation.Aivika.Internal.Simulation
import Simulation.Aivika.Internal.Dynamics
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Internal.Cont
import Simulation.Aivika.Signal
-- | Represents a process identifier; it bundles the mutable state that the
-- functions of this module use to hold, interrupt, passivate and cancel
-- the process.
data ProcessId =
  -- processStarted: set by 'processIdPrepare' so that an identifier
  -- cannot be used to start a second process.
  ProcessId { processStarted :: IORef Bool,
              -- The continuation stored while the process is passivated.
              processReactCont     :: IORef (Maybe (ContParams ())),
              -- The continuation identifier used for cancellation and preemption.
              processContId :: ContId,
              -- Flag written 'False' by 'holdProcess' and 'True' on interruption.
              processInterruptRef  :: IORef Bool,
              -- The continuation stored while the process is being held.
              processInterruptCont :: IORef (Maybe (ContParams ())),
              -- The time at which the current hold is scheduled to end.
              processInterruptTime :: IORef Double,
              -- Counter incremented on interruption; a pending wake-up event
              -- compares it to detect that it has become stale.
              processInterruptVersion :: IORef Int }
-- | A discontinuous process: a function that receives the 'ProcessId' it
-- runs under and yields a 'Cont' computation.
newtype Process a = Process (ProcessId -> Cont a)
-- | A type class for computations into which a 'Process' can be lifted.
class ProcessLift m where
  -- | Lift the specified 'Process' computation into @m@.
  liftProcess :: Process a -> m a
-- | A 'Process' lifts into itself unchanged.
instance ProcessLift Process where
  liftProcess m = m
-- | Unwrap the 'Process' and apply it to the given identifier, producing
-- the underlying 'Cont' computation.
invokeProcess :: ProcessId -> Process a -> Cont a
{-# INLINE invokeProcess #-}
invokeProcess pid (Process run) = run pid
-- | Hold the process for the specified time period @dt@.
--
-- The continuation is remembered in the process identifier so that
-- 'interruptProcess' can resume the process before the period elapses.
-- Calls 'error' when @dt@ is negative.
holdProcess :: Double -> Process ()
holdProcess dt =
  Process $ \pid ->
  Cont $ \c ->
  Event $ \p ->
  do when (dt < 0) $
       error "Time period dt < 0: holdProcess"
     let contRef  = processInterruptCont pid
         verRef   = processInterruptVersion pid
         wakeTime = pointTime p + dt
     writeIORef contRef (Just c)
     writeIORef (processInterruptRef pid) False
     writeIORef (processInterruptTime pid) wakeTime
     v0 <- readIORef verRef
     -- The wake-up event resumes the process only if the version counter
     -- is unchanged, i.e. no interruption happened in the meantime.
     let wakeUp =
           Event $ \q ->
           do v1 <- readIORef verRef
              when (v0 == v1) $
                do writeIORef contRef Nothing
                   invokeEvent q $ resumeCont c ()
     invokeEvent p $ enqueueEvent wakeTime wakeUp
-- | Interrupt a process that is currently held by 'holdProcess'.
--
-- If no hold continuation is stored for the identifier, nothing happens.
-- Otherwise the stored continuation is cleared, the interruption flag is
-- set, the version counter is bumped (which invalidates the pending
-- wake-up event) and the continuation is enqueued at the current time.
interruptProcess :: ProcessId -> Event ()
interruptProcess pid =
  Event $ \p ->
  do let contRef = processInterruptCont pid
     held <- readIORef contRef
     case held of
       Nothing -> return ()
       Just c ->
         do writeIORef contRef Nothing
            writeIORef (processInterruptRef pid) True
            -- Strict update: avoids accumulating a chain of (+ 1) thunks.
            modifyIORef' (processInterruptVersion pid) (+ 1)
            invokeEvent p $ enqueueEvent (pointTime p) $ resumeCont c ()
-- | Read the interruption flag of the process: 'holdProcess' resets it to
-- 'False' and an interruption sets it to 'True'.
processInterrupted :: ProcessId -> Event Bool
processInterrupted pid =
  Event $ \_ -> readIORef (processInterruptRef pid)
-- | Return the scheduled wake-up time while the process is being held
-- (a hold continuation is stored), or 'Nothing' otherwise.
processInterruptionTime :: ProcessId -> Event (Maybe Double)
processInterruptionTime pid =
  Event $ \_ ->
  do held <- readIORef (processInterruptCont pid)
     if isJust held
       then Just <$> readIORef (processInterruptTime pid)
       else return Nothing
-- | React to the beginning of preemption for the given process.
--
-- * If the process is being held, the hold is aborted like an
--   interruption (continuation cleared, flag set, version bumped) and the
--   continuation is replaced by one that holds the process again for the
--   remaining time @dt@ before continuing; the new continuation is passed
--   to 'reenterCont'.
--
-- * Otherwise, if the process is passivated, the stored continuation is
--   replaced by one that goes through 'reenterCont' when resumed.
--
-- NOTE(review): the exact semantics of 'substituteCont' and 'reenterCont'
-- are defined in the @Cont@ module and are not visible here.
processPreempted :: ProcessId -> Event ()
processPreempted pid =
  Event $ \p ->
  do let holdRef = processInterruptCont pid
     held <- readIORef holdRef
     case held of
       Just c ->
         do writeIORef holdRef Nothing
            writeIORef (processInterruptRef pid) True
            -- Strict update: avoids accumulating a chain of (+ 1) thunks.
            modifyIORef' (processInterruptVersion pid) (+ 1)
            t <- readIORef (processInterruptTime pid)
            let dt = t - pointTime p   -- the remaining part of the hold
                c' = substituteCont c $ \_ ->
                  Event $ \q ->
                  invokeEvent q $
                  invokeCont c $
                  invokeProcess pid $
                  holdProcess dt
            invokeEvent p $
              reenterCont c' ()
       Nothing ->
         do let reactRef = processReactCont pid
            passive <- readIORef reactRef
            case passive of
              Nothing ->
                return ()
              Just k ->
                do let k' = substituteCont k $ reenterCont k
                   writeIORef reactRef $ Just k'
-- | Passivate the process by storing its continuation in the identifier
-- until it is reactivated. Calls 'error' if a continuation is already
-- stored.
passivateProcess :: Process ()
passivateProcess =
  Process $ \pid ->
  Cont $ \c ->
  Event $ \_ ->
  do let reactRef = processReactCont pid
     stored <- readIORef reactRef
     when (isJust stored) $
       error "Cannot passivate the process twice: passivateProcess"
     writeIORef reactRef (Just c)
-- | Passivate the process and then run the specified 'Event' action, which
-- is invoked right after the continuation has been stored. Calls 'error'
-- if a continuation is already stored.
passivateProcessBefore :: Event () -> Process ()
passivateProcessBefore m =
  Process $ \pid ->
  Cont $ \c ->
  Event $ \p ->
  do let reactRef = processReactCont pid
     stored <- readIORef reactRef
     when (isJust stored) $
       error "Cannot passivate the process twice: passivateProcessBefore"
     writeIORef reactRef (Just c)
     invokeEvent p m
-- | Test whether the process is passivated, i.e. whether a reactivation
-- continuation is currently stored in its identifier.
processPassive :: ProcessId -> Event Bool
processPassive pid =
  Event $ \_ -> isJust <$> readIORef (processReactCont pid)
-- | Reactivate a passivated process: the stored continuation is cleared
-- and enqueued for resumption at the current time. Does nothing when the
-- process is not passivated.
reactivateProcess :: ProcessId -> Event ()
reactivateProcess pid =
  Event $ \p ->
  do let reactRef = processReactCont pid
         wake c =
           do writeIORef reactRef Nothing
              invokeEvent p $ enqueueEvent (pointTime p) $ resumeCont c ()
     stored <- readIORef reactRef
     maybe (return ()) wake stored
-- | Reactivate a passivated process by resuming the stored continuation
-- directly, without going through the event queue. Does nothing when the
-- process is not passivated.
reactivateProcessImmediately :: ProcessId -> Event ()
reactivateProcessImmediately pid =
  Event $ \p ->
  do let reactRef = processReactCont pid
         wake c =
           do writeIORef reactRef Nothing
              invokeEvent p $ resumeCont c ()
     stored <- readIORef reactRef
     maybe (return ()) wake stored
-- | Prepare the identifier for running a process.
--
-- Marks the identifier as started (calling 'error' if it was started
-- before) and subscribes to the signal of its 'ContId':
--
-- * on initiated cancellation that is activated, the process is
--   interrupted and reactivated;
--
-- * on the beginning of preemption, 'processPreempted' is invoked;
--
-- * the end of preemption requires no action here.
processIdPrepare :: ProcessId -> Event ()
processIdPrepare pid =
  Event $ \p ->
  do started <- readIORef (processStarted pid)
     when started $
       error $
       "Another process with the specified identifier " ++
       "has been started already: processIdPrepare"
     writeIORef (processStarted pid) True
     invokeEvent p $
       handleSignal_ (contSignal cid) onContEvent
  where
    cid = processContId pid
    onContEvent e =
      Event $ \q ->
      case e of
        ContCancellationInitiating ->
          do activated <- contCancellationActivated cid
             when activated $
               do invokeEvent q $ interruptProcess pid
                  invokeEvent q $ reactivateProcess pid
        ContPreemptionBeginning ->
          invokeEvent q $ processPreempted pid
        ContPreemptionEnding ->
          return ()
-- | Start the process immediately under a freshly created identifier.
runProcess :: Process () -> Event ()
runProcess p =
  liftSimulation newProcessId >>= \pid ->
  runProcessUsingId pid p
-- | Start the process immediately under the specified identifier.
--
-- The identifier is prepared first; the computation is then run with
-- 'return' as the normal and cancellation continuations and 'throwEvent'
-- as the error continuation.
runProcessUsingId :: ProcessId -> Process () -> Event ()
runProcessUsingId pid p =
  processIdPrepare pid >>
  runCont (invokeProcess pid p) return throwEvent return (processContId pid) False
-- | Run the process via 'runEventInStartTime' under a new identifier.
runProcessInStartTime :: Process () -> Simulation ()
runProcessInStartTime p = runEventInStartTime (runProcess p)
-- | Run the process via 'runEventInStartTime' under the given identifier.
runProcessInStartTimeUsingId :: ProcessId -> Process () -> Simulation ()
runProcessInStartTimeUsingId pid =
  runEventInStartTime . runProcessUsingId pid
-- | Run the process via 'runEventInStopTime' under a new identifier.
runProcessInStopTime :: Process () -> Simulation ()
runProcessInStopTime p = runEventInStopTime (runProcess p)
-- | Run the process via 'runEventInStopTime' under the given identifier.
runProcessInStopTimeUsingId :: ProcessId -> Process () -> Simulation ()
runProcessInStopTimeUsingId pid =
  runEventInStopTime . runProcessUsingId pid
-- | Enqueue the start of the process at time @t@ under a new identifier.
enqueueProcess :: Double -> Process () -> Event ()
enqueueProcess t = enqueueEvent t . runProcess
-- | Enqueue the start of the process at time @t@ under the given
-- identifier.
enqueueProcessUsingId :: Double -> ProcessId -> Process () -> Event ()
enqueueProcessUsingId t pid =
  enqueueEvent t . runProcessUsingId pid
-- | Return the identifier the current process is running under.
processId :: Process ProcessId
processId = Process $ \pid -> return pid
-- | Create a new process identifier: not started, not passivated, not
-- held, with a fresh 'ContId' and a zeroed interruption time and version.
newProcessId :: Simulation ProcessId
newProcessId =
  do reactCont     <- liftIO $ newIORef Nothing
     started       <- liftIO $ newIORef False
     contId        <- newContId
     interrupted   <- liftIO $ newIORef False
     interruptCont <- liftIO $ newIORef Nothing
     interruptTime <- liftIO $ newIORef 0
     version       <- liftIO $ newIORef 0
     return ProcessId { processStarted          = started,
                        processReactCont        = reactCont,
                        processContId           = contId,
                        processInterruptRef     = interrupted,
                        processInterruptCont    = interruptCont,
                        processInterruptTime    = interruptTime,
                        processInterruptVersion = version }
-- | Initiate cancellation of the process with the given identifier by
-- delegating to 'contCancellationInitiate' on its 'ContId'.
cancelProcessWithId :: ProcessId -> Event ()
cancelProcessWithId = contCancellationInitiate . processContId
-- | Cancel the current process.
--
-- Cancellation is initiated for the process's own identifier. The final
-- throw is a guard: it is only reached if the computation continues after
-- cancellation was initiated. It raises a proper 'ErrorCall' value rather
-- than a bottom disguised as 'SomeException', so handlers that inspect the
-- exception receive a well-defined value.
cancelProcess :: Process a
cancelProcess =
  do pid <- processId
     liftEvent $ cancelProcessWithId pid
     throwProcess $
       ErrorCall "The process must be cancelled already: cancelProcess."
-- | Test whether cancellation has been initiated for the process, as
-- reported by 'contCancellationInitiated' on its 'ContId'.
processCancelled :: ProcessId -> Event Bool
processCancelled = contCancellationInitiated . processContId
-- | The signal obtained from 'contCancellationInitiating' for the
-- process's 'ContId'.
processCancelling :: ProcessId -> Signal ()
processCancelling = contCancellationInitiating . processContId
-- | Register a handler on the current process's 'processCancelling'
-- signal; the handler @h@ is run when that signal fires.
whenCancellingProcess :: Event () -> Process ()
whenCancellingProcess h =
  Process $ \pid ->
  let cancelling = processCancelling pid
  in liftEvent $ handleSignal_ cancelling $ \() -> h
-- | Begin preemption of the process by delegating to
-- 'contPreemptionBegin' on its 'ContId'.
processPreemptionBegin :: ProcessId -> Event ()
processPreemptionBegin = contPreemptionBegin . processContId
-- | End preemption of the process by delegating to 'contPreemptionEnd'
-- on its 'ContId'.
processPreemptionEnd :: ProcessId -> Event ()
processPreemptionEnd = contPreemptionEnd . processContId
-- | The signal obtained from 'contPreemptionBeginning' for the process's
-- 'ContId'.
processPreemptionBeginning :: ProcessId -> Signal ()
processPreemptionBeginning = contPreemptionBeginning . processContId
-- | The signal obtained from 'contPreemptionEnding' for the process's
-- 'ContId'.
processPreemptionEnding :: ProcessId -> Signal ()
processPreemptionEnding = contPreemptionEnding . processContId
-- | Two identifiers are equal when they share the same reactivation
-- reference ('IORef' equality is reference identity).
instance Eq ProcessId where
  pid1 == pid2 = reactRef pid1 == reactRef pid2
    where reactRef = processReactCont
-- | Sequencing of processes is implemented by 'returnP' and 'bindP'.
instance Monad Process where
  return = returnP
  (>>=) = bindP
-- | Mapping is derived from the 'Monad' instance via 'liftM'.
instance Functor Process where
  fmap f = liftM f
-- | Applicative operations are derived from the 'Monad' instance.
instance Applicative Process where
  pure a = return a
  mf <*> mx = ap mf mx
-- | A failed pattern match in a 'Process' calls 'error' with the message.
instance MonadFail Process where
  fail message = error message
-- | Lift a 'Parameter' computation into 'Process' via 'liftPP'.
instance ParameterLift Process where
  liftParameter m = liftPP m
-- | Lift a 'Simulation' computation into 'Process' via 'liftSP'.
instance SimulationLift Process where
  liftSimulation m = liftSP m
-- | Lift a 'Dynamics' computation into 'Process' via 'liftDP'.
instance DynamicsLift Process where
  liftDynamics m = liftDP m
-- | Lift an 'Event' computation into 'Process' via 'liftEP'.
instance EventLift Process where
  liftEvent m = liftEP m
-- | Lift an 'IO' action into 'Process' via 'liftIOP'.
instance MonadIO Process where
  liftIO m = liftIOP m
-- | Throwing in 'Process' delegates to 'throwProcess'.
instance MC.MonadThrow Process where
  throwM e = throwProcess e
-- | Catching in 'Process' delegates to 'catchProcess'.
instance MC.MonadCatch Process where
  catch m h = catchProcess m h
-- | Inject a pure value into 'Process', ignoring the process identifier.
returnP :: a -> Process a
{-# INLINE returnP #-}
returnP a = Process (const (return a))
-- | Sequence two processes, running both under the same identifier and
-- feeding the result of the first into the second.
bindP :: Process a -> (a -> Process b) -> Process b
{-# INLINE bindP #-}
bindP m k =
  Process $ \pid ->
  invokeProcess pid m >>= \a ->
  invokeProcess pid (k a)
-- | Lift a 'Parameter' computation, ignoring the process identifier.
liftPP :: Parameter a -> Process a
{-# INLINE liftPP #-}
liftPP m = Process (const (liftParameter m))
-- | Lift a 'Simulation' computation, ignoring the process identifier.
liftSP :: Simulation a -> Process a
{-# INLINE liftSP #-}
liftSP m = Process (const (liftSimulation m))
-- | Lift a 'Dynamics' computation, ignoring the process identifier.
liftDP :: Dynamics a -> Process a
{-# INLINE liftDP #-}
liftDP m = Process (const (liftDynamics m))
-- | Lift an 'Event' computation, ignoring the process identifier.
liftEP :: Event a -> Process a
{-# INLINE liftEP #-}
liftEP m = Process (const (liftEvent m))
-- | Lift an 'IO' action, ignoring the process identifier.
liftIOP :: IO a -> Process a
{-# INLINE liftIOP #-}
liftIOP m = Process (const (liftIO m))
-- | Run the process and handle exceptions of type @e@ with the handler
-- @h@; both the body and the handler run under the same identifier and
-- the work is delegated to 'catchCont'.
catchProcess :: Exception e => Process a -> (e -> Process a) -> Process a
catchProcess m h =
  Process $ \pid ->
  catchCont (invokeProcess pid m) (invokeProcess pid . h)
-- | Run the first process followed by the finalizer (second process),
-- both under the same identifier, delegating to 'finallyCont'.
finallyProcess :: Process a -> Process b -> Process a
finallyProcess m finalizer =
  Process $ \pid ->
  finallyCont (invokeProcess pid m) (invokeProcess pid finalizer)
-- | Throw the exception within the 'Process' computation.
--
-- The exception is raised with 'throwIO', so it is thrown when the lifted
-- IO action is executed rather than whenever the action value happens to
-- be evaluated; this keeps the throw ordered with respect to the other IO
-- operations performed by the simulation.
throwProcess :: Exception e => e -> Process a
throwProcess = liftIO . throwIO
-- | Run the processes in parallel under freshly created identifiers and
-- collect their results.
processParallel :: [Process a] -> Process [a]
processParallel xs =
  do ids <- liftSimulation (processParallelCreateIds xs)
     processParallelUsingIds ids
-- | Run the processes in parallel under the given identifiers and collect
-- their results. Every identifier is prepared first; the branches are
-- then handed to 'contParallel'.
processParallelUsingIds :: [(ProcessId, Process a)] -> Process [a]
processParallelUsingIds xs =
  Process $ \_ ->
  liftEvent (processParallelPrepare xs) >>
  contParallel (map branch xs)
  where
    branch (i, m) = (invokeProcess i m, processContId i)
-- | Like 'processParallel' but the results are discarded.
processParallel_ :: [Process a] -> Process ()
processParallel_ xs =
  do ids <- liftSimulation (processParallelCreateIds xs)
     processParallelUsingIds_ ids
-- | Like 'processParallelUsingIds' but the results are discarded; the
-- branches are handed to 'contParallel_'.
processParallelUsingIds_ :: [(ProcessId, Process a)] -> Process ()
processParallelUsingIds_ xs =
  Process $ \_ ->
  liftEvent (processParallelPrepare xs) >>
  contParallel_ (map branch xs)
  where
    branch (i, m) = (invokeProcess i m, processContId i)
-- | Pair every process with a newly created identifier.
processParallelCreateIds :: [Process a] -> Simulation [(ProcessId, Process a)]
processParallelCreateIds xs =
  liftSimulation (mapM (const newProcessId) xs) >>= \pids ->
  return (zip pids xs)
-- | Prepare, in list order, the identifier of every pair with
-- 'processIdPrepare'.
processParallelPrepare :: [(ProcessId, Process a)] -> Event ()
processParallelPrepare xs =
  Event $ \p ->
  mapM_ (\(i, _) -> invokeEvent p (processIdPrepare i)) (map lazyPair xs)
  where
    -- Keeps the match on each pair as lazy as the original 'fst' selector.
    lazyPair x = (fst x, snd x)
-- | Run the given process under the specified identifier from within the
-- current process. The identifier is prepared first and the computation
-- is handed to 'rerunCont' together with the identifier's 'ContId'.
processUsingId :: ProcessId -> Process a -> Process a
processUsingId pid x =
  Process $ \_ ->
  liftEvent (processIdPrepare pid) >>
  rerunCont (invokeProcess pid x) (processContId pid)
-- | Spawn a child process with the 'CancelTogether' cancellation policy.
spawnProcess :: Process () -> Process ()
spawnProcess x = spawnProcessWith CancelTogether x
-- | Spawn a child process under the given identifier with the
-- 'CancelTogether' cancellation policy.
spawnProcessUsingId :: ProcessId -> Process () -> Process ()
spawnProcessUsingId pid x = spawnProcessUsingIdWith CancelTogether pid x
-- | Spawn a child process under a new identifier using the specified
-- cancellation policy.
spawnProcessWith :: ContCancellation -> Process () -> Process ()
spawnProcessWith cancellation x =
  liftSimulation newProcessId >>= \pid ->
  spawnProcessUsingIdWith cancellation pid x
-- | Spawn a child process under the given identifier using the specified
-- cancellation policy. The identifier is prepared first and the
-- computation is handed to 'spawnCont'.
spawnProcessUsingIdWith :: ContCancellation -> ProcessId -> Process () -> Process ()
spawnProcessUsingIdWith cancellation pid x =
  Process $ \_ ->
  liftEvent (processIdPrepare pid) >>
  spawnCont cancellation (invokeProcess pid x) (processContId pid)
-- | Await the signal, delegating to 'contAwait'.
processAwait :: Signal a -> Process a
processAwait signal = Process (const (contAwait signal))
-- | The outcome recorded by 'memoProcess'.
data MemoResult a = MemoComputed a
                    -- ^ the process finished with a value
                  | MemoError SomeException
                    -- ^ the process failed with an exception of any type
                  | MemoCancelled
                    -- ^ the process ended without a value or a caught error

-- | Memoize the process so that it is run at most once.
--
-- The first caller runs the process and records the outcome; callers that
-- arrive while it is still running await the completion signal; later
-- callers read the recorded outcome directly. A computed value is
-- returned, a recorded exception is rethrown, and a cancelled outcome
-- cancels the calling process.
--
-- Exceptions of every type are recorded (as 'SomeException'). Recording
-- only one exception type would leave other failures stored as
-- 'MemoCancelled', which silently cancels the subsequent callers instead
-- of reporting the error to them.
memoProcess :: Process a -> Simulation (Process a)
memoProcess x =
  do started  <- liftIO $ newIORef False
     computed <- newSignalSource
     value    <- liftIO $ newIORef Nothing
     let -- Replays the recorded outcome; only used once 'value' is filled.
         result =
           do Just r <- liftIO $ readIORef value
              case r of
                MemoComputed a -> return a
                MemoError e    -> throwProcess e
                MemoCancelled  -> cancelProcess
     return $
       do v <- liftIO $ readIORef value
          case v of
            Just _ -> result
            Nothing ->
              do f <- liftIO $ readIORef started
                 case f of
                   True ->
                     -- Somebody else is computing: wait for completion.
                     do processAwait $ publishSignal computed
                        result
                   False ->
                     do liftIO $ writeIORef started True
                        -- Stays 'MemoCancelled' unless the body finishes
                        -- or raises an exception.
                        r <- liftIO $ newIORef MemoCancelled
                        finallyProcess
                          (catchProcess
                           (do a <- x
                               liftIO $ writeIORef r (MemoComputed a))
                           (\e ->
                             liftIO $ writeIORef r (MemoError e)))
                          (liftEvent $
                           do liftIO $
                                do outcome <- readIORef r
                                   writeIORef value (Just outcome)
                              triggerSignal computed ())
                        result
-- | Run two processes in parallel and return both results as a pair.
-- The results are tagged with 'Left' / 'Right' so that they fit into the
-- homogeneous list expected by 'processParallel'.
zipProcessParallel :: Process a -> Process b -> Process (a, b)
zipProcessParallel x y =
  do [Left a, Right b] <- processParallel [Left <$> x, Right <$> y]
     return (a, b)
-- | Run three processes in parallel and return the results as a triple.
-- The results are tagged with nested 'Either' constructors so that they
-- fit into the homogeneous list expected by 'processParallel'.
zip3ProcessParallel :: Process a -> Process b -> Process c -> Process (a, b, c)
zip3ProcessParallel x y z =
  do let first  = Left <$> x
         second = (Right . Left) <$> y
         third  = (Right . Right) <$> z
     [Left a, Right (Left b), Right (Right c)] <-
       processParallel [first, second, third]
     return (a, b, c)
-- | Split a process returning a pair into two processes that share one
-- memoized run of the original (see 'memoProcess').
unzipProcess :: Process (a, b) -> Simulation (Process a, Process b)
unzipProcess xy =
  memoProcess xy >>= \memo ->
  return (fst <$> memo, snd <$> memo)
-- | Run the process under a new identifier with the given timeout; see
-- 'timeoutProcessUsingId' for the result convention.
timeoutProcess :: Double -> Process a -> Process (Maybe a)
timeoutProcess timeout p =
  liftSimulation newProcessId >>= \pid ->
  timeoutProcessUsingId timeout pid p
-- | Run the process under the given identifier with a timeout.
--
-- Two children are spawned with 'CancelChildAfterParent': a watchdog that
-- holds for @timeout@ and then cancels the worker identifier, and the
-- worker that runs @p@. When the worker ends it cancels the watchdog and
-- publishes its outcome. The caller returns @Just a@ on success, rethrows
-- the exception raised by @p@, and returns 'Nothing' when the worker
-- ended without recording an outcome.
timeoutProcessUsingId :: Double -> ProcessId -> Process a -> Process (Maybe a)
timeoutProcessUsingId timeout pid p =
  do s <- liftSimulation newSignalSource
     timeoutPid <- liftSimulation newProcessId
     let watchdog =
           do holdProcess timeout
              liftEvent $
                cancelProcessWithId pid
         worker =
           do r <- liftIO $ newIORef Nothing
              finallyProcess
                (catchProcess
                 (do a <- p
                     liftIO $ writeIORef r $ Just (Right a))
                 (\e ->
                   liftIO $ writeIORef r $ Just (Left e)))
                (liftEvent $
                 do cancelProcessWithId timeoutPid
                    outcome <- liftIO $ readIORef r
                    triggerSignal s outcome)
     spawnProcessUsingIdWith CancelChildAfterParent timeoutPid watchdog
     spawnProcessUsingIdWith CancelChildAfterParent pid worker
     received <- processAwait $ publishSignal s
     case received of
       Nothing -> return Nothing
       Just (Right a) -> return (Just a)
       Just (Left (SomeException e)) -> throwProcess e
-- | Yield control: the rest of the process is enqueued as an event at the
-- current time instead of being continued directly.
processYield :: Process ()
processYield =
  Process $ \_ ->
  Cont $ \c ->
  Event $ \p ->
  let resumption = resumeCont c ()
  in invokeEvent p $ enqueueEvent (pointTime p) resumption
-- | A process that never produces a value. Its continuation is only
-- resumed from the handler of the 'processCancelling' signal, and the
-- value passed to it is an 'error' thunk that must not be evaluated.
neverProcess :: Process a
neverProcess =
  Process $ \pid ->
  Cont $ \c ->
  handleSignal_ (processCancelling pid) $ \_ ->
  resumeCont c $ error "It must never be computed: neverProcess"
-- | Lift 'retryEvent' with the given message into 'Process'.
retryProcess :: String -> Process a
retryProcess message = liftEvent (retryEvent message)
-- | Wrap the process's computation with 'transferCont' under the current
-- identifier.
-- NOTE(review): the control-transfer semantics are defined by
-- 'transferCont' in the @Cont@ module and are not visible here.
transferProcess :: Process () -> Process a
transferProcess m =
  Process $ \pid -> transferCont (invokeProcess pid m)
-- | Wrap the process's computation with 'traceCont' and the given
-- message, keeping the same identifier.
traceProcess :: String -> Process a -> Process a
traceProcess message m =
  Process $ \pid ->
  traceCont message (invokeProcess pid m)