module Simulation.Aivika.Internal.Process
(
ProcessId,
Process(..),
ProcessLift(..),
invokeProcess,
runProcess,
runProcessUsingId,
runProcessInStartTime,
runProcessInStartTimeUsingId,
runProcessInStopTime,
runProcessInStopTimeUsingId,
spawnProcess,
spawnProcessUsingId,
enqueueProcess,
enqueueProcessUsingId,
newProcessId,
processId,
processUsingId,
holdProcess,
interruptProcess,
processInterrupted,
passivateProcess,
processPassive,
reactivateProcess,
cancelProcessWithId,
cancelProcess,
processCancelled,
processCancelling,
whenCancellingProcess,
processAwait,
processYield,
timeoutProcess,
timeoutProcessUsingId,
processParallel,
processParallelUsingIds,
processParallel_,
processParallelUsingIds_,
catchProcess,
finallyProcess,
throwProcess,
zipProcessParallel,
zip3ProcessParallel,
unzipProcess,
memoProcess,
neverProcess) where
import Control.Applicative
import Control.Exception (IOException, throw, throwIO)
import Control.Monad
import Data.IORef
import Data.Maybe

import Control.Monad.Trans

import Simulation.Aivika.Internal.Specs
import Simulation.Aivika.Internal.Parameter
import Simulation.Aivika.Internal.Simulation
import Simulation.Aivika.Internal.Dynamics
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Internal.Cont
import Simulation.Aivika.Internal.Signal
-- | The identifier of a process: a bundle of mutable state that the
-- functions of this module use to start, hold, interrupt, passivate
-- and cancel the process.
data ProcessId =
  ProcessId
  { processStarted :: IORef Bool
    -- ^ Set to 'True' once a process has been started under this identifier.
  , processReactCont :: IORef (Maybe (ContParams ()))
    -- ^ The continuation saved while the process is passivated.
  , processCancelSource :: ContCancellationSource
    -- ^ The cancellation source associated with the process.
  , processInterruptRef :: IORef Bool
    -- ^ Reset to 'False' when a hold begins and set to 'True' when
    -- that hold is interrupted.
  , processInterruptCont :: IORef (Maybe (ContParams ()))
    -- ^ The continuation saved while the process is being held.
  , processInterruptVersion :: IORef Int
    -- ^ A counter bumped on every interruption, so that a wake-up
    -- scheduled by an earlier hold can recognise that it is stale.
  }
-- | A process computation: a function that receives the identifier of
-- the process it runs as and yields the underlying 'Cont' computation.
newtype Process a = Process (ProcessId -> Cont a)
-- | A type class of computations into which a 'Process' can be lifted.
class ProcessLift m where

  -- | Lift the specified 'Process' computation into another computation.
  liftProcess :: Process a -> m a

-- | A 'Process' is lifted into itself unchanged.
instance ProcessLift Process where
  liftProcess m = m
-- | Unwrap the process by applying it to the given identifier, which
-- yields the underlying 'Cont' computation.
invokeProcess :: ProcessId -> Process a -> Cont a
invokeProcess pid x =
  case x of
    Process f -> f pid
-- | Hold the process for the specified time period @dt@.
--
-- The current continuation is stored in the identifier so that
-- 'interruptProcess' can resume the process earlier, and a wake-up
-- event is enqueued at the current time plus @dt@.  The wake-up only
-- resumes the process if no interruption happened in between, which is
-- detected by comparing the interrupt version counter.
--
-- Raises an error if @dt@ is negative, before any state is modified.
holdProcess :: Double -> Process ()
holdProcess dt =
  Process $ \pid ->
  Cont $ \c ->
  Event $ \p ->
  do -- reject a negative period up front instead of leaving a stored
     -- continuation and an event scheduled in the past
     when (dt < 0) $
       error "Time period dt < 0: holdProcess"
     let x = processInterruptCont pid
     writeIORef x $ Just c
     writeIORef (processInterruptRef pid) False
     -- remember the version to recognise a stale wake-up later
     v <- readIORef (processInterruptVersion pid)
     invokeEvent p $
       enqueueEvent (pointTime p + dt) $
       Event $ \p' ->
       do v' <- readIORef (processInterruptVersion pid)
          -- resume only if the hold was not interrupted meanwhile
          when (v == v') $
            do writeIORef x Nothing
               invokeEvent p' $ resumeCont c ()
-- | Interrupt a process that is currently being held.
--
-- If no hold continuation is stored for the identifier, nothing
-- happens.  Otherwise the stored continuation is cleared, the
-- interruption flag is raised, the interrupt version is incremented
-- (which invalidates the wake-up scheduled by the hold) and the
-- continuation is enqueued to resume at the current time.
interruptProcess :: ProcessId -> Event ()
interruptProcess pid =
  Event $ \p ->
  do let x = processInterruptCont pid
     a <- readIORef x
     case a of
       Nothing -> return ()
       Just c ->
         do writeIORef x Nothing
            writeIORef (processInterruptRef pid) True
            -- strict update: do not accumulate a chain of (+ 1) thunks
            modifyIORef' (processInterruptVersion pid) (+ 1)
            invokeEvent p $ enqueueEvent (pointTime p) $ resumeCont c ()
-- | Read the interruption flag of the process: it is reset when a hold
-- begins and raised when that hold is interrupted.
processInterrupted :: ProcessId -> Event Bool
processInterrupted pid =
  Event $ const $ readIORef $ processInterruptRef pid
-- | Passivate the process by storing its continuation in the
-- identifier, where 'reactivateProcess' can find it.
--
-- Raises an error if a continuation is already stored.
passivateProcess :: Process ()
passivateProcess =
  Process $ \pid ->
  Cont $ \k ->
  Event $ \_ ->
  do let ref = processReactCont pid
     saved <- readIORef ref
     when (isJust saved) $
       error "Cannot passivate the process twice: passivateProcess"
     writeIORef ref $ Just k
-- | Test whether the process is passivated, i.e. whether a
-- reactivation continuation is currently stored in its identifier.
processPassive :: ProcessId -> Event Bool
processPassive pid =
  Event $ \_ -> liftM isJust $ readIORef (processReactCont pid)
-- | Reactivate a passivated process.
--
-- When a continuation is stored, it is cleared and enqueued to resume
-- at the current time; otherwise the action does nothing.
reactivateProcess :: ProcessId -> Event ()
reactivateProcess pid =
  Event $ \p ->
  let ref = processReactCont pid
      wake k =
        do writeIORef ref Nothing
           invokeEvent p $ enqueueEvent (pointTime p) $ resumeCont k ()
  in readIORef ref >>= maybe (return ()) wake
-- | Prepare the identifier for running a process under it.
--
-- Marks the identifier as started, raising an error if it was already
-- marked, and subscribes to the cancelling signal so that the process
-- is interrupted and reactivated when cancellation is being initiated.
processIdPrepare :: ProcessId -> Event ()
processIdPrepare pid =
  Event $ \p ->
  do started <- readIORef (processStarted pid)
     when started $
       error $
       "Another process with the specified identifier " ++
       "has been started already: processIdPrepare"
     writeIORef (processStarted pid) True
     invokeEvent p $
       handleSignal_ (processCancelling pid) $ \_ ->
       interruptProcess pid >> reactivateProcess pid
-- | Run the process under a freshly created identifier.
runProcess :: Process () -> Event ()
runProcess p =
  liftSimulation newProcessId >>= \pid ->
  runProcessUsingId pid p
-- | Run the process under the specified identifier.
--
-- The identifier is prepared first (which fails if it was already
-- used), then the underlying continuation is run with 'return' as the
-- main and cancellation handlers and 'throwEvent' as the error handler.
runProcessUsingId :: ProcessId -> Process () -> Event ()
runProcessUsingId pid p =
  processIdPrepare pid >>
  runCont body return throwEvent return (processCancelSource pid) False
  where body = invokeProcess pid p
-- | Run the process in the start time, under a new identifier.
runProcessInStartTime :: Process () -> Simulation ()
runProcessInStartTime p = runEventInStartTime $ runProcess p
-- | Run the process in the start time, under the specified identifier.
runProcessInStartTimeUsingId :: ProcessId -> Process () -> Simulation ()
runProcessInStartTimeUsingId pid =
  runEventInStartTime . runProcessUsingId pid
-- | Run the process in the stop time, under a new identifier.
runProcessInStopTime :: Process () -> Simulation ()
runProcessInStopTime p = runEventInStopTime $ runProcess p
-- | Run the process in the stop time, under the specified identifier.
runProcessInStopTimeUsingId :: ProcessId -> Process () -> Simulation ()
runProcessInStopTimeUsingId pid =
  runEventInStopTime . runProcessUsingId pid
-- | Enqueue an event at time @t@ that runs the process under a new
-- identifier.
enqueueProcess :: Double -> Process () -> Event ()
enqueueProcess t = enqueueEvent t . runProcess
-- | Enqueue an event at time @t@ that runs the process under the
-- specified identifier.
enqueueProcessUsingId :: Double -> ProcessId -> Process () -> Event ()
enqueueProcessUsingId t pid = enqueueEvent t . runProcessUsingId pid
-- | Return the identifier of the current process.
processId :: Process ProcessId
processId = Process $ \pid -> return pid
-- | Create a new process identifier: not started, not passivated, not
-- held, not interrupted, with a fresh cancellation source and the
-- interrupt version set to zero.
newProcessId :: Simulation ProcessId
newProcessId =
  do reactRef <- liftIO $ newIORef Nothing
     startedRef <- liftIO $ newIORef False
     cancelSource <- newContCancellationSource
     interruptedRef <- liftIO $ newIORef False
     holdRef <- liftIO $ newIORef Nothing
     versionRef <- liftIO $ newIORef 0
     return ProcessId
       { processStarted = startedRef
       , processReactCont = reactRef
       , processCancelSource = cancelSource
       , processInterruptRef = interruptedRef
       , processInterruptCont = holdRef
       , processInterruptVersion = versionRef
       }
-- | Initiate cancellation of the process with the specified
-- identifier, through its cancellation source.
cancelProcessWithId :: ProcessId -> Event ()
cancelProcessWithId = contCancellationInitiate . processCancelSource
-- | Cancel the current process.
--
-- Cancellation is initiated for the process's own identifier.  The
-- trailing throw only exists to give the computation an arbitrary
-- result type; its message states that the process is expected to be
-- cancelled before that point is reached.
cancelProcess :: Process a
cancelProcess =
  processId >>= \pid ->
  liftEvent (cancelProcessWithId pid) >>
  throwProcess unreachable
  where unreachable =
          error "The process must be cancelled already: cancelProcess."
-- | Test whether cancellation was initiated for the process with the
-- specified identifier.
processCancelled :: ProcessId -> Event Bool
processCancelled = contCancellationInitiated . processCancelSource
-- | The signal of the process's cancellation source that notifies
-- about cancellation being initiated.
processCancelling :: ProcessId -> Signal ()
processCancelling = contCancellationInitiating . processCancelSource
-- | Register a handler that is invoked whenever the cancelling signal
-- of the current process is triggered.
whenCancellingProcess :: Event () -> Process ()
whenCancellingProcess h = Process subscribe
  where subscribe pid =
          liftEvent $ handleSignal_ (processCancelling pid) handler
        handler () = h
-- | Two identifiers are equal when they share the very same
-- reactivation reference.
instance Eq ProcessId where
  pid1 == pid2 = processReactCont pid1 == processReactCont pid2
-- | Sequencing of processes is delegated to 'returnP' and 'bindP'.
instance Monad Process where
  return = returnP
  (>>=) = bindP

-- | Mapping over a process is expressed through the monadic bind.
instance Functor Process where
  fmap f m = m >>= return . f

-- | The applicative combinators are derived from the 'Monad' instance.
instance Applicative Process where
  pure = return
  mf <*> mx = mf >>= \f -> mx >>= \a -> return (f a)
-- | A 'Parameter' computation can be lifted into a process.
instance ParameterLift Process where
  liftParameter m = liftPP m

-- | A 'Simulation' computation can be lifted into a process.
instance SimulationLift Process where
  liftSimulation m = liftSP m

-- | A 'Dynamics' computation can be lifted into a process.
instance DynamicsLift Process where
  liftDynamics m = liftDP m

-- | An 'Event' computation can be lifted into a process.
instance EventLift Process where
  liftEvent m = liftEP m

-- | An 'IO' action can be lifted into a process.
instance MonadIO Process where
  liftIO m = liftIOP m
-- | Wrap a pure value into a process that ignores its identifier.
returnP :: a -> Process a
returnP = Process . const . return
-- | Sequence two processes: run the first one and feed its result to
-- the function producing the second, both under the same identifier.
bindP :: Process a -> (a -> Process b) -> Process b
bindP (Process m) k =
  Process $ \pid ->
  m pid >>= \a -> invokeProcess pid (k a)
-- | Lift a 'Parameter' computation; the process identifier is ignored.
liftPP :: Parameter a -> Process a
liftPP = Process . const . liftParameter

-- | Lift a 'Simulation' computation; the process identifier is ignored.
liftSP :: Simulation a -> Process a
liftSP = Process . const . liftSimulation

-- | Lift a 'Dynamics' computation; the process identifier is ignored.
liftDP :: Dynamics a -> Process a
liftDP = Process . const . liftDynamics

-- | Lift an 'Event' computation; the process identifier is ignored.
liftEP :: Event a -> Process a
liftEP = Process . const . liftEvent

-- | Lift an 'IO' action; the process identifier is ignored.
liftIOP :: IO a -> Process a
liftIOP = Process . const . liftIO
-- | Exception handling within 'Process' computations: the body and
-- the handler are both run under the identifier of the current
-- process and combined with 'catchCont'.
catchProcess :: Process a -> (IOException -> Process a) -> Process a
catchProcess body handler =
  Process $ \pid ->
  catchCont (invokeProcess pid body) (invokeProcess pid . handler)
-- | Combine a process with a finalization part using 'finallyCont';
-- both parts are run under the identifier of the current process.
finallyProcess :: Process a -> Process b -> Process a
finallyProcess body finalizer =
  Process $ \pid ->
  finallyCont (invokeProcess pid body) (invokeProcess pid finalizer)
-- | Throw the exception from within a 'Process' computation.
--
-- 'throwIO' is used rather than 'throw' so that the exception is
-- raised exactly when the lifted 'IO' action is executed, not whenever
-- the action value happens to be evaluated.
throwProcess :: IOException -> Process a
throwProcess = liftIO . throwIO
-- | Create a new identifier for each of the given processes and pass
-- the resulting pairs to 'processParallelUsingIds'.
processParallel :: [Process a] -> Process [a]
processParallel xs =
  do ys <- liftSimulation $ processParallelCreateIds xs
     processParallelUsingIds ys
-- | Like 'processParallel' but with explicitly supplied identifiers.
--
-- Every identifier is prepared first (which fails for an identifier
-- that was already used); then the processes, each paired with its own
-- cancellation source, are handed over to 'contParallel'.
processParallelUsingIds :: [(ProcessId, Process a)] -> Process [a]
processParallelUsingIds xs =
  Process $ \_ ->
  liftEvent (processParallelPrepare xs) >>
  contParallel
  [ (invokeProcess child m, processCancelSource child) | (child, m) <- xs ]
-- | Create a new identifier for each of the given processes and pass
-- the resulting pairs to 'processParallelUsingIds_', which yields no
-- results.
processParallel_ :: [Process a] -> Process ()
processParallel_ xs =
  do ys <- liftSimulation $ processParallelCreateIds xs
     processParallelUsingIds_ ys
-- | Like 'processParallel_' but with explicitly supplied identifiers.
--
-- Every identifier is prepared first (which fails for an identifier
-- that was already used); then the processes, each paired with its own
-- cancellation source, are handed over to 'contParallel_'.
processParallelUsingIds_ :: [(ProcessId, Process a)] -> Process ()
processParallelUsingIds_ xs =
  Process $ \_ ->
  liftEvent (processParallelPrepare xs) >>
  contParallel_
  [ (invokeProcess child m, processCancelSource child) | (child, m) <- xs ]
-- | Pair every process of the list with a freshly created identifier.
processParallelCreateIds :: [Process a] -> Simulation [(ProcessId, Process a)]
processParallelCreateIds xs =
  liftSimulation (mapM (const newProcessId) xs) >>= \pids ->
  return (pids `zip` xs)
-- | Prepare, in list order, the identifier of every pair.
processParallelPrepare :: [(ProcessId, Process a)] -> Event ()
processParallelPrepare xs =
  Event $ \p ->
  mapM_ (invokeEvent p . processIdPrepare) (map fst xs)
-- | Run the given process under the specified identifier from within
-- the current process.
--
-- The identifier is prepared first (which fails if it was already
-- used); the continuation is then rerun via 'rerunCont' with the
-- cancellation source of that identifier.
processUsingId :: ProcessId -> Process a -> Process a
processUsingId pid x =
  Process $ \_ ->
  liftEvent (processIdPrepare pid) >>
  rerunCont (invokeProcess pid x) (processCancelSource pid)
-- | Spawn the child process under a freshly created identifier, with
-- the given cancellation relationship.
spawnProcess :: ContCancellation -> Process () -> Process ()
spawnProcess cancellation x =
  liftSimulation newProcessId >>= \pid ->
  spawnProcessUsingId cancellation pid x
-- | Spawn the child process under the specified identifier.
--
-- The identifier is prepared first (which fails if it was already
-- used); the child is then started via 'spawnCont' with the given
-- cancellation relationship and the child's own cancellation source.
spawnProcessUsingId :: ContCancellation -> ProcessId -> Process () -> Process ()
spawnProcessUsingId cancellation pid x =
  Process $ \_ ->
  liftEvent (processIdPrepare pid) >>
  spawnCont cancellation (invokeProcess pid x) (processCancelSource pid)
-- | Await the signal from within the process, by means of 'contAwait'.
processAwait :: Signal a -> Process a
processAwait signal = Process $ const $ contAwait signal
-- | The outcome recorded by 'memoProcess' for the memoized process.
data MemoResult a
  = MemoComputed a
    -- ^ The process returned a value.
  | MemoError IOException
    -- ^ The process raised an exception.
  | MemoCancelled
    -- ^ Neither a value nor an exception was recorded.
-- | Memoize the process so that it is run at most once.
--
-- The first invocation of the resulting process runs the original one
-- and records its outcome (a value, an exception, or cancellation when
-- neither was recorded) in the finalization part, then triggers a
-- signal.  An invocation that arrives while the first is still running
-- awaits that signal.  Every invocation finally replays the recorded
-- outcome: it returns the value, rethrows the exception or cancels the
-- invoking process.
memoProcess :: Process a -> Simulation (Process a)
memoProcess x =
  do started  <- liftIO $ newIORef False
     computed <- newSignalSource
     value    <- liftIO $ newIORef Nothing
     let -- replay the recorded outcome; matched totally so that no
         -- 'fail' (and hence no MonadFail instance) is required
         result =
           do v <- liftIO $ readIORef value
              case v of
                Just (MemoComputed a) -> return a
                Just (MemoError e)    -> throwProcess e
                Just MemoCancelled    -> cancelProcess
                Nothing ->
                  error "The memoized result must be defined already: memoProcess"
     return $
       do v <- liftIO $ readIORef value
          case v of
            Just _ -> result
            Nothing ->
              do f <- liftIO $ readIORef started
                 case f of
                   True ->
                     -- somebody else is computing: wait for the outcome
                     do processAwait $ publishSignal computed
                        result
                   False ->
                     do liftIO $ writeIORef started True
                        -- stays MemoCancelled unless the body completes
                        -- or raises an exception
                        r <- liftIO $ newIORef MemoCancelled
                        finallyProcess
                          (catchProcess
                           (do a <- x
                               liftIO $ writeIORef r (MemoComputed a))
                           (\e ->
                             liftIO $ writeIORef r (MemoError e)))
                          (liftEvent $
                           do liftIO $
                                do m <- readIORef r
                                   writeIORef value (Just m)
                              triggerSignal computed ())
                        result
-- | Run two processes through 'processParallel' and pair their results.
--
-- The results are tagged with 'Left' and 'Right' so that processes of
-- different result types fit into one list.  The result list is matched
-- with a total @case@, so no 'fail' (and hence no MonadFail instance)
-- is needed; any other shape raises an error.
zipProcessParallel :: Process a -> Process b -> Process (a, b)
zipProcessParallel x y =
  do rs <- processParallel [fmap Left x, fmap Right y]
     case rs of
       [Left a, Right b] -> return (a, b)
       _ ->
         error "Unexpected shape of the parallel results: zipProcessParallel"
-- | Run three processes through 'processParallel' and combine their
-- results into a triple.
--
-- The results are tagged with nested 'Left' / 'Right' constructors so
-- that processes of different result types fit into one list.  The
-- result list is matched with a total @case@, so no 'fail' (and hence
-- no MonadFail instance) is needed; any other shape raises an error.
zip3ProcessParallel :: Process a -> Process b -> Process c -> Process (a, b, c)
zip3ProcessParallel x y z =
  do rs <- processParallel [fmap Left x,
                            fmap (Right . Left) y,
                            fmap (Right . Right) z]
     case rs of
       [Left a, Right (Left b), Right (Right c)] -> return (a, b, c)
       _ ->
         error "Unexpected shape of the parallel results: zip3ProcessParallel"
-- | Split a process yielding a pair into two processes, one per
-- component, both backed by the same memoized process.
unzipProcess :: Process (a, b) -> Simulation (Process a, Process b)
unzipProcess xy =
  memoProcess xy >>= \shared ->
  return (fmap fst shared, fmap snd shared)
-- | Run the process with the given timeout under a freshly created
-- identifier; see 'timeoutProcessUsingId'.
timeoutProcess :: Double -> Process a -> Process (Maybe a)
timeoutProcess timeout p =
  liftSimulation newProcessId >>= \pid ->
  timeoutProcessUsingId timeout pid p
-- | Run the process under the specified identifier with a timeout.
--
-- Two child processes are spawned.  The first holds for the timeout
-- and, in its finalization part, cancels the identifier @pid@.  The
-- second runs the given process under @pid@, records either its value
-- or its exception and, in its finalization part, publishes whatever
-- was recorded.  The current process awaits that publication: it
-- returns 'Nothing' when nothing was recorded, the value wrapped in
-- 'Just' on success, and rethrows a recorded exception.
timeoutProcessUsingId :: Double -> ProcessId -> Process a -> Process (Maybe a)
timeoutProcessUsingId timeout pid p =
  do finished <- liftSimulation newSignalSource
     timerPid <- liftSimulation newProcessId
     -- the timer: once the hold is over, cancel the worker
     spawnProcessUsingId CancelChildAfterParent timerPid $
       holdProcess timeout `finallyProcess` liftEvent (cancelProcessWithId pid)
     -- the worker: record the outcome and always publish it
     spawnProcessUsingId CancelChildAfterParent pid $
       do outcome <- liftIO $ newIORef Nothing
          let attempt = p >>= liftIO . writeIORef outcome . Just . Right
              recover = liftIO . writeIORef outcome . Just . Left
              publish =
                liftEvent $
                liftIO (readIORef outcome) >>= triggerSignal finished
          finallyProcess (catchProcess attempt recover) publish
     r <- processAwait $ publishSignal finished
     case r of
       Nothing        -> return Nothing
       Just (Right a) -> return (Just a)
       Just (Left e)  -> throwProcess e
-- | Yield control: the continuation of the process is enqueued as an
-- event at the current time instead of being resumed immediately.
processYield :: Process ()
processYield =
  Process $ \_ ->
  Cont $ \k ->
  Event $ \p ->
  invokeEvent p $ enqueueEvent (pointTime p) (resumeCont k ())
-- | A process that never produces a value.
--
-- Its continuation is only resumed from a handler of the process's
-- cancelling signal, and the value passed then is an error thunk whose
-- message states that it must never be computed.
neverProcess :: Process a
neverProcess =
  Process $ \pid ->
  Cont $ \k ->
  handleSignal_ (processCancelling pid) $ const $
  resumeCont k $ error "It must never be computed: neverProcess"