module Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
(TimeServerParams(..),
defaultTimeServerParams,
timeServer,
curryTimeServer) where
import qualified Data.Map as M
import qualified Data.Set as S
import Data.Maybe
import Data.IORef
import Data.Typeable
import Data.Binary
import Data.Time.Clock
import GHC.Generics
import Control.Monad
import Control.Monad.Trans
import Control.Concurrent
import qualified Control.Distributed.Process as DP
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
-- | The configuration of the time server.
data TimeServerParams = TimeServerParams
  { tsLoggingPriority :: Priority
    -- ^ the logging threshold: a message is emitted by the server only when
    -- this value is @<=@ the priority of the message
  , tsExpectTimeout :: Int
    -- ^ the timeout passed to @expectTimeout@ while waiting for the next
    -- message (presumably in microseconds, as that function expects)
  , tsTimeSyncDelay :: Int
    -- ^ the delay in microseconds after which the server tries to start
    -- a new computation of the global time
  } deriving (Eq, Ord, Show, Typeable, Generic)
-- | Allows the parameters to be serialised. No methods are given here, so the
-- defaults of the 'Binary' class are used (presumably the 'Generic'-based ones).
instance Binary TimeServerParams
-- | The mutable state of a running time server.
data TimeServer = TimeServer
  { tsParams :: TimeServerParams
    -- ^ the parameters the server was created with
  , tsInitQuorum :: Int
    -- ^ the number of registered processes required to leave the
    -- initialisation phase
  , tsInInit :: IORef Bool
    -- ^ whether the server is still in the initialisation phase
  , tsProcesses :: IORef (M.Map DP.ProcessId LocalProcessInfo)
    -- ^ the registered processes together with their per-process state
  , tsProcessesInFind :: IORef (S.Set DP.ProcessId)
    -- ^ the processes that were asked for their local time and
    -- have not replied yet
  , tsGlobalTime :: IORef (Maybe Double)
    -- ^ the global time that was provided last, if any
  }
-- | The state that the time server keeps for every registered process.
data LocalProcessInfo = LocalProcessInfo
  { lpLocalTime :: IORef (Maybe Double)
    -- ^ the local time last reported by the process;
    -- 'Nothing' until the first report arrives
  }
-- | The default parameters: log at 'WARNING', use 100000 for the expect
-- timeout and 1000000 for the time synchronisation delay.
defaultTimeServerParams :: TimeServerParams
defaultTimeServerParams = TimeServerParams
  { tsLoggingPriority = WARNING
  , tsExpectTimeout   = 100000
  , tsTimeSyncDelay   = 1000000
  }
-- | Create a fresh time server state for the given initial quorum and
-- parameters: the server starts in the initialisation phase, with no
-- registered processes, no pending requests and no known global time.
newTimeServer :: Int -> TimeServerParams -> IO TimeServer
newTimeServer n ps =
  do inInitRef     <- newIORef True
     processesRef  <- newIORef M.empty
     inFindRef     <- newIORef S.empty
     globalTimeRef <- newIORef Nothing
     return TimeServer
       { tsParams          = ps
       , tsInitQuorum      = n
       , tsInInit          = inInitRef
       , tsProcesses       = processesRef
       , tsProcessesInFind = inFindRef
       , tsGlobalTime      = globalTimeRef
       }
-- | Handle a single message received by the time server.
processTimeServerMessage :: TimeServer -> TimeServerMessage -> DP.Process ()

-- Registration: remember the new process and check whether the server
-- may leave the initialisation phase; a duplicate is only reported.
processTimeServerMessage server (RegisterLocalProcessMessage pid) =
  do known <- liftIO $ fmap (M.member pid) $ readIORef (tsProcesses server)
     if known
       then logTimeServer server WARNING $
              "Time Server: already registered process identifier " ++ show pid
       else do liftIO $
                 do ref <- newIORef Nothing
                    modifyIORef (tsProcesses server) $
                      M.insert pid LocalProcessInfo { lpLocalTime = ref }
               tryStartTimeServer server

-- Unregistration: forget the process (also as a pending responder) and
-- try to provide the global time, as nobody may be awaited anymore.
processTimeServerMessage server (UnregisterLocalProcessMessage pid) =
  do known <- liftIO $ fmap (M.member pid) $ readIORef (tsProcesses server)
     if not known
       then logTimeServer server WARNING $
              "Time Server: unknown process identifier " ++ show pid
       else do liftIO $
                 do modifyIORef (tsProcesses server) (M.delete pid)
                    modifyIORef (tsProcessesInFind server) (S.delete pid)
               tryProvideTimeServerGlobalTime server

-- Termination: reset the state, notify every registered process except
-- the one named in the message, and terminate the server process.
processTimeServerMessage server (TerminateTimeServerMessage pid) =
  do registered <- liftIO $ readIORef (tsProcesses server)
     liftIO $
       do writeIORef (tsProcesses server) M.empty
          writeIORef (tsProcessesInFind server) S.empty
          writeIORef (tsGlobalTime server) Nothing
     forM_ (filter (/= pid) (M.keys registered)) $ \receiver ->
       DP.send receiver TerminateLocalProcessMessage
     logTimeServer server INFO "Time Server: terminating..."
     DP.terminate

-- An explicit request: try to start computing the global time.
processTimeServerMessage server (RequestGlobalTimeMessage _) =
  tryComputeTimeServerGlobalTime server

-- A local time report: store it, stop waiting for the sender and try to
-- provide the global time. An unknown sender is registered first and then
-- the very same message is processed again.
processTimeServerMessage server msg@(LocalTimeMessage pid t') =
  do found <- liftIO $ fmap (M.lookup pid) $ readIORef (tsProcesses server)
     case found of
       Nothing ->
         do logTimeServer server WARNING $
              "Time Server: unknown process identifier " ++ show pid
            processTimeServerMessage server (RegisterLocalProcessMessage pid)
            processTimeServerMessage server msg
       Just info ->
         do liftIO $
              do writeIORef (lpLocalTime info) (Just t')
                 modifyIORef (tsProcessesInFind server) (S.delete pid)
            tryProvideTimeServerGlobalTime server
-- | Test whether the first time is greater than or equal to the second one.
-- The result is 'False' whenever either of the times is unknown.
(.>=.) :: Maybe Double -> Maybe Double -> Bool
a .>=. b = fromMaybe False (liftM2 (>=) a b)
-- | Test whether the first time is strictly greater than the second one.
-- The result is 'False' whenever either of the times is unknown.
(.>.) :: Maybe Double -> Maybe Double -> Bool
a .>. b = fromMaybe False (liftM2 (>) a b)
-- | Leave the initialisation phase once the number of registered processes
-- has reached the quorum, and then try to compute the global time.
-- Nothing happens if the server has already been started or if the quorum
-- is not reached yet.
tryStartTimeServer :: TimeServer -> DP.Process ()
tryStartTimeServer server =
  do initialising <- liftIO $ readIORef (tsInInit server)
     when initialising $
       do registered <- liftIO $ readIORef (tsProcesses server)
          unless (M.size registered < tsInitQuorum server) $
            do liftIO $ writeIORef (tsInInit server) False
               logTimeServer server INFO $
                 "Time Server: starting"
               tryComputeTimeServerGlobalTime server
-- | Start a new computation of the global time, unless the server is still
-- in the initialisation phase or some process has not yet answered
-- the previous request.
tryComputeTimeServerGlobalTime :: TimeServer -> DP.Process ()
tryComputeTimeServerGlobalTime server =
  do initialising <- liftIO $ readIORef (tsInInit server)
     unless initialising $
       do pending <- liftIO $ readIORef (tsProcessesInFind server)
          when (S.null pending) $
            computeTimeServerGlobalTime server
-- | Provide the processes with the global time, unless the server is still
-- in the initialisation phase or some process has not yet reported
-- its local time.
tryProvideTimeServerGlobalTime :: TimeServer -> DP.Process ()
tryProvideTimeServerGlobalTime server =
  do initialising <- liftIO $ readIORef (tsInInit server)
     unless initialising $
       do pending <- liftIO $ readIORef (tsProcessesInFind server)
          when (S.null pending) $
            provideTimeServerGlobalTime server
-- | Ask every registered process for its local time. All of them are marked
-- as pending before the first request is sent.
computeTimeServerGlobalTime :: TimeServer -> DP.Process ()
computeTimeServerGlobalTime server =
  do logTimeServer server DEBUG $
       "Time Server: computing the global time..."
     registered <- liftIO $ readIORef (tsProcesses server)
     liftIO $
       modifyIORef (tsProcessesInFind server) $
       S.union (M.keysSet registered)
     forM_ (M.keys registered) $ \pid ->
       DP.send pid ComputeLocalTimeMessage
-- | Compute the global time from the reported local times and, if it is
-- known, remember it and send it to every registered process. A notice is
-- logged when the new value is less than the previously provided one.
provideTimeServerGlobalTime :: TimeServer -> DP.Process ()
provideTimeServerGlobalTime server =
  do candidate <- liftIO $ timeServerGlobalTime server
     logTimeServer server DEBUG $
       "Time Server: providing the global time = " ++ show candidate
     maybe (return ()) broadcast candidate
  where
    -- store the new global time and deliver it to the processes
    broadcast t =
      do previous <- liftIO $ readIORef (tsGlobalTime server)
         when (previous .>. Just t) $
           logTimeServer server NOTICE
             "Time Server: the global time has decreased"
         liftIO $ writeIORef (tsGlobalTime server) (Just t)
         pids <- liftIO $ fmap M.keys $ readIORef (tsProcesses server)
         forM_ pids $ \pid ->
           DP.send pid (GlobalTimeMessage t)
-- | Return the global time, i.e. the minimum of the local times reported
-- by the registered processes.
--
-- The result is 'Nothing' when no process is registered or when at least
-- one registered process has not reported its local time yet.
timeServerGlobalTime :: TimeServer -> IO (Maybe Double)
timeServerGlobalTime server =
  do infos <- fmap M.elems $ readIORef (tsProcesses server)
     -- every local time is read exactly once
     times <- mapM (readIORef . lpLocalTime) infos
     return $ case sequence times of
       Just (t : ts) -> Just (lowest t ts)
       _             -> Nothing
  where
    -- a strict left-to-right minimum; the argument order of 'min'
    -- is deliberately @min t acc@ (new value first)
    lowest acc []       = acc
    lowest acc (t : ts) =
      let acc' = min t acc
      in acc' `seq` lowest acc' ts
-- | Convert a duration in seconds to the nearest whole number of microseconds.
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds seconds = fromInteger micros
  where
    -- rounding goes through 'Integer' before narrowing to 'Int'
    micros :: Integer
    micros = round (seconds * 1000000)
-- | Run the time server with the given initial quorum and parameters.
--
-- The server waits for messages in an endless loop, processing each one as
-- it arrives. After every wait, whether or not a message came, it checks
-- the time elapsed since the last synchronisation attempt; once that
-- exceeds 'tsTimeSyncDelay', it tries to start a new computation of the
-- global time and restarts the measurement.
timeServer :: Int -> TimeServerParams -> DP.Process ()
timeServer n ps =
  do server <- liftIO $ newTimeServer n ps
     logTimeServer server INFO "Time Server: starting..."
     let -- process the message, if the wait has not timed out
         dispatch Nothing = return ()
         dispatch (Just msg) =
           do logTimeServer server DEBUG $
                "Time Server: " ++ show msg
              processTimeServerMessage server msg
         -- the number of microseconds between two time stamps
         elapsedMicros since now =
           secondsToMicroseconds $
           fromRational $ toRational (diffUTCTime now since)
         loop utc0 =
           do received <- DP.expectTimeout (tsExpectTimeout ps) :: DP.Process (Maybe TimeServerMessage)
              dispatch received
              utc <- liftIO getCurrentTime
              if elapsedMicros utc0 utc > tsTimeSyncDelay ps
                then do tryComputeTimeServerGlobalTime server
                        loop utc
                else loop utc0
     liftIO getCurrentTime >>= loop
-- | A variant of 'timeServer' that takes the initial quorum and
-- the parameters as a single pair.
curryTimeServer :: (Int, TimeServerParams) -> DP.Process ()
curryTimeServer args =
  case args of
    (n, ps) -> timeServer n ps
-- | Log the message with the given priority, provided that the logging
-- priority configured for the server is @<=@ that priority; otherwise
-- do nothing.
logTimeServer :: TimeServer -> Priority -> String -> DP.Process ()
logTimeServer server p message
  | threshold <= p = DP.say (embracePriority p ++ " " ++ message)
  | otherwise      = return ()
  where
    threshold = tsLoggingPriority (tsParams server)