module Simulation.Aivika.Distributed.Optimistic.Internal.Event
(queueInputMessages,
queueOutputMessages,
queueLog,
expectEvent,
processMonitorSignal) where
import Data.Maybe
import Data.IORef
import Data.Time.Clock
import System.Timeout
import Control.Monad
import Control.Monad.Trans
import Control.Exception
import qualified Control.Distributed.Process as DP
import qualified Simulation.Aivika.PriorityQueue.Pure as PQ
import Simulation.Aivika.Trans
import Simulation.Aivika.Trans.Internal.Types
import Simulation.Aivika.Trans.Internal.Event
import Simulation.Aivika.Trans.Internal.Cont
import Simulation.Aivika.Trans.Internal.Process
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.Channel
import Simulation.Aivika.Distributed.Optimistic.Internal.DIO
import Simulation.Aivika.Distributed.Optimistic.Internal.IO
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeWarp
import Simulation.Aivika.Distributed.Optimistic.Internal.SignalHelper
import Simulation.Aivika.Distributed.Optimistic.Internal.InputMessageQueue
import Simulation.Aivika.Distributed.Optimistic.Internal.OutputMessageQueue
import Simulation.Aivika.Distributed.Optimistic.Internal.TransientMessageQueue
import Simulation.Aivika.Distributed.Optimistic.Internal.UndoableLog
import qualified Simulation.Aivika.Distributed.Optimistic.Internal.Ref.Strict as R
microsecondsToSeconds :: Int -> Rational
microsecondsToSeconds x = (fromInteger $ toInteger x) / 1000000
instance EventQueueing DIO where
data EventQueue DIO =
EventQueue { queueInputMessages :: InputMessageQueue,
queueOutputMessages :: OutputMessageQueue,
queueTransientMessages :: TransientMessageQueue,
queueLog :: UndoableLog,
queuePQ :: R.Ref (PQ.PriorityQueue (Point DIO -> DIO ())),
queueBusy :: IORef Bool,
queueTime :: IORef Double,
queueGlobalTime :: IORef Double,
queueInFind :: IORef Bool,
queueProcessMonitorNotificationSource :: SignalSource DIO DP.ProcessMonitorNotification
}
newEventQueue specs =
do f <- liftIOUnsafe $ newIORef False
t <- liftIOUnsafe $ newIORef $ spcStartTime specs
gt <- liftIOUnsafe $ newIORef $ spcStartTime specs
pq <- R.newRef0 PQ.emptyQueue
log <- newUndoableLog
transient <- newTransientMessageQueue
output <- newOutputMessageQueue $ enqueueTransientMessage transient
input <- newInputMessageQueue log rollbackEventPre rollbackEventPost rollbackEventTime
infind <- liftIOUnsafe $ newIORef False
s <- newDIOSignalSource0
return EventQueue { queueInputMessages = input,
queueOutputMessages = output,
queueTransientMessages = transient,
queueLog = log,
queuePQ = pq,
queueBusy = f,
queueTime = t,
queueGlobalTime = gt,
queueInFind = infind,
queueProcessMonitorNotificationSource = s }
enqueueEvent t (Event m) =
Event $ \p ->
let pq = queuePQ $ runEventQueue $ pointRun p
in invokeEvent p $
R.modifyRef pq $ \x -> PQ.enqueue x t m
runEventWith processing (Event e) =
Dynamics $ \p ->
do p0 <- invokeEvent p currentEventPoint
invokeEvent p0 $ enqueueEvent (pointTime p) (return ())
invokeEvent p $ syncEvents processing
e p
eventQueueCount =
Event $ \p ->
let pq = queuePQ $ runEventQueue $ pointRun p
in invokeEvent p $
fmap PQ.queueCount $ R.readRef pq
rollbackEventPre :: Bool -> TimeWarp DIO ()
rollbackEventPre including =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
rollbackLog (queueLog q) (pointTime p) including
rollbackEventPost :: Bool -> TimeWarp DIO ()
rollbackEventPost including =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
rollbackOutputMessages (queueOutputMessages q) (pointTime p) including
rollbackEventTime :: TimeWarp DIO ()
rollbackEventTime =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
t = pointTime p
liftIOUnsafe $ writeIORef (queueTime q) t
t0 <- liftIOUnsafe $ readIORef (queueGlobalTime q)
when (t0 > t) $
do
liftIOUnsafe $ writeIORef (queueGlobalTime q) t
currentEventTime :: Event DIO Double
currentEventTime =
Event $ \p ->
do let q = runEventQueue $ pointRun p
liftIOUnsafe $ readIORef (queueTime q)
currentEventPoint :: Event DIO (Point DIO)
currentEventPoint =
Event $ \p ->
do let q = runEventQueue $ pointRun p
t' <- liftIOUnsafe $ readIORef (queueTime q)
if t' == pointTime p
then return p
else let sc = pointSpecs p
t0 = spcStartTime sc
dt = spcDT sc
n' = fromIntegral $ floor ((t' t0) / dt)
in return p { pointTime = t',
pointIteration = n',
pointPhase = 1 }
processPendingEventsCore :: Bool -> Dynamics DIO ()
processPendingEventsCore includingCurrentEvents = Dynamics r where
r p =
do let q = runEventQueue $ pointRun p
f = queueBusy q
f' <- liftIOUnsafe $ readIORef f
if f'
then error $
"Detected an event loop, which may indicate to " ++
"a logical error in the model: processPendingEventsCore"
else do liftIOUnsafe $ writeIORef f True
call q p p
liftIOUnsafe $ writeIORef f False
call q p p0 =
do let pq = queuePQ q
r = pointRun p
p1 <- invokeEvent p0 currentEventPoint
ok <- invokeEvent p1 $ runTimeWarp processChannelMessages
if not ok
then call q p p1
else do
f <- invokeEvent p1 $ fmap PQ.queueNull $ R.readRef pq
unless f $
do (t2, c2) <- invokeEvent p1 $ fmap PQ.queueFront $ R.readRef pq
let t = queueTime q
t' <- liftIOUnsafe $ readIORef t
when (t2 < t') $
error $
"The time value is too small (" ++ show t2 ++
" < " ++ show t' ++ "): processPendingEventsCore"
when ((t2 < pointTime p) ||
(includingCurrentEvents && (t2 == pointTime p))) $
do let sc = pointSpecs p
t0 = spcStartTime sc
dt = spcDT sc
n2 = fromIntegral $ floor ((t2 t0) / dt)
p2 = p { pointTime = t2,
pointIteration = n2,
pointPhase = 1 }
liftIOUnsafe $ writeIORef t t2
invokeEvent p2 $ R.modifyRef pq PQ.dequeue
catchComp
(c2 p2)
(\e@(SimulationRetry _) -> invokeEvent p2 $ handleEventRetry e)
call q p p2
processPendingEvents :: Bool -> Dynamics DIO ()
processPendingEvents includingCurrentEvents = Dynamics r where
r p =
do let q = runEventQueue $ pointRun p
t = queueTime q
t' <- liftIOUnsafe $ readIORef t
if pointTime p < t'
then error $
"The current time is less than " ++
"the time in the queue: processPendingEvents"
else invokeDynamics p m
m = processPendingEventsCore includingCurrentEvents
processEventsIncludingCurrent :: Dynamics DIO ()
processEventsIncludingCurrent = processPendingEvents True
processEventsIncludingEarlier :: Dynamics DIO ()
processEventsIncludingEarlier = processPendingEvents False
processEventsIncludingCurrentCore :: Dynamics DIO ()
processEventsIncludingCurrentCore = processPendingEventsCore True
processEventsIncludingEarlierCore :: Dynamics DIO ()
processEventsIncludingEarlierCore = processPendingEventsCore True
processEvents :: EventProcessing -> Dynamics DIO ()
processEvents CurrentEvents = processEventsIncludingCurrent
processEvents EarlierEvents = processEventsIncludingEarlier
processEvents CurrentEventsOrFromPast = processEventsIncludingCurrentCore
processEvents EarlierEventsOrFromPast = processEventsIncludingEarlierCore
isEventOverflow :: Event DIO Bool
isEventOverflow =
Event $ \p ->
do let q = runEventQueue $ pointRun p
n1 <- liftIOUnsafe $ logSize (queueLog q)
n2 <- liftIOUnsafe $ outputMessageQueueSize (queueOutputMessages q)
n3 <- liftIOUnsafe $ transientMessageQueueSize (queueTransientMessages q)
ps <- dioParams
let th1 = dioUndoableLogSizeThreshold ps
th2 = dioOutputMessageQueueSizeThreshold ps
th3 = dioTransientMessageQueueSizeThreshold ps
if (n1 >= th1) || (n2 >= th2) || (n3 >= th3)
then do logDIO NOTICE $
"t = " ++ (show $ pointTime p) ++
": detected the event overflow"
return True
else return False
throttleMessageChannel :: TimeWarp DIO ()
throttleMessageChannel =
TimeWarp $ \p ->
do
ch <- messageChannel
dt <- fmap dioSyncTimeout dioParams
liftIOUnsafe $
timeout dt $ awaitChannel ch
invokeTimeWarp p $ processChannelMessages
processChannelMessages :: TimeWarp DIO ()
processChannelMessages =
TimeWarp $ \p ->
do ch <- messageChannel
f <- liftIOUnsafe $ channelEmpty ch
unless f $
do xs <- liftIOUnsafe $ readChannel ch
forM_ xs $ \x ->
do p' <- invokeEvent p currentEventPoint
invokeTimeWarp p' $ processChannelMessage x
p' <- invokeEvent p currentEventPoint
f2 <- invokeEvent p' isEventOverflow
when f2 $
invokeTimeWarp p' throttleMessageChannel
processChannelMessage :: LogicalProcessMessage -> TimeWarp DIO ()
processChannelMessage x@(QueueMessage m) =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
infind <- liftIOUnsafe $ readIORef (queueInFind q)
deliverAcknowledgementMessage (acknowledgementMessage infind m)
t0 <- liftIOUnsafe $ readIORef (queueGlobalTime q)
p' <- invokeEvent p currentEventPoint
if messageReceiveTime m < t0
then do f <- fmap dioAllowSkippingOutdatedMessage dioParams
if f
then invokeEvent p' logOutdatedMessage
else error "Received the outdated message: processChannelMessage"
else invokeTimeWarp p' $
enqueueMessage (queueInputMessages q) m
processChannelMessage x@(QueueMessageBulk ms) =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
infind <- liftIOUnsafe $ readIORef (queueInFind q)
deliverAcknowledgementMessages $ map (acknowledgementMessage infind) ms
t0 <- liftIOUnsafe $ readIORef (queueGlobalTime q)
forM_ ms $ \m ->
do p' <- invokeEvent p currentEventPoint
if messageReceiveTime m < t0
then do f <- fmap dioAllowSkippingOutdatedMessage dioParams
if f
then invokeEvent p' logOutdatedMessage
else error "Received the outdated message: processChannelMessage"
else invokeTimeWarp p' $
enqueueMessage (queueInputMessages q) m
processChannelMessage x@(AcknowledgementQueueMessage m) =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
liftIOUnsafe $
processAcknowledgementMessage (queueTransientMessages q) m
processChannelMessage x@(AcknowledgementQueueMessageBulk ms) =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
liftIOUnsafe $
forM_ ms $
processAcknowledgementMessage (queueTransientMessages q)
processChannelMessage x@ComputeLocalTimeMessage =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
liftIOUnsafe $
writeIORef (queueInFind q) True
t' <- invokeEvent p getLocalTime
sender <- messageInboxId
receiver <- timeServerId
sendLocalTimeDIO receiver sender t'
processChannelMessage x@(GlobalTimeMessage globalTime) =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
liftIOUnsafe $
do writeIORef (queueInFind q) False
resetAcknowledgementMessageTime (queueTransientMessages q)
invokeEvent p $
updateGlobalTime globalTime
processChannelMessage x@(ProcessMonitorNotificationMessage y@(DP.ProcessMonitorNotification _ pid reason)) =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
invokeEvent p $
triggerSignal (queueProcessMonitorNotificationSource q) y
processChannelMessage x@(ReconnectProcessMessage pid) =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
invokeEvent p $
reconnectProcess pid
getLocalTime :: Event DIO Double
getLocalTime =
Event $ \p ->
do let q = runEventQueue $ pointRun p
t1 <- liftIOUnsafe $ readIORef (queueTime q)
t2 <- liftIOUnsafe $ transientMessageQueueTime (queueTransientMessages q)
t3 <- liftIOUnsafe $ acknowledgementMessageTime (queueTransientMessages q)
let t' = t1 `min` t2 `min` t3
return t'
updateGlobalTime :: Double -> Event DIO ()
updateGlobalTime t =
Event $ \p ->
do let q = runEventQueue $ pointRun p
t' <- invokeEvent p getLocalTime
if t > t'
then logDIO WARNING $
"t = " ++ show t' ++
": Ignored the global time that is greater than the current local time"
else do liftIOUnsafe $
writeIORef (queueGlobalTime q) t
invokeEvent p $
reduceEvents t
requestGlobalTime :: Event DIO ()
requestGlobalTime =
Event $ \p ->
do let q = runEventQueue $ pointRun p
sender <- messageInboxId
receiver <- timeServerId
sendRequestGlobalTimeDIO receiver sender
showMessage :: Message -> ShowS
showMessage m =
showString "{ " .
showString "sendTime = " .
shows (messageSendTime m) .
showString ", receiveTime = " .
shows (messageReceiveTime m) .
(if messageAntiToggle m
then showString ", antiToggle = True"
else showString "") .
showString " }"
logMessage :: LogicalProcessMessage -> Event DIO ()
logMessage (QueueMessage m) =
Event $ \p ->
logDIO INFO $
"t = " ++ (show $ pointTime p) ++
": QueueMessage " ++
showMessage m []
logMessage (QueueMessageBulk ms) =
Event $ \p ->
logDIO INFO $
"t = " ++ (show $ pointTime p) ++
": QueueMessageBulk [ " ++
let fs = foldl1 (\a b -> a . showString ", " . b) $ map showMessage ms
in fs [] ++ " ]"
logMessage m =
Event $ \p ->
logDIO DEBUG $
"t = " ++ (show $ pointTime p) ++
": " ++ show m
logSyncLocalTime :: Event DIO ()
logSyncLocalTime =
Event $ \p ->
do let q = runEventQueue $ pointRun p
t' <- liftIOUnsafe $ readIORef (queueGlobalTime q)
logDIO DEBUG $
"t = " ++ (show $ pointTime p) ++
", global t = " ++ (show t') ++
": synchronizing the local time..."
logSyncLocalTime0 :: Event DIO ()
logSyncLocalTime0 =
Event $ \p ->
do let q = runEventQueue $ pointRun p
t' <- liftIOUnsafe $ readIORef (queueGlobalTime q)
logDIO DEBUG $
"t = " ++ (show $ pointTime p) ++
", global t = " ++ (show t') ++
": synchronizing the local time in ring 0..."
logRequestGlobalTime :: Event DIO ()
logRequestGlobalTime =
Event $ \p ->
do let q = runEventQueue $ pointRun p
t' <- liftIOUnsafe $ readIORef (queueGlobalTime q)
logDIO DEBUG $
"t = " ++ (show $ pointTime p) ++
", global t = " ++ (show t') ++
": requesting for a new global time..."
logPrematureIO :: Event DIO ()
logPrematureIO =
Event $ \p ->
logDIO ERROR $
"t = " ++ (show $ pointTime p) ++
": detected a premature IO action"
logOutdatedMessage :: Event DIO ()
logOutdatedMessage =
Event $ \p ->
logDIO WARNING $
"t = " ++ (show $ pointTime p) ++
": skipping the outdated message"
reduceEvents :: Double -> Event DIO ()
reduceEvents t =
Event $ \p ->
do let q = runEventQueue $ pointRun p
liftIOUnsafe $
do reduceInputMessages (queueInputMessages q) t
reduceOutputMessages (queueOutputMessages q) t
reduceLog (queueLog q) t
instance MonadIO (Event DIO) where
liftIO m =
Event $ \p ->
do ok <- invokeEvent p $
runTimeWarp $
syncLocalTime $
return ()
if ok
then liftIOUnsafe m
else do f <- fmap dioAllowPrematureIO dioParams
if f
then do
liftIOUnsafe m
else error $
"Detected a premature IO action at t = " ++
(show $ pointTime p) ++ ": liftIO"
syncLocalTime :: Dynamics DIO () -> TimeWarp DIO ()
syncLocalTime m =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
t = pointTime p
invokeDynamics p m
t' <- liftIOUnsafe $ readIORef (queueGlobalTime q)
if t' > t
then error "Inconsistent time: syncLocalTime"
else if (t == spcStartTime (pointSpecs p)) || (t' == pointTime p)
then return ()
else do
ch <- messageChannel
dt <- fmap dioSyncTimeout dioParams
f <- liftIOUnsafe $
timeout dt $ awaitChannel ch
ok <- invokeEvent p $ runTimeWarp processChannelMessages
if ok
then do case f of
Just _ ->
invokeTimeWarp p $ syncLocalTime m
Nothing ->
do
invokeTimeWarp p $ syncLocalTime0 m
else return ()
syncLocalTime0 :: Dynamics DIO () -> TimeWarp DIO ()
syncLocalTime0 m =
TimeWarp $ \p ->
do let q = runEventQueue $ pointRun p
t = pointTime p
invokeDynamics p m
t' <- liftIOUnsafe $ readIORef (queueGlobalTime q)
if t' > t
then error "Inconsistent time: syncLocalTime0"
else if t' == pointTime p
then return ()
else do
ch <- messageChannel
dt <- fmap dioSyncTimeout dioParams
f <- liftIOUnsafe $
timeout dt $ awaitChannel ch
ok <- invokeEvent p $ runTimeWarp processChannelMessages
if ok
then do case f of
Just _ ->
invokeTimeWarp p $ syncLocalTime m
Nothing ->
error "Detected a deadlock when synchronizing the local time: syncLocalTime0"
else return ()
runTimeWarp :: TimeWarp DIO () -> Event DIO Bool
runTimeWarp m =
Event $ \p ->
do let q = runEventQueue $ pointRun p
v0 <- liftIOUnsafe $ inputMessageQueueVersion (queueInputMessages q)
invokeTimeWarp p m
v2 <- liftIOUnsafe $ inputMessageQueueVersion (queueInputMessages q)
return (v0 == v2)
syncEvents :: EventProcessing -> Event DIO ()
syncEvents processing =
Event $ \p ->
do ok <- invokeEvent p $
runTimeWarp $
syncLocalTime $
processEvents processing
unless ok $
invokeEvent p $
syncEvents processing
instance EventIOQueueing DIO where
enqueueEventIO t h =
enqueueEvent t $
Event $ \p ->
do ok <- invokeEvent p $
runTimeWarp $
syncLocalTime $
return ()
when ok $
invokeEvent p h
handleEventRetry :: SimulationRetry -> Event DIO ()
handleEventRetry e =
Event $ \p ->
do let q = runEventQueue $ pointRun p
t = pointTime p
logDIO INFO $
"t = " ++ show t ++
": retrying the computations..."
invokeTimeWarp p $
retryInputMessages (queueInputMessages q)
let loop =
do
ch <- messageChannel
dt <- fmap dioSyncTimeout dioParams
f <- liftIOUnsafe $
timeout dt $ awaitChannel ch
ok <- invokeEvent p $ runTimeWarp processChannelMessages
when ok $
case f of
Just _ -> loop
Nothing -> loop0
loop0 =
do
ch <- messageChannel
dt <- fmap dioSyncTimeout dioParams
f <- liftIOUnsafe $
timeout dt $ awaitChannel ch
ok <- invokeEvent p $ runTimeWarp processChannelMessages
when ok $
case f of
Just _ -> loop
Nothing ->
error $
"Detected a deadlock when retrying the computations: handleEventRetry\n" ++
"--- the nested exception ---\n" ++ show e
loop
reconnectProcess :: DP.ProcessId -> Event DIO ()
reconnectProcess pid =
Event $ \p ->
do let q = runEventQueue $ pointRun p
logDIO NOTICE $
"t = " ++ show (pointTime p) ++
": reconnecting to " ++ show pid ++ "..."
infind <- liftIOUnsafe $ readIORef (queueInFind q)
let ys = queueInputMessages q
ys' <- liftIOUnsafe $
fmap (map $ acknowledgementMessage infind) $
filterInputMessages (\x -> messageSenderId x == pid) ys
unless (null ys') $
sendAcknowledgementMessagesDIO pid ys'
xs <- liftIOUnsafe $ transientMessages (queueTransientMessages q)
let xs' = filter (\x -> messageReceiverId x == pid) xs
unless (null xs') $
sendMessagesDIO pid xs'
processMonitorSignal :: Signal DIO DP.ProcessMonitorNotification
processMonitorSignal =
Signal { handleSignal = \h ->
Event $ \p ->
let q = runEventQueue (pointRun p)
s = publishSignal (queueProcessMonitorNotificationSource q)
in invokeEvent p $
handleSignal s h
}
expectEvent :: Event DIO (Maybe a) -> (a -> Event DIO ()) -> Event DIO ()
expectEvent m cont =
Event $ \p ->
do let q = runEventQueue $ pointRun p
t = pointTime p
logDIO INFO $
"t = " ++ show (pointTime p) ++
": expecting the computation result: expectEvent"
let loop =
do
x <- invokeEvent p m
case x of
Just a -> invokeEvent p $ cont a
Nothing -> next loop0
loop0 =
do
x <- invokeEvent p m
case x of
Just a -> invokeEvent p $ cont a
Nothing -> next $ error "Detected a deadlock: expectEvent"
next loop' =
do pq <- invokeEvent p $ R.readRef $ queuePQ q
let f = PQ.queueNull pq
if f
then await loop'
else do let (t2, _) = PQ.queueFront pq
if t < t2
then await loop'
else invokeEvent p $
enqueueEvent t $
Event $ \p -> loop
await loop' =
do
ch <- messageChannel
dt <- fmap dioSyncTimeout dioParams
f <- liftIOUnsafe $
timeout dt $ awaitChannel ch
ok <- invokeEvent p $ runTimeWarp processChannelMessages
when ok $
case f of
Just _ -> loop
Nothing -> loop'
loop