module Simulation.Aivika.Distributed.Optimistic.Internal.Event
(queueInputMessages,
queueOutputMessages,
queueLog,
syncEvent) where
import Data.Maybe
import Data.IORef
import Data.Typeable
import Data.Time.Clock
import System.Timeout
import Control.Monad
import Control.Monad.Trans
import Control.Exception
import qualified Control.Distributed.Process as DP
import qualified Simulation.Aivika.PriorityQueue.Pure as PQ
import Simulation.Aivika.Trans
import Simulation.Aivika.Trans.Internal.Types
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.Channel
import Simulation.Aivika.Distributed.Optimistic.Internal.DIO
import Simulation.Aivika.Distributed.Optimistic.Internal.IO
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeWarp
import Simulation.Aivika.Distributed.Optimistic.Internal.InputMessageQueue
import Simulation.Aivika.Distributed.Optimistic.Internal.OutputMessageQueue
import Simulation.Aivika.Distributed.Optimistic.Internal.UndoableLog
import qualified Simulation.Aivika.Distributed.Optimistic.Internal.Ref as R
-- | Convert a duration expressed in whole microseconds into seconds,
-- keeping the result exact as a 'Rational'.
microsecondsToSeconds :: Int -> Rational
microsecondsToSeconds us = toRational us / 1000000
-- | The event queue implementation for the 'DIO' computation.
instance EventQueueing DIO where

  -- | The state of the event queue kept for a simulation run.
  data EventQueue DIO =
    EventQueue
      { queueInputMessages :: InputMessageQueue
        -- ^ the input message queue
      , queueOutputMessages :: OutputMessageQueue
        -- ^ the output message queue
      , queueLog :: UndoableLog
        -- ^ the undoable log used when rolling back
      , queuePQ :: R.Ref (PQ.PriorityQueue (Point DIO -> DIO ()))
        -- ^ the pending event handlers ordered by activation time
      , queueBusy :: IORef Bool
        -- ^ set while 'processPendingEventsCore' is draining the queue
      , queueTime :: IORef Double
        -- ^ the current time of the event queue
      , queueGlobalTime :: IORef Double
        -- ^ the last accepted global time
      , queueLocalTime :: IORef Double
        -- ^ the local time reported to the time server
      , queueLocalTime0 :: IORef Double
        -- ^ the queue time remembered by the previous 'updateLocalTime'
      , queueLocalTimeTimestamp :: IORef UTCTime
        -- ^ the wall-clock moment of the previous local time update
      , queueLocalTimeInterval :: NominalDiffTime
        -- ^ the minimum wall-clock interval between local time updates
      }

  -- Create a fresh queue; all simulation times start at the start time
  -- taken from the specs.
  newEventQueue specs =
    do let newStartTimeRef = liftIOUnsafe $ newIORef (spcStartTime specs)
       busy     <- liftIOUnsafe $ newIORef False
       time     <- newStartTimeRef
       global   <- newStartTimeRef
       events   <- R.newRef0 PQ.emptyQueue
       undoLog  <- newUndoableLog
       output   <- newOutputMessageQueue
       input    <- newInputMessageQueue undoLog rollbackEventPre rollbackEventPost rollbackEventTime
       local    <- newStartTimeRef
       local0   <- newStartTimeRef
       stamp    <- liftIOUnsafe $ getCurrentTime >>= newIORef
       -- the update interval equals the synchronization timeout (microseconds)
       interval <- fmap (fromRational . microsecondsToSeconds . dioSyncTimeout) dioParams
       return EventQueue
         { queueInputMessages      = input
         , queueOutputMessages     = output
         , queueLog                = undoLog
         , queuePQ                 = events
         , queueBusy               = busy
         , queueTime               = time
         , queueGlobalTime         = global
         , queueLocalTime          = local
         , queueLocalTime0         = local0
         , queueLocalTimeTimestamp = stamp
         , queueLocalTimeInterval  = interval
         }

  -- Put the handler into the priority queue at the specified time.
  enqueueEvent t (Event m) =
    Event $ \p ->
    invokeEvent p $
    R.modifyRef (queuePQ (runEventQueue (pointRun p))) $ \pending ->
    PQ.enqueue pending t m

  -- Enqueue an empty event at the requested time (relative to the current
  -- event point), synchronize the events and only then run the computation.
  runEventWith processing (Event e) =
    Dynamics $ \p ->
    do current <- invokeEvent p currentEventPoint
       invokeEvent current $ enqueueEvent (pointTime p) (return ())
       invokeEvent p $ syncEvents processing
       e p

  -- The number of handlers waiting in the priority queue.
  eventQueueCount =
    Event $ \p ->
    invokeEvent p $
    fmap PQ.queueCount $
    R.readRef (queuePQ (runEventQueue (pointRun p)))
-- | The first rollback stage: roll the undoable log back to the time of
-- the point, passing the @including@ flag through to 'rollbackLog'.
rollbackEventPre :: Bool -> TimeWarp DIO ()
rollbackEventPre including =
  TimeWarp $ \p ->
  rollbackLog (queueLog (runEventQueue (pointRun p))) (pointTime p) including
-- | The last rollback stage: roll the output message queue back to the
-- time of the point, passing the @including@ flag through to
-- 'rollbackOutputMessages'.
rollbackEventPost :: Bool -> TimeWarp DIO ()
rollbackEventPost including =
  TimeWarp $ \p ->
  rollbackOutputMessages
    (queueOutputMessages (runEventQueue (pointRun p)))
    (pointTime p)
    including
-- | Reset the queue times after a rollback to the time of the point.
--
-- The queue time becomes the rollback time and both local times are
-- clamped so they do not exceed it. If the stored global time is ahead of
-- the rollback time, it is lowered too and the local time is re-sent.
rollbackEventTime :: TimeWarp DIO ()
rollbackEventTime =
  TimeWarp $ \p ->
  do let queue    = runEventQueue (pointRun p)
         rollback = pointTime p
     liftIOUnsafe $
       do writeIORef (queueTime queue) rollback
          modifyIORef' (queueLocalTime queue) (min rollback)
          modifyIORef' (queueLocalTime0 queue) (min rollback)
     global <- liftIOUnsafe $ readIORef (queueGlobalTime queue)
     when (rollback < global) $
       do liftIOUnsafe $ writeIORef (queueGlobalTime queue) rollback
          invokeEvent p sendLocalTime
-- | Return the current time stored in the event queue.
currentEventTime :: Event DIO Double
currentEventTime =
  Event $ \p ->
  liftIOUnsafe $ readIORef $ queueTime $ runEventQueue $ pointRun p
-- | Return the point that corresponds to the current time of the event queue.
--
-- If the queue time equals the time of the given point, the point is
-- returned unchanged. Otherwise the point is rebuilt for the queue time:
-- the iteration is the number of whole integration steps elapsed since
-- the start time.
currentEventPoint :: Event DIO (Point DIO)
currentEventPoint =
  Event $ \p ->
  do let q = runEventQueue $ pointRun p
     t' <- liftIOUnsafe $ readIORef (queueTime q)
     if t' == pointTime p
       then return p
       else let sc = pointSpecs p
                t0 = spcStartTime sc
                dt = spcDT sc
                -- elapsed time divided by the step; the subtraction operator
                -- was missing here, which made the expression ill-typed
                n' = fromIntegral $ floor ((t' - t0) / dt)
            in return p { pointTime = t',
                          pointIteration = n',
                          -- NOTE(review): -1 is the Aivika convention for a point
                          -- that is not an integration node; confirm against upstream
                          pointPhase = -1 }
-- | Process the pending events up to the time of the point.
--
-- When the flag is 'True', the events scheduled exactly at the time of the
-- point are processed too; otherwise only strictly earlier events are.
--
-- The 'queueBusy' flag guards against re-entrance: calling this
-- computation while it is already running raises an error.
processPendingEventsCore :: Bool -> Dynamics DIO ()
processPendingEventsCore includingCurrentEvents = Dynamics r where
  r p =
    do let q = runEventQueue $ pointRun p
           f = queueBusy q
       f' <- liftIOUnsafe $ readIORef f
       if f'
         then error $
              "Detected an event loop, which may indicate to " ++
              "a logical error in the model: processPendingEventsCore"
         else do liftIOUnsafe $ writeIORef f True
                 call q p p
                 liftIOUnsafe $ writeIORef f False
  -- @p@ is the target point that bounds the processing, while @p0@ is the
  -- most recently used point, refreshed on every step.
  call q p p0 =
    do let pq = queuePQ q
       p1 <- invokeEvent p0 currentEventPoint
       ok <- invokeEvent p1 $ runTimeWarp processChannelMessages
       if not ok
         then -- the input message queue has changed: start over from the new point
              call q p p1
         else do
           f <- invokeEvent p1 $ fmap PQ.queueNull $ R.readRef pq
           unless f $
             do (t2, c2) <- invokeEvent p1 $ fmap PQ.queueFront $ R.readRef pq
                let t = queueTime q
                t' <- liftIOUnsafe $ readIORef t
                when (t2 < t') $
                  error $
                  "The time value is too small (" ++ show t2 ++
                  " < " ++ show t' ++ "): processPendingEventsCore"
                when ((t2 < pointTime p) ||
                      (includingCurrentEvents && (t2 == pointTime p))) $
                  do let sc = pointSpecs p
                         t0 = spcStartTime sc
                         dt = spcDT sc
                         -- elapsed time divided by the step; the subtraction
                         -- operator was missing here
                         n2 = fromIntegral $ floor ((t2 - t0) / dt)
                         p2 = p { pointTime = t2,
                                  pointIteration = n2,
                                  -- NOTE(review): -1 is the Aivika convention for a
                                  -- point that is not an integration node; confirm
                                  pointPhase = -1 }
                     -- advance the queue time before running the handler
                     liftIOUnsafe $ writeIORef t t2
                     invokeEvent p2 $ R.modifyRef pq PQ.dequeue
                     catchComp
                       (c2 p2)
                       (\e@(SimulationRetry _) -> invokeEvent p2 $ handleEventRetry e)
                     call q p p2
-- | Process the pending events after checking that the time of the point
-- is not behind the current time of the event queue; otherwise an error
-- is raised. The flag has the same meaning as in 'processPendingEventsCore'.
processPendingEvents :: Bool -> Dynamics DIO ()
processPendingEvents includingCurrentEvents = Dynamics checkAndRun
  where
    core = processPendingEventsCore includingCurrentEvents
    checkAndRun p =
      do queueT <- liftIOUnsafe $ readIORef $ queueTime $ runEventQueue $ pointRun p
         if queueT > pointTime p
           then error $
                "The current time is less than " ++
                "the time in the queue: processPendingEvents"
           else invokeDynamics p core
-- | A memoized value: process the events including those at the current
-- time, with the time check of 'processPendingEvents'.
processEventsIncludingCurrent :: Dynamics DIO ()
processEventsIncludingCurrent = processPendingEvents True

-- | A memoized value: process only the earlier events, with the time check
-- of 'processPendingEvents'.
processEventsIncludingEarlier :: Dynamics DIO ()
processEventsIncludingEarlier = processPendingEvents False

-- | A memoized value: process the events including those at the current
-- time, without the time check.
processEventsIncludingCurrentCore :: Dynamics DIO ()
processEventsIncludingCurrentCore = processPendingEventsCore True

-- | A memoized value: process only the earlier events, without the time
-- check.
--
-- This used to pass 'True', which made it indistinguishable from
-- 'processEventsIncludingCurrentCore' and inconsistent with
-- 'processEventsIncludingEarlier'.
processEventsIncludingEarlierCore :: Dynamics DIO ()
processEventsIncludingEarlierCore = processPendingEventsCore False
-- | Select the memoized event processing computation by the mode.
processEvents :: EventProcessing -> Dynamics DIO ()
processEvents processing =
  case processing of
    CurrentEvents           -> processEventsIncludingCurrent
    EarlierEvents           -> processEventsIncludingEarlier
    CurrentEventsOrFromPast -> processEventsIncludingCurrentCore
    EarlierEventsOrFromPast -> processEventsIncludingEarlierCore
-- | Test whether the undoable log or the output message queue has reached
-- its size threshold from the 'DIO' parameters. A notice is logged when
-- the overflow is detected.
isEventOverflow :: Event DIO Bool
isEventOverflow =
  Event $ \p ->
  do let queue = runEventQueue (pointRun p)
     logCount    <- liftIOUnsafe $ logSize (queueLog queue)
     outputCount <- liftIOUnsafe $ outputMessageQueueSize (queueOutputMessages queue)
     params      <- dioParams
     let overflow =
           (logCount >= dioUndoableLogSizeThreshold params) ||
           (outputCount >= dioOutputMessageQueueSizeThreshold params)
     when overflow $
       logDIO NOTICE $
       "t = " ++ (show $ pointTime p) ++
       ": detected the event overflow"
     return overflow
-- | Throttle the message channel: report the local time, wait for a
-- channel signal for at most the synchronization timeout, and then
-- process whatever has arrived.
throttleMessageChannel :: TimeWarp DIO ()
throttleMessageChannel =
  TimeWarp $ \p ->
  do _ <- invokeEvent p updateLocalTime
     invokeEvent p sendLocalTime
     ch     <- messageChannel
     params <- dioParams
     -- whether the wait timed out is deliberately ignored
     _ <- liftIOUnsafe $ timeout (dioSyncTimeout params) (awaitChannel ch)
     invokeTimeWarp p processChannelMessages
-- | Process all messages currently waiting in the channel, each at the
-- then-current event point, and afterwards throttle the channel if the
-- event overflow is detected.
processChannelMessages :: TimeWarp DIO ()
processChannelMessages =
  TimeWarp $ \p ->
  do ch      <- messageChannel
     drained <- liftIOUnsafe $ channelEmpty ch
     unless drained $
       do msgs <- liftIOUnsafe $ readChannel ch
          -- the event point is re-read per message, as an earlier message
          -- may have changed the queue time
          forM_ msgs $ \msg ->
            do current <- invokeEvent p currentEventPoint
               invokeTimeWarp current $ processChannelMessage msg
     current  <- invokeEvent p currentEventPoint
     overflow <- invokeEvent current isEventOverflow
     when overflow $
       invokeTimeWarp current throttleMessageChannel
-- | Process a single message received through the channel.
processChannelMessage :: LocalProcessMessage -> TimeWarp DIO ()
processChannelMessage (QueueMessage m) =
  TimeWarp $ \p ->
  do t0 <- liftIOUnsafe $ readIORef $ queueGlobalTime $ runEventQueue $ pointRun p
     enqueueChannelMessage p t0 m
processChannelMessage (QueueMessageBulk ms) =
  TimeWarp $ \p ->
  do -- the global time is read once for the whole bulk
     t0 <- liftIOUnsafe $ readIORef $ queueGlobalTime $ runEventQueue $ pointRun p
     forM_ ms $ enqueueChannelMessage p t0
processChannelMessage (GlobalTimeMessage globalTime) =
  TimeWarp $ \p ->
  do case globalTime of
       Just t0 -> invokeEvent p $ updateGlobalTime t0
       Nothing -> return ()
     -- always answer the time server with the refreshed local time
     _ <- invokeEvent p updateLocalTime
     t <- invokeEvent p getLocalTime
     sender   <- messageInboxId
     receiver <- timeServerId
     liftDistributedUnsafe $
       DP.send receiver (GlobalTimeMessageResp sender t)
processChannelMessage (LocalTimeMessageResp globalTime) =
  TimeWarp $ \p ->
  invokeEvent p $ updateGlobalTime globalTime
processChannelMessage TerminateLocalProcessMessage =
  TimeWarp $ \_ ->
  liftDistributedUnsafe DP.terminate

-- | Enqueue the incoming message at the given point. A message whose
-- receive time is less than the supplied global time is outdated: it is
-- either logged or rejected with an error, depending on the 'DIO'
-- parameters.
enqueueChannelMessage :: Point DIO -> Double -> Message -> DIO ()
enqueueChannelMessage p globalTime m =
  do when (messageReceiveTime m < globalTime) $
       do allowed <- fmap dioAllowProcessingOutdatedMessage dioParams
          if allowed
            then invokeEvent p logOutdatedMessage
            else error "Received the outdated message: processChannelMessage"
     invokeTimeWarp p $
       enqueueMessage (queueInputMessages (runEventQueue (pointRun p))) m
-- | Accept a new global time.
--
-- The local time is refreshed first. A global time greater than the local
-- time is ignored with a warning; otherwise it is stored and the queues
-- and the log are reduced up to that time.
updateGlobalTime :: Double -> Event DIO ()
updateGlobalTime t =
  Event $ \p ->
  do let queue = runEventQueue (pointRun p)
     _ <- invokeEvent p updateLocalTime
     localT <- invokeEvent p getLocalTime
     if localT < t
       then logDIO WARNING $
            "t = " ++ show localT ++
            ": Ignored the global time that is greater than the current local time"
       else do liftIOUnsafe $ writeIORef (queueGlobalTime queue) t
               invokeEvent p $ reduceEvents t
-- | Render the send time, the receive time and, when set, the anti-toggle
-- flag of the message.
showMessage :: Message -> ShowS
showMessage m =
  foldr (.) id
    [ showString "{ "
    , showString "sendTime = "
    , shows (messageSendTime m)
    , showString ", receiveTime = "
    , shows (messageReceiveTime m)
    , antiToggle
    , showString " }"
    ]
  where
    antiToggle
      | messageAntiToggle m = showString ", antiToggle = True"
      | otherwise           = showString ""
-- | Log the message: queue messages are logged at the INFO level in a
-- compact form, any other message at the DEBUG level via its 'Show'
-- instance.
logMessage :: LocalProcessMessage -> Event DIO ()
logMessage (QueueMessage m) =
  Event $ \p ->
  logDIO INFO $
  "t = " ++ (show $ pointTime p) ++
  ": QueueMessage " ++
  showMessage m []
logMessage (QueueMessageBulk ms) =
  Event $ \p ->
  logDIO INFO $
  "t = " ++ (show $ pointTime p) ++
  ": QueueMessageBulk [ " ++
  -- an empty bulk must not crash the logger, so the partial 'foldl1'
  -- is applied to non-empty lists only
  let fs = case map showMessage ms of
             [] -> id
             gs -> foldr1 (\a b -> a . showString ", " . b) gs
  in fs [] ++ " ]"
logMessage m =
  Event $ \p ->
  logDIO DEBUG $
  "t = " ++ (show $ pointTime p) ++
  ": " ++ show m
-- | Log at the DEBUG level that the local time is being synchronized.
logSyncLocalTime :: Event DIO ()
logSyncLocalTime =
  Event $ \p ->
  do globalT <- liftIOUnsafe $ readIORef $ queueGlobalTime $ runEventQueue $ pointRun p
     logDIO DEBUG $
       concat [ "t = ", show (pointTime p)
              , ", global t = ", show globalT
              , ": synchronizing the local time..." ]
-- | Log at the DEBUG level that the local time is being synchronized
-- in ring 0.
logSyncLocalTime0 :: Event DIO ()
logSyncLocalTime0 =
  Event $ \p ->
  do globalT <- liftIOUnsafe $ readIORef $ queueGlobalTime $ runEventQueue $ pointRun p
     logDIO DEBUG $
       concat [ "t = ", show (pointTime p)
              , ", global t = ", show globalT
              , ": synchronizing the local time in ring 0..." ]
-- | Log at the DEBUG level that the local time is being sent to the time
-- server.
logSendLocalTime :: Event DIO ()
logSendLocalTime =
  Event $ \p ->
  do globalT <- liftIOUnsafe $ readIORef $ queueGlobalTime $ runEventQueue $ pointRun p
     logDIO DEBUG $
       concat [ "t = ", show (pointTime p)
              , ", global t = ", show globalT
              , ": sending the local time to the time server after delay..." ]
-- | Log at the ERROR level that a premature IO action was detected.
logPrematureIO :: Event DIO ()
logPrematureIO =
  Event $ \p ->
  logDIO ERROR $
    concat [ "t = ", show (pointTime p)
           , ": detected a premature IO action" ]
-- | Log at the ERROR level that an outdated message was received.
logOutdatedMessage :: Event DIO ()
logOutdatedMessage =
  Event $ \p ->
  logDIO ERROR $
    concat [ "t = ", show (pointTime p)
           , ": received the outdated message" ]
-- | Reduce the input message queue, the output message queue and the
-- undoable log (in that order) up to the specified time.
reduceEvents :: Double -> Event DIO ()
reduceEvents t =
  Event $ \p ->
  let queue = runEventQueue (pointRun p)
  in liftIOUnsafe $
     reduceInputMessages (queueInputMessages queue) t >>
     reduceOutputMessages (queueOutputMessages queue) t >>
     reduceLog (queueLog queue) t
-- | Lifting of 'IO' actions into the 'Event' computation.
--
-- The local time is synchronized first. If that synchronization changed
-- the input message queue, the action counts as premature and is run only
-- when the 'DIO' parameters allow premature IO; otherwise an error is raised.
instance MonadIO (Event DIO) where

  liftIO m =
    Event $ \p ->
    do ok <- invokeEvent p $ runTimeWarp $ syncLocalTime $ return ()
       -- the parameters are consulted only when the synchronization failed
       allowed <- if ok
                  then return True
                  else fmap dioAllowPrematureIO dioParams
       if allowed
         then liftIOUnsafe m
         else error $
              "Detected a premature IO action at t = " ++
              (show $ pointTime p) ++ ": liftIO"
-- | Refresh the local time, but not more often than once per
-- 'queueLocalTimeInterval' of wall-clock time.
--
-- On a refresh the local time becomes the minimum of the queue time and
-- the queue time remembered by the previous refresh. The result is 'True'
-- only if a refresh happened and those two times differ.
updateLocalTime :: Event DIO Bool
updateLocalTime =
  Event $ \p ->
  do let queue = runEventQueue (pointRun p)
     lastStamp <- liftIOUnsafe $ readIORef (queueLocalTimeTimestamp queue)
     now       <- liftIOUnsafe getCurrentTime
     let due = addUTCTime (queueLocalTimeInterval queue) lastStamp
     if now < due
       then return False
       else liftIOUnsafe $
            do t     <- readIORef (queueTime queue)
               loct0 <- readIORef (queueLocalTime0 queue)
               writeIORef (queueLocalTime queue) (min t loct0)
               writeIORef (queueLocalTime0 queue) t
               writeIORef (queueLocalTimeTimestamp queue) now
               return (t /= loct0)
-- | Return the local time stored in the event queue.
getLocalTime :: Event DIO Double
getLocalTime =
  Event $ \p ->
  liftIOUnsafe $ readIORef $ queueLocalTime $ runEventQueue $ pointRun p
-- | Refresh the local time and send it to the time server.
sendLocalTime :: Event DIO ()
sendLocalTime =
  Event $ \p ->
  do _ <- invokeEvent p updateLocalTime
     localT   <- invokeEvent p getLocalTime
     sender   <- messageInboxId
     receiver <- timeServerId
     liftDistributedUnsafe $
       DP.send receiver (LocalTimeMessage sender localT)
-- | Run the computation and then wait until the global time reaches the
-- time of the point.
--
-- Nothing is awaited at the start time or when the global time already
-- equals the time of the point. Otherwise the channel is awaited for at
-- most the synchronization timeout and the arrived messages are
-- processed. If they changed the input message queue, the synchronization
-- stops. On a timeout the local time is re-sent, and the waiting
-- continues either here or in 'syncLocalTime0', depending on the result
-- of 'updateLocalTime'.
syncLocalTime :: Dynamics DIO () -> TimeWarp DIO ()
syncLocalTime m =
  TimeWarp $ \p ->
  do let queue   = runEventQueue (pointRun p)
         now     = pointTime p
         atStart = now == spcStartTime (pointSpecs p)
     invokeDynamics p m
     global <- liftIOUnsafe $ readIORef (queueGlobalTime queue)
     if global > now
       then error "Inconsistent time: syncLocalTime"
       else unless (atStart || global == now) $
            do ch       <- messageChannel
               dt       <- fmap dioSyncTimeout dioParams
               signaled <- liftIOUnsafe $ timeout dt $ awaitChannel ch
               ok       <- invokeEvent p $ runTimeWarp processChannelMessages
               when ok $
                 case signaled of
                   Just _  -> invokeTimeWarp p $ syncLocalTime m
                   Nothing ->
                     do updated <- invokeEvent p updateLocalTime
                        invokeEvent p sendLocalTime
                        invokeTimeWarp p $
                          if updated
                          then syncLocalTime m
                          else syncLocalTime0 m
-- | The last-chance variant of 'syncLocalTime' (ring 0).
--
-- It behaves like 'syncLocalTime', except that there is no shortcut for
-- the start time and a timeout while waiting for the channel is treated
-- as a deadlock and raises an error.
syncLocalTime0 :: Dynamics DIO () -> TimeWarp DIO ()
syncLocalTime0 m =
  TimeWarp $ \p ->
  do let queue = runEventQueue (pointRun p)
         now   = pointTime p
     invokeDynamics p m
     global <- liftIOUnsafe $ readIORef (queueGlobalTime queue)
     if global > now
       then error "Inconsistent time: syncLocalTime0"
       else unless (global == now) $
            do ch       <- messageChannel
               dt       <- fmap dioSyncTimeout dioParams
               signaled <- liftIOUnsafe $ timeout dt $ awaitChannel ch
               ok       <- invokeEvent p $ runTimeWarp processChannelMessages
               when ok $
                 case signaled of
                   Just _  -> invokeTimeWarp p $ syncLocalTime m
                   Nothing ->
                     error "Detected a deadlock when synchronizing the local time: syncLocalTime0"
-- | Run the time warp computation and report whether the version of the
-- input message queue stayed the same while it was running.
runTimeWarp :: TimeWarp DIO () -> Event DIO Bool
runTimeWarp m =
  Event $ \p ->
  do let version =
           liftIOUnsafe $
           inputMessageQueueVersion (queueInputMessages (runEventQueue (pointRun p)))
     before <- version
     invokeTimeWarp p m
     after <- version
     return (before == after)
-- | Process the events in the given mode and synchronize the local time,
-- repeating the whole procedure until a pass leaves the input message
-- queue unchanged.
syncEvents :: EventProcessing -> Event DIO ()
syncEvents processing = Event attempt
  where
    attempt p =
      do ok <- invokeEvent p $
               runTimeWarp $
               syncLocalTime $
               processEvents processing
         unless ok $ attempt p
-- | Enqueue the handler at the specified time. When the event fires, the
-- local time is synchronized first and the handler is run only if the
-- synchronization left the input message queue unchanged.
syncEvent :: Double -> Event DIO () -> Event DIO ()
syncEvent t h = enqueueEvent t (Event guarded)
  where
    guarded p =
      do ok <- invokeEvent p $ runTimeWarp $ syncLocalTime $ return ()
         when ok $ invokeEvent p h
-- | Handle the retry request raised by an event handler.
--
-- The input messages are retried and then the channel is awaited in
-- rounds, each bounded by the synchronization timeout. A round that
-- changes the input message queue ends the waiting. Two timeouts in a row
-- are treated as a deadlock and raise an error that includes the
-- original exception.
handleEventRetry :: SimulationRetry -> Event DIO ()
handleEventRetry e =
  Event $ \p ->
  do let queue = runEventQueue (pointRun p)
     logDIO NOTICE $
       "t = " ++ show (pointTime p) ++
       ": retrying the computations..."
     invokeTimeWarp p $
       retryInputMessages (queueInputMessages queue)
     let -- one waiting round; the argument says what to do on a timeout
         awaitRound onTimeout =
           do ch       <- messageChannel
              dt       <- fmap dioSyncTimeout dioParams
              signaled <- liftIOUnsafe $ timeout dt $ awaitChannel ch
              ok       <- invokeEvent p $ runTimeWarp processChannelMessages
              when ok $
                case signaled of
                  Just _  -> loop
                  Nothing -> onTimeout
         -- the first timeout grants one more round
         loop  = awaitRound loop0
         -- the second timeout in a row is fatal
         loop0 = awaitRound deadlock
         deadlock =
           error $
           "Detected a deadlock when retrying the computations: handleEventRetry\n" ++
           "--- the nested exception ---\n" ++ show e
     loop