module Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
(TimeServerParams(..),
defaultTimeServerParams,
timeServer,
curryTimeServer) where
import qualified Data.Map as M
import qualified Data.Set as S
import Data.Maybe
import Data.IORef
import Data.Typeable
import Data.Binary
import Data.Time.Clock
import GHC.Generics
import Control.Monad
import Control.Monad.Trans
import Control.Exception
import qualified Control.Monad.Catch as C
import Control.Concurrent
import qualified Control.Distributed.Process as DP
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
data TimeServerParams =
TimeServerParams { tsLoggingPriority :: Priority,
tsReceiveTimeout :: Int,
tsTimeSyncTimeout :: Int,
tsTimeSyncDelay :: Int,
tsProcessMonitoringEnabled :: Bool,
tsProcessMonitoringDelay :: Int,
tsProcessReconnectingEnabled :: Bool,
tsProcessReconnectingDelay :: Int
} deriving (Eq, Ord, Show, Typeable, Generic)
instance Binary TimeServerParams
data TimeServer =
TimeServer { tsParams :: TimeServerParams,
tsInitQuorum :: Int,
tsInInit :: IORef Bool,
tsTerminating :: IORef Bool,
tsProcesses :: IORef (M.Map DP.ProcessId LocalProcessInfo),
tsProcessesInFind :: IORef (S.Set DP.ProcessId),
tsGlobalTime :: IORef (Maybe Double),
tsGlobalTimeTimestamp :: IORef (Maybe UTCTime)
}
data LocalProcessInfo =
LocalProcessInfo { lpLocalTime :: IORef (Maybe Double),
lpMonitorRef :: Maybe DP.MonitorRef
}
defaultTimeServerParams :: TimeServerParams
defaultTimeServerParams =
TimeServerParams { tsLoggingPriority = WARNING,
tsReceiveTimeout = 100000,
tsTimeSyncTimeout = 60000000,
tsTimeSyncDelay = 1000000,
tsProcessMonitoringEnabled = False,
tsProcessMonitoringDelay = 3000000,
tsProcessReconnectingEnabled = False,
tsProcessReconnectingDelay = 5000000
}
newTimeServer :: Int -> TimeServerParams -> IO TimeServer
newTimeServer n ps =
do f <- newIORef True
ft <- newIORef False
m <- newIORef M.empty
s <- newIORef S.empty
t0 <- newIORef Nothing
t' <- newIORef Nothing
return TimeServer { tsParams = ps,
tsInitQuorum = n,
tsInInit = f,
tsTerminating = ft,
tsProcesses = m,
tsProcessesInFind = s,
tsGlobalTime = t0,
tsGlobalTimeTimestamp = t'
}
processTimeServerMessage :: TimeServer -> TimeServerMessage -> DP.Process ()
processTimeServerMessage server (RegisterLocalProcessMessage pid) =
join $ liftIO $
do m <- readIORef (tsProcesses server)
case M.lookup pid m of
Just x ->
return $
logTimeServer server WARNING $
"Time Server: already registered process identifier " ++ show pid
Nothing ->
do t <- newIORef Nothing
modifyIORef (tsProcesses server) $
M.insert pid LocalProcessInfo { lpLocalTime = t, lpMonitorRef = Nothing }
return $
do when (tsProcessMonitoringEnabled $ tsParams server) $
do logTimeServer server INFO $
"Time Server: monitoring the process by identifier " ++ show pid
r <- DP.monitor pid
liftIO $
modifyIORef (tsProcesses server) $
M.update (\x -> Just x { lpMonitorRef = Just r }) pid
serverId <- DP.getSelfPid
DP.send pid (RegisterLocalProcessAcknowledgmentMessage serverId)
tryStartTimeServer server
processTimeServerMessage server (UnregisterLocalProcessMessage pid) =
join $ liftIO $
do m <- readIORef (tsProcesses server)
case M.lookup pid m of
Nothing ->
return $
logTimeServer server WARNING $
"Time Server: unknown process identifier " ++ show pid
Just x ->
do modifyIORef (tsProcesses server) $
M.delete pid
modifyIORef (tsProcessesInFind server) $
S.delete pid
return $
do when (tsProcessMonitoringEnabled $ tsParams server) $
case lpMonitorRef x of
Nothing -> return ()
Just r ->
do logTimeServer server INFO $
"Time Server: unmonitoring the process by identifier " ++ show pid
DP.unmonitor r
serverId <- DP.getSelfPid
DP.send pid (UnregisterLocalProcessAcknowledgmentMessage serverId)
tryProvideTimeServerGlobalTime server
tryTerminateTimeServer server
processTimeServerMessage server (TerminateTimeServerMessage pid) =
join $ liftIO $
do m <- readIORef (tsProcesses server)
case M.lookup pid m of
Nothing ->
return $
logTimeServer server WARNING $
"Time Server: unknown process identifier " ++ show pid
Just x ->
do modifyIORef (tsProcesses server) $
M.delete pid
modifyIORef (tsProcessesInFind server) $
S.delete pid
return $
do when (tsProcessMonitoringEnabled $ tsParams server) $
case lpMonitorRef x of
Nothing -> return ()
Just r ->
do logTimeServer server INFO $
"Time Server: unmonitoring the process by identifier " ++ show pid
DP.unmonitor r
serverId <- DP.getSelfPid
DP.send pid (TerminateTimeServerAcknowledgmentMessage serverId)
startTerminatingTimeServer server
processTimeServerMessage server (RequestGlobalTimeMessage pid) =
tryComputeTimeServerGlobalTime server
processTimeServerMessage server (LocalTimeMessage pid t') =
join $ liftIO $
do m <- readIORef (tsProcesses server)
case M.lookup pid m of
Nothing ->
return $
do logTimeServer server WARNING $
"Time Server: unknown process identifier " ++ show pid
processTimeServerMessage server (RegisterLocalProcessMessage pid)
processTimeServerMessage server (LocalTimeMessage pid t')
Just x ->
do writeIORef (lpLocalTime x) (Just t')
modifyIORef (tsProcessesInFind server) $
S.delete pid
return $
tryProvideTimeServerGlobalTime server
processTimeServerMessage server (ReMonitorTimeServerMessage pids) =
do forM_ pids $ \pid ->
do
logTimeServer server NOTICE $ "Time Server: re-monitoring " ++ show pid
DP.monitor pid
logTimeServer server NOTICE $ "Time Server: started re-monitoring " ++ show pid
resetComputingTimeServerGlobalTime server
(.>=.) :: Maybe Double -> Maybe Double -> Bool
(.>=.) (Just x) (Just y) = x >= y
(.>=.) _ _ = False
(.>.) :: Maybe Double -> Maybe Double -> Bool
(.>.) (Just x) (Just y) = x > y
(.>.) _ _ = False
tryStartTimeServer :: TimeServer -> DP.Process ()
tryStartTimeServer server =
join $ liftIO $
do f <- readIORef (tsInInit server)
if not f
then return $
return ()
else do m <- readIORef (tsProcesses server)
if M.size m < tsInitQuorum server
then return $
return ()
else do writeIORef (tsInInit server) False
return $
do logTimeServer server INFO $
"Time Server: starting"
tryComputeTimeServerGlobalTime server
tryComputeTimeServerGlobalTime :: TimeServer -> DP.Process ()
tryComputeTimeServerGlobalTime server =
join $ liftIO $
do f <- readIORef (tsInInit server)
if f
then return $
return ()
else do s <- readIORef (tsProcessesInFind server)
if S.size s > 0
then return $
return ()
else return $
computeTimeServerGlobalTime server
resetComputingTimeServerGlobalTime :: TimeServer -> DP.Process ()
resetComputingTimeServerGlobalTime server =
do logTimeServer server NOTICE $
"Time Server: reset computing the global time"
liftIO $
do utc <- getCurrentTime
writeIORef (tsProcessesInFind server) S.empty
writeIORef (tsGlobalTimeTimestamp server) (Just utc)
tryProvideTimeServerGlobalTime :: TimeServer -> DP.Process ()
tryProvideTimeServerGlobalTime server =
join $ liftIO $
do f <- readIORef (tsInInit server)
if f
then return $
return ()
else do s <- readIORef (tsProcessesInFind server)
if S.size s > 0
then return $
return ()
else return $
provideTimeServerGlobalTime server
computeTimeServerGlobalTime :: TimeServer -> DP.Process ()
computeTimeServerGlobalTime server =
do logTimeServer server DEBUG $
"Time Server: computing the global time..."
zs <- liftIO $ fmap M.assocs $ readIORef (tsProcesses server)
forM_ zs $ \(pid, x) ->
liftIO $
modifyIORef (tsProcessesInFind server) $
S.insert pid
forM_ zs $ \(pid, x) ->
DP.send pid ComputeLocalTimeMessage
provideTimeServerGlobalTime :: TimeServer -> DP.Process ()
provideTimeServerGlobalTime server =
do t0 <- liftIO $ timeServerGlobalTime server
logTimeServer server INFO $
"Time Server: providing the global time = " ++ show t0
case t0 of
Nothing -> return ()
Just t0 ->
do t' <- liftIO $ readIORef (tsGlobalTime server)
when (t' .>. Just t0) $
logTimeServer server NOTICE
"Time Server: the global time has decreased"
timestamp <- liftIO getCurrentTime
liftIO $ writeIORef (tsGlobalTime server) (Just t0)
liftIO $ writeIORef (tsGlobalTimeTimestamp server) (Just timestamp)
zs <- liftIO $ fmap M.assocs $ readIORef (tsProcesses server)
forM_ zs $ \(pid, x) ->
DP.send pid (GlobalTimeMessage t0)
timeServerGlobalTime :: TimeServer -> IO (Maybe Double)
timeServerGlobalTime server =
do zs <- fmap M.assocs $ readIORef (tsProcesses server)
case zs of
[] -> return Nothing
((pid, x) : zs') ->
do t <- readIORef (lpLocalTime x)
loop zs t
where loop [] acc = return acc
loop ((pid, x) : zs') acc =
do t <- readIORef (lpLocalTime x)
case t of
Nothing ->
loop zs' Nothing
Just _ ->
loop zs' (liftM2 min t acc)
startTerminatingTimeServer :: TimeServer -> DP.Process ()
startTerminatingTimeServer server =
do logTimeServer server INFO "Time Server: start terminating..."
liftIO $
writeIORef (tsTerminating server) True
tryTerminateTimeServer server
tryTerminateTimeServer :: TimeServer -> DP.Process ()
tryTerminateTimeServer server =
do f <- liftIO $ readIORef (tsTerminating server)
when f $
do m <- liftIO $ readIORef (tsProcesses server)
when (M.null m) $
do logTimeServer server INFO "Time Server: terminate"
DP.terminate
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds x = fromInteger $ toInteger $ round (1000000 * x)
data InternalTimeServerMessage = InternalTimeServerMessage TimeServerMessage
| InternalProcessMonitorNotification DP.ProcessMonitorNotification
| InternalKeepAliveMessage KeepAliveMessage
handleTimeServerException :: TimeServer -> SomeException -> DP.Process ()
handleTimeServerException server e =
do
logTimeServer server ERROR $ "Exception occured: " ++ show e
C.throwM e
timeServer :: Int -> TimeServerParams -> DP.Process ()
timeServer n ps =
do server <- liftIO $ newTimeServer n ps
logTimeServer server INFO "Time Server: starting..."
let loop utc0 =
do let f1 :: TimeServerMessage -> DP.Process InternalTimeServerMessage
f1 x = return (InternalTimeServerMessage x)
f2 :: DP.ProcessMonitorNotification -> DP.Process InternalTimeServerMessage
f2 x = return (InternalProcessMonitorNotification x)
f3 :: KeepAliveMessage -> DP.Process InternalTimeServerMessage
f3 x = return (InternalKeepAliveMessage x)
a <- DP.receiveTimeout (tsReceiveTimeout ps) [DP.match f1, DP.match f2, DP.match f3]
case a of
Nothing -> return ()
Just (InternalTimeServerMessage m) ->
do
logTimeServer server DEBUG $
"Time Server: " ++ show m
processTimeServerMessage server m
Just (InternalProcessMonitorNotification m) ->
handleProcessMonitorNotification m server
Just (InternalKeepAliveMessage m) ->
do
logTimeServer server DEBUG $
"Time Server: " ++ show m
return ()
utc <- liftIO getCurrentTime
timestamp <- liftIO $ readIORef (tsGlobalTimeTimestamp server)
case timestamp of
Just x | shouldResetComputingTimeServerGlobalTime server x utc ->
resetComputingTimeServerGlobalTime server
_ -> return ()
if shouldComputeTimeServerGlobalTime server utc0 utc
then do tryComputeTimeServerGlobalTime server
loop utc
else loop utc0
C.catch (liftIO getCurrentTime >>= loop) (handleTimeServerException server)
handleProcessMonitorNotification :: DP.ProcessMonitorNotification -> TimeServer -> DP.Process ()
handleProcessMonitorNotification m@(DP.ProcessMonitorNotification _ pid0 reason) server =
do let ps = tsParams server
recv m@(DP.ProcessMonitorNotification _ _ _) =
do
logTimeServer server WARNING $
"Time Server: received a process monitor notification " ++ show m
return m
recv m
when (tsProcessReconnectingEnabled ps && reason == DP.DiedDisconnect) $
do liftIO $
threadDelay (tsProcessReconnectingDelay ps)
let pred m@(DP.ProcessMonitorNotification _ _ reason) = reason == DP.DiedDisconnect
loop :: [DP.ProcessId] -> DP.Process [DP.ProcessId]
loop acc =
do y <- DP.receiveTimeout 0 [DP.matchIf pred recv]
case y of
Nothing -> return $ reverse acc
Just m@(DP.ProcessMonitorNotification _ pid _) -> loop (pid : acc)
pids <- loop [pid0]
logTimeServer server NOTICE "Begin reconnecting..."
forM_ pids $ \pid ->
do
logTimeServer server NOTICE $
"Time Server: reconnecting to " ++ show pid
DP.reconnect pid
serverId <- DP.getSelfPid
DP.spawnLocal $
let action =
do liftIO $
threadDelay (tsProcessMonitoringDelay ps)
logTimeServer server NOTICE $ "Time Server: proceed to the re-monitoring"
DP.send serverId (ReMonitorTimeServerMessage pids)
in C.catch action (handleTimeServerException server)
return ()
shouldComputeTimeServerGlobalTime :: TimeServer -> UTCTime -> UTCTime -> Bool
shouldComputeTimeServerGlobalTime server utc0 utc =
let dt = fromRational $ toRational (diffUTCTime utc utc0)
in secondsToMicroseconds dt > (tsTimeSyncDelay $ tsParams server)
shouldResetComputingTimeServerGlobalTime :: TimeServer -> UTCTime -> UTCTime -> Bool
shouldResetComputingTimeServerGlobalTime server utc0 utc =
let dt = fromRational $ toRational (diffUTCTime utc utc0)
in secondsToMicroseconds dt > (tsTimeSyncTimeout $ tsParams server)
curryTimeServer :: (Int, TimeServerParams) -> DP.Process ()
curryTimeServer (n, ps) = timeServer n ps
logTimeServer :: TimeServer -> Priority -> String -> DP.Process ()
logTimeServer server p message =
when (tsLoggingPriority (tsParams server) <= p) $
DP.say $
embracePriority p ++ " " ++ message