{-# LANGUAGE DeriveGeneric, DeriveDataTypeable #-}

-- |
-- Module     : Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
-- Copyright  : Copyright (c) 2015-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 7.10.3
--
-- This module allows running the time server that coordinates the global simulation time.
--
module Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
       (TimeServerParams(..),
        TimeServerEnv(..),
        TimeServerStrategy(..),
        defaultTimeServerParams,
        defaultTimeServerEnv,
        timeServer,
        timeServerWithEnv,
        curryTimeServer) where

import qualified Data.Map as M
import qualified Data.Set as S
import Data.Maybe
import Data.IORef
import Data.Typeable
import Data.Binary
import Data.Time.Clock

import GHC.Generics

import Control.Monad
import Control.Monad.Trans
import Control.Exception
import qualified Control.Monad.Catch as C
import Control.Concurrent
import qualified Control.Distributed.Process as DP

import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.ConnectionManager
import Simulation.Aivika.Distributed.Optimistic.State

-- | The time server parameters.
data TimeServerParams =
  TimeServerParams { TimeServerParams -> Priority
tsLoggingPriority :: Priority,
                     -- ^ the logging priority
                     TimeServerParams -> String
tsName :: String,
                     -- ^ the monitoring name of the time server
                     TimeServerParams -> Int
tsReceiveTimeout :: Int,
                     -- ^ the timeout in microseconds used when receiving messages
                     TimeServerParams -> Int
tsTimeSyncTimeout :: Int,
                     -- ^ the timeout in microseconds used for the time synchronization sessions
                     TimeServerParams -> Int
tsTimeSyncDelay :: Int,
                     -- ^ the delay in microseconds between the time synchronization sessions
                     TimeServerParams -> Bool
tsProcessMonitoringEnabled :: Bool,
                     -- ^ whether the process monitoring is enabled
                     TimeServerParams -> Int
tsProcessMonitoringDelay :: Int,
                     -- ^ The delay in microseconds which must be applied for monitoring every remote process
                     TimeServerParams -> Bool
tsProcessReconnectingEnabled :: Bool,
                     -- ^ whether the automatic reconnecting to processes is enabled when enabled monitoring
                     TimeServerParams -> Int
tsProcessReconnectingDelay :: Int,
                     -- ^ the delay in microseconds before reconnecting
                     TimeServerParams -> Int
tsSimulationMonitoringInterval :: Int,
                     -- ^ the interval in microseconds between sending the simulation monitoring messages
                     TimeServerParams -> Int
tsSimulationMonitoringTimeout :: Int,
                     -- ^ the timeout in microseconds when processing the simulation monitoring messages
                     TimeServerParams -> TimeServerStrategy
tsStrategy :: TimeServerStrategy
                     -- ^ the time server strategy
                   } deriving (TimeServerParams -> TimeServerParams -> Bool
(TimeServerParams -> TimeServerParams -> Bool)
-> (TimeServerParams -> TimeServerParams -> Bool)
-> Eq TimeServerParams
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: TimeServerParams -> TimeServerParams -> Bool
== :: TimeServerParams -> TimeServerParams -> Bool
$c/= :: TimeServerParams -> TimeServerParams -> Bool
/= :: TimeServerParams -> TimeServerParams -> Bool
Eq, Eq TimeServerParams
Eq TimeServerParams =>
(TimeServerParams -> TimeServerParams -> Ordering)
-> (TimeServerParams -> TimeServerParams -> Bool)
-> (TimeServerParams -> TimeServerParams -> Bool)
-> (TimeServerParams -> TimeServerParams -> Bool)
-> (TimeServerParams -> TimeServerParams -> Bool)
-> (TimeServerParams -> TimeServerParams -> TimeServerParams)
-> (TimeServerParams -> TimeServerParams -> TimeServerParams)
-> Ord TimeServerParams
TimeServerParams -> TimeServerParams -> Bool
TimeServerParams -> TimeServerParams -> Ordering
TimeServerParams -> TimeServerParams -> TimeServerParams
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: TimeServerParams -> TimeServerParams -> Ordering
compare :: TimeServerParams -> TimeServerParams -> Ordering
$c< :: TimeServerParams -> TimeServerParams -> Bool
< :: TimeServerParams -> TimeServerParams -> Bool
$c<= :: TimeServerParams -> TimeServerParams -> Bool
<= :: TimeServerParams -> TimeServerParams -> Bool
$c> :: TimeServerParams -> TimeServerParams -> Bool
> :: TimeServerParams -> TimeServerParams -> Bool
$c>= :: TimeServerParams -> TimeServerParams -> Bool
>= :: TimeServerParams -> TimeServerParams -> Bool
$cmax :: TimeServerParams -> TimeServerParams -> TimeServerParams
max :: TimeServerParams -> TimeServerParams -> TimeServerParams
$cmin :: TimeServerParams -> TimeServerParams -> TimeServerParams
min :: TimeServerParams -> TimeServerParams -> TimeServerParams
Ord, Int -> TimeServerParams -> ShowS
[TimeServerParams] -> ShowS
TimeServerParams -> String
(Int -> TimeServerParams -> ShowS)
-> (TimeServerParams -> String)
-> ([TimeServerParams] -> ShowS)
-> Show TimeServerParams
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TimeServerParams -> ShowS
showsPrec :: Int -> TimeServerParams -> ShowS
$cshow :: TimeServerParams -> String
show :: TimeServerParams -> String
$cshowList :: [TimeServerParams] -> ShowS
showList :: [TimeServerParams] -> ShowS
Show, Typeable, (forall x. TimeServerParams -> Rep TimeServerParams x)
-> (forall x. Rep TimeServerParams x -> TimeServerParams)
-> Generic TimeServerParams
forall x. Rep TimeServerParams x -> TimeServerParams
forall x. TimeServerParams -> Rep TimeServerParams x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. TimeServerParams -> Rep TimeServerParams x
from :: forall x. TimeServerParams -> Rep TimeServerParams x
$cto :: forall x. Rep TimeServerParams x -> TimeServerParams
to :: forall x. Rep TimeServerParams x -> TimeServerParams
Generic)

instance Binary TimeServerParams

-- | Those time server environment parameters that cannot be serialized and passed to another process via the net.
data TimeServerEnv =
  TimeServerEnv { TimeServerEnv -> Maybe (TimeServerState -> Process ())
tsSimulationMonitoringAction :: Maybe (TimeServerState -> DP.Process ())
                  -- ^ the simulation monitoring action
                }

-- | The time server strategy.
data TimeServerStrategy = WaitIndefinitelyForLogicalProcess
                          -- ^ wait for the logical processes forever
                        | TerminateDueToLogicalProcessTimeout Int
                          -- ^ terminate the server due to the exceeded logical process timeout in microseconds,
                          -- but not less than 'tsTimeSyncTimeout', which should be applied if
                          -- the process reconnecting is enabled
                        | UnregisterLogicalProcessDueToTimeout Int
                          -- ^ unregister the logical process due to the exceeded timeout in microseconds,
                          -- but not less than 'tsTimeSyncTimeout', which can be applied only if
                          -- the process disconnecting is enabled
                        deriving (TimeServerStrategy -> TimeServerStrategy -> Bool
(TimeServerStrategy -> TimeServerStrategy -> Bool)
-> (TimeServerStrategy -> TimeServerStrategy -> Bool)
-> Eq TimeServerStrategy
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: TimeServerStrategy -> TimeServerStrategy -> Bool
== :: TimeServerStrategy -> TimeServerStrategy -> Bool
$c/= :: TimeServerStrategy -> TimeServerStrategy -> Bool
/= :: TimeServerStrategy -> TimeServerStrategy -> Bool
Eq, Eq TimeServerStrategy
Eq TimeServerStrategy =>
(TimeServerStrategy -> TimeServerStrategy -> Ordering)
-> (TimeServerStrategy -> TimeServerStrategy -> Bool)
-> (TimeServerStrategy -> TimeServerStrategy -> Bool)
-> (TimeServerStrategy -> TimeServerStrategy -> Bool)
-> (TimeServerStrategy -> TimeServerStrategy -> Bool)
-> (TimeServerStrategy -> TimeServerStrategy -> TimeServerStrategy)
-> (TimeServerStrategy -> TimeServerStrategy -> TimeServerStrategy)
-> Ord TimeServerStrategy
TimeServerStrategy -> TimeServerStrategy -> Bool
TimeServerStrategy -> TimeServerStrategy -> Ordering
TimeServerStrategy -> TimeServerStrategy -> TimeServerStrategy
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: TimeServerStrategy -> TimeServerStrategy -> Ordering
compare :: TimeServerStrategy -> TimeServerStrategy -> Ordering
$c< :: TimeServerStrategy -> TimeServerStrategy -> Bool
< :: TimeServerStrategy -> TimeServerStrategy -> Bool
$c<= :: TimeServerStrategy -> TimeServerStrategy -> Bool
<= :: TimeServerStrategy -> TimeServerStrategy -> Bool
$c> :: TimeServerStrategy -> TimeServerStrategy -> Bool
> :: TimeServerStrategy -> TimeServerStrategy -> Bool
$c>= :: TimeServerStrategy -> TimeServerStrategy -> Bool
>= :: TimeServerStrategy -> TimeServerStrategy -> Bool
$cmax :: TimeServerStrategy -> TimeServerStrategy -> TimeServerStrategy
max :: TimeServerStrategy -> TimeServerStrategy -> TimeServerStrategy
$cmin :: TimeServerStrategy -> TimeServerStrategy -> TimeServerStrategy
min :: TimeServerStrategy -> TimeServerStrategy -> TimeServerStrategy
Ord, Int -> TimeServerStrategy -> ShowS
[TimeServerStrategy] -> ShowS
TimeServerStrategy -> String
(Int -> TimeServerStrategy -> ShowS)
-> (TimeServerStrategy -> String)
-> ([TimeServerStrategy] -> ShowS)
-> Show TimeServerStrategy
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TimeServerStrategy -> ShowS
showsPrec :: Int -> TimeServerStrategy -> ShowS
$cshow :: TimeServerStrategy -> String
show :: TimeServerStrategy -> String
$cshowList :: [TimeServerStrategy] -> ShowS
showList :: [TimeServerStrategy] -> ShowS
Show, Typeable, (forall x. TimeServerStrategy -> Rep TimeServerStrategy x)
-> (forall x. Rep TimeServerStrategy x -> TimeServerStrategy)
-> Generic TimeServerStrategy
forall x. Rep TimeServerStrategy x -> TimeServerStrategy
forall x. TimeServerStrategy -> Rep TimeServerStrategy x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. TimeServerStrategy -> Rep TimeServerStrategy x
from :: forall x. TimeServerStrategy -> Rep TimeServerStrategy x
$cto :: forall x. Rep TimeServerStrategy x -> TimeServerStrategy
to :: forall x. Rep TimeServerStrategy x -> TimeServerStrategy
Generic)

instance Binary TimeServerStrategy

-- | The time server.
data TimeServer =
  TimeServer { TimeServer -> TimeServerParams
tsParams :: TimeServerParams,
               -- ^ the time server parameters
               TimeServer -> Int
tsInitQuorum :: Int,
               -- ^ the initial quorum of registered logical processes to start the simulation
               TimeServer -> IORef Bool
tsInInit :: IORef Bool,
               -- ^ whether the time server is in the initial mode
               TimeServer -> IORef Bool
tsTerminating :: IORef Bool,
               -- ^ whether the time server is in the terminating mode
               TimeServer -> IORef Bool
tsTerminated :: IORef Bool,
               -- ^ whether the server is terminated
               TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses :: IORef (M.Map DP.ProcessId LogicalProcessInfo),
               -- ^ the information about logical processes
               TimeServer -> IORef (Set ProcessId)
tsProcessesInFind :: IORef (S.Set DP.ProcessId),
               -- ^ the processed used in the current finding of the global time
               TimeServer -> IORef (Maybe Double)
tsGlobalTime :: IORef (Maybe Double),
               -- ^ the global time of the model
               TimeServer -> IORef (Maybe UTCTime)
tsGlobalTimeTimestamp :: IORef (Maybe UTCTime),
               -- ^ the global time timestamp
               TimeServer -> IORef UTCTime
tsLogicalProcessValidationTimestamp :: IORef UTCTime,
               -- ^ the logical process validation timestamp
               TimeServer -> ConnectionManager
tsConnectionManager :: ConnectionManager
               -- ^ the connection manager
             }

-- | The information about the logical process.
data LogicalProcessInfo =
  LogicalProcessInfo { LogicalProcessInfo -> ProcessId
lpId :: DP.ProcessId,
                       -- ^ the logical process identifier
                       LogicalProcessInfo -> IORef (Maybe Double)
lpLocalTime :: IORef (Maybe Double),
                       -- ^ the local time of the process
                       LogicalProcessInfo -> IORef UTCTime
lpTimestamp :: IORef UTCTime
                       -- ^ the logical process timestamp
                     }

-- | The default time server parameters.
defaultTimeServerParams :: TimeServerParams
defaultTimeServerParams :: TimeServerParams
defaultTimeServerParams =
  TimeServerParams { tsLoggingPriority :: Priority
tsLoggingPriority = Priority
WARNING,
                     tsName :: String
tsName = String
"Time Server",
                     tsReceiveTimeout :: Int
tsReceiveTimeout = Int
100000,
                     tsTimeSyncTimeout :: Int
tsTimeSyncTimeout = Int
60000000,
                     tsTimeSyncDelay :: Int
tsTimeSyncDelay = Int
100000,
                     tsProcessMonitoringEnabled :: Bool
tsProcessMonitoringEnabled = Bool
False,
                     tsProcessMonitoringDelay :: Int
tsProcessMonitoringDelay = Int
3000000,
                     tsProcessReconnectingEnabled :: Bool
tsProcessReconnectingEnabled = Bool
False,
                     tsProcessReconnectingDelay :: Int
tsProcessReconnectingDelay = Int
5000000,
                     tsSimulationMonitoringInterval :: Int
tsSimulationMonitoringInterval = Int
30000000,
                     tsSimulationMonitoringTimeout :: Int
tsSimulationMonitoringTimeout = Int
100000,
                     tsStrategy :: TimeServerStrategy
tsStrategy = Int -> TimeServerStrategy
TerminateDueToLogicalProcessTimeout Int
300000000
                   }

-- | The default time server environment parameters.
defaultTimeServerEnv :: TimeServerEnv
defaultTimeServerEnv :: TimeServerEnv
defaultTimeServerEnv =
  TimeServerEnv { tsSimulationMonitoringAction :: Maybe (TimeServerState -> Process ())
tsSimulationMonitoringAction = Maybe (TimeServerState -> Process ())
forall a. Maybe a
Nothing }

-- | Create a new time server by the specified initial quorum and parameters.
newTimeServer :: Int -> TimeServerParams -> IO TimeServer
newTimeServer :: Int -> TimeServerParams -> IO TimeServer
newTimeServer Int
n TimeServerParams
ps =
  do IORef Bool
f  <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
True
     IORef Bool
ft <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
     IORef Bool
fe <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
     IORef (Map ProcessId LogicalProcessInfo)
m  <- Map ProcessId LogicalProcessInfo
-> IO (IORef (Map ProcessId LogicalProcessInfo))
forall a. a -> IO (IORef a)
newIORef Map ProcessId LogicalProcessInfo
forall k a. Map k a
M.empty
     IORef (Set ProcessId)
s  <- Set ProcessId -> IO (IORef (Set ProcessId))
forall a. a -> IO (IORef a)
newIORef Set ProcessId
forall a. Set a
S.empty
     IORef (Maybe Double)
t0 <- Maybe Double -> IO (IORef (Maybe Double))
forall a. a -> IO (IORef a)
newIORef Maybe Double
forall a. Maybe a
Nothing
     IORef (Maybe UTCTime)
t' <- Maybe UTCTime -> IO (IORef (Maybe UTCTime))
forall a. a -> IO (IORef a)
newIORef Maybe UTCTime
forall a. Maybe a
Nothing
     IORef UTCTime
t2 <- IO UTCTime
getCurrentTime IO UTCTime -> (UTCTime -> IO (IORef UTCTime)) -> IO (IORef UTCTime)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= UTCTime -> IO (IORef UTCTime)
forall a. a -> IO (IORef a)
newIORef
     ConnectionManager
connManager <- ConnectionParams -> IO ConnectionManager
newConnectionManager (ConnectionParams -> IO ConnectionManager)
-> ConnectionParams -> IO ConnectionManager
forall a b. (a -> b) -> a -> b
$
                    ConnectionParams { connLoggingPriority :: Priority
connLoggingPriority = TimeServerParams -> Priority
tsLoggingPriority TimeServerParams
ps,
                                       connKeepAliveInterval :: Int
connKeepAliveInterval = Int
0, -- not used
                                       connReconnectingDelay :: Int
connReconnectingDelay = TimeServerParams -> Int
tsProcessReconnectingDelay TimeServerParams
ps,
                                       connMonitoringDelay :: Int
connMonitoringDelay = TimeServerParams -> Int
tsProcessMonitoringDelay TimeServerParams
ps }
     TimeServer -> IO TimeServer
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return TimeServer { tsParams :: TimeServerParams
tsParams = TimeServerParams
ps,
                         tsInitQuorum :: Int
tsInitQuorum = Int
n,
                         tsInInit :: IORef Bool
tsInInit = IORef Bool
f,
                         tsTerminating :: IORef Bool
tsTerminating = IORef Bool
ft,
                         tsTerminated :: IORef Bool
tsTerminated = IORef Bool
fe,
                         tsProcesses :: IORef (Map ProcessId LogicalProcessInfo)
tsProcesses = IORef (Map ProcessId LogicalProcessInfo)
m,
                         tsProcessesInFind :: IORef (Set ProcessId)
tsProcessesInFind = IORef (Set ProcessId)
s,
                         tsGlobalTime :: IORef (Maybe Double)
tsGlobalTime = IORef (Maybe Double)
t0,
                         tsGlobalTimeTimestamp :: IORef (Maybe UTCTime)
tsGlobalTimeTimestamp = IORef (Maybe UTCTime)
t',
                         tsLogicalProcessValidationTimestamp :: IORef UTCTime
tsLogicalProcessValidationTimestamp = IORef UTCTime
t2,
                         tsConnectionManager :: ConnectionManager
tsConnectionManager = ConnectionManager
connManager
                       }

-- | Process the time server message.
processTimeServerMessage :: TimeServer -> TimeServerMessage -> DP.Process ()
processTimeServerMessage :: TimeServer -> TimeServerMessage -> Process ()
processTimeServerMessage TimeServer
server (RegisterLogicalProcessMessage ProcessId
pid) =
  Process (Process ()) -> Process ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (Process (Process ()) -> Process ())
-> Process (Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ IO (Process ()) -> Process (Process ())
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Process ()) -> Process (Process ()))
-> IO (Process ()) -> Process (Process ())
forall a b. (a -> b) -> a -> b
$
  do Map ProcessId LogicalProcessInfo
m <- IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
     case ProcessId
-> Map ProcessId LogicalProcessInfo -> Maybe LogicalProcessInfo
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup ProcessId
pid Map ProcessId LogicalProcessInfo
m of
       Just LogicalProcessInfo
x ->
         Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
         TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
WARNING (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
         String
"Time Server: already registered process identifier " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ProcessId -> String
forall a. Show a => a -> String
show ProcessId
pid
       Maybe LogicalProcessInfo
Nothing  ->
         do IORef (Maybe Double)
t <- Maybe Double -> IO (IORef (Maybe Double))
forall a. a -> IO (IORef a)
newIORef Maybe Double
forall a. Maybe a
Nothing
            IORef UTCTime
utc <- IO UTCTime
getCurrentTime IO UTCTime -> (UTCTime -> IO (IORef UTCTime)) -> IO (IORef UTCTime)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= UTCTime -> IO (IORef UTCTime)
forall a. a -> IO (IORef a)
newIORef
            IORef (Map ProcessId LogicalProcessInfo)
-> (Map ProcessId LogicalProcessInfo
    -> Map ProcessId LogicalProcessInfo)
-> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server) ((Map ProcessId LogicalProcessInfo
  -> Map ProcessId LogicalProcessInfo)
 -> IO ())
-> (Map ProcessId LogicalProcessInfo
    -> Map ProcessId LogicalProcessInfo)
-> IO ()
forall a b. (a -> b) -> a -> b
$
              ProcessId
-> LogicalProcessInfo
-> Map ProcessId LogicalProcessInfo
-> Map ProcessId LogicalProcessInfo
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert ProcessId
pid LogicalProcessInfo { lpId :: ProcessId
lpId = ProcessId
pid, lpLocalTime :: IORef (Maybe Double)
lpLocalTime = IORef (Maybe Double)
t, lpTimestamp :: IORef UTCTime
lpTimestamp = IORef UTCTime
utc }
            Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
              do Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (TimeServerParams -> Bool
tsProcessMonitoringEnabled (TimeServerParams -> Bool) -> TimeServerParams -> Bool
forall a b. (a -> b) -> a -> b
$ TimeServer -> TimeServerParams
tsParams TimeServer
server) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
                   do ConnectionManager -> ProcessId -> Process Bool
tryAddMessageReceiver (TimeServer -> ConnectionManager
tsConnectionManager TimeServer
server) ProcessId
pid
                      () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                 ProcessId
serverId <- Process ProcessId
DP.getSelfPid
                 if TimeServerParams -> Bool
tsProcessMonitoringEnabled (TimeServer -> TimeServerParams
tsParams TimeServer
server)
                   then ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
pid (ProcessId -> InboxProcessMessage
RegisterLogicalProcessAcknowledgementMessage ProcessId
serverId)
                   else ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
pid (ProcessId -> InboxProcessMessage
RegisterLogicalProcessAcknowledgementMessage ProcessId
serverId)
                 TimeServer -> Process ()
tryStartTimeServer TimeServer
server
processTimeServerMessage TimeServer
server (UnregisterLogicalProcessMessage ProcessId
pid) =
  Process (Process ()) -> Process ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (Process (Process ()) -> Process ())
-> Process (Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ IO (Process ()) -> Process (Process ())
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Process ()) -> Process (Process ()))
-> IO (Process ()) -> Process (Process ())
forall a b. (a -> b) -> a -> b
$
  do Map ProcessId LogicalProcessInfo
m <- IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
     case ProcessId
-> Map ProcessId LogicalProcessInfo -> Maybe LogicalProcessInfo
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup ProcessId
pid Map ProcessId LogicalProcessInfo
m of
       Maybe LogicalProcessInfo
Nothing ->
         Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
         TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
WARNING (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
         String
"Time Server: unknown process identifier " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ProcessId -> String
forall a. Show a => a -> String
show ProcessId
pid
       Just LogicalProcessInfo
x  ->
         do IORef (Map ProcessId LogicalProcessInfo)
-> (Map ProcessId LogicalProcessInfo
    -> Map ProcessId LogicalProcessInfo)
-> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server) ((Map ProcessId LogicalProcessInfo
  -> Map ProcessId LogicalProcessInfo)
 -> IO ())
-> (Map ProcessId LogicalProcessInfo
    -> Map ProcessId LogicalProcessInfo)
-> IO ()
forall a b. (a -> b) -> a -> b
$
              ProcessId
-> Map ProcessId LogicalProcessInfo
-> Map ProcessId LogicalProcessInfo
forall k a. Ord k => k -> Map k a -> Map k a
M.delete ProcessId
pid
            IORef (Set ProcessId) -> (Set ProcessId -> Set ProcessId) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (TimeServer -> IORef (Set ProcessId)
tsProcessesInFind TimeServer
server) ((Set ProcessId -> Set ProcessId) -> IO ())
-> (Set ProcessId -> Set ProcessId) -> IO ()
forall a b. (a -> b) -> a -> b
$
              ProcessId -> Set ProcessId -> Set ProcessId
forall a. Ord a => a -> Set a -> Set a
S.delete ProcessId
pid
            Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
              do Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (TimeServerParams -> Bool
tsProcessMonitoringEnabled (TimeServerParams -> Bool) -> TimeServerParams -> Bool
forall a b. (a -> b) -> a -> b
$ TimeServer -> TimeServerParams
tsParams TimeServer
server) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
                   ConnectionManager -> ProcessId -> Process ()
removeMessageReceiver (TimeServer -> ConnectionManager
tsConnectionManager TimeServer
server) ProcessId
pid
                 ProcessId
serverId <- Process ProcessId
DP.getSelfPid
                 if TimeServerParams -> Bool
tsProcessMonitoringEnabled (TimeServer -> TimeServerParams
tsParams TimeServer
server)
                   then ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
pid (ProcessId -> InboxProcessMessage
UnregisterLogicalProcessAcknowledgementMessage ProcessId
serverId)
                   else ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
pid (ProcessId -> InboxProcessMessage
UnregisterLogicalProcessAcknowledgementMessage ProcessId
serverId)
                 TimeServer -> Process ()
tryProvideTimeServerGlobalTime TimeServer
server
                 TimeServer -> Process ()
tryTerminateTimeServer TimeServer
server
processTimeServerMessage TimeServer
server (TerminateTimeServerMessage ProcessId
pid) =
  Process (Process ()) -> Process ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (Process (Process ()) -> Process ())
-> Process (Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ IO (Process ()) -> Process (Process ())
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Process ()) -> Process (Process ()))
-> IO (Process ()) -> Process (Process ())
forall a b. (a -> b) -> a -> b
$
  do Map ProcessId LogicalProcessInfo
m <- IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
     case ProcessId
-> Map ProcessId LogicalProcessInfo -> Maybe LogicalProcessInfo
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup ProcessId
pid Map ProcessId LogicalProcessInfo
m of
       Maybe LogicalProcessInfo
Nothing ->
         Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
         TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
WARNING (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
         String
"Time Server: unknown process identifier " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ProcessId -> String
forall a. Show a => a -> String
show ProcessId
pid
       Just LogicalProcessInfo
x  ->
         do IORef (Map ProcessId LogicalProcessInfo)
-> (Map ProcessId LogicalProcessInfo
    -> Map ProcessId LogicalProcessInfo)
-> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server) ((Map ProcessId LogicalProcessInfo
  -> Map ProcessId LogicalProcessInfo)
 -> IO ())
-> (Map ProcessId LogicalProcessInfo
    -> Map ProcessId LogicalProcessInfo)
-> IO ()
forall a b. (a -> b) -> a -> b
$
              ProcessId
-> Map ProcessId LogicalProcessInfo
-> Map ProcessId LogicalProcessInfo
forall k a. Ord k => k -> Map k a -> Map k a
M.delete ProcessId
pid
            IORef (Set ProcessId) -> (Set ProcessId -> Set ProcessId) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (TimeServer -> IORef (Set ProcessId)
tsProcessesInFind TimeServer
server) ((Set ProcessId -> Set ProcessId) -> IO ())
-> (Set ProcessId -> Set ProcessId) -> IO ()
forall a b. (a -> b) -> a -> b
$
              ProcessId -> Set ProcessId -> Set ProcessId
forall a. Ord a => a -> Set a -> Set a
S.delete ProcessId
pid
            Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
              do Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (TimeServerParams -> Bool
tsProcessMonitoringEnabled (TimeServerParams -> Bool) -> TimeServerParams -> Bool
forall a b. (a -> b) -> a -> b
$ TimeServer -> TimeServerParams
tsParams TimeServer
server) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
                   ConnectionManager -> ProcessId -> Process ()
removeMessageReceiver (TimeServer -> ConnectionManager
tsConnectionManager TimeServer
server) ProcessId
pid
                 ProcessId
serverId <- Process ProcessId
DP.getSelfPid
                 if TimeServerParams -> Bool
tsProcessMonitoringEnabled (TimeServer -> TimeServerParams
tsParams TimeServer
server)
                   then ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
pid (ProcessId -> InboxProcessMessage
TerminateTimeServerAcknowledgementMessage ProcessId
serverId)
                   else ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
pid (ProcessId -> InboxProcessMessage
TerminateTimeServerAcknowledgementMessage ProcessId
serverId)
                 TimeServer -> Process ()
startTerminatingTimeServer TimeServer
server
processTimeServerMessage TimeServer
server (RequestGlobalTimeMessage ProcessId
pid) =
  TimeServer -> Process ()
tryComputeTimeServerGlobalTime TimeServer
server
processTimeServerMessage TimeServer
server (LocalTimeMessage ProcessId
pid Double
t') =
  Process (Process ()) -> Process ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (Process (Process ()) -> Process ())
-> Process (Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ IO (Process ()) -> Process (Process ())
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Process ()) -> Process (Process ()))
-> IO (Process ()) -> Process (Process ())
forall a b. (a -> b) -> a -> b
$
  do Map ProcessId LogicalProcessInfo
m <- IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
     case ProcessId
-> Map ProcessId LogicalProcessInfo -> Maybe LogicalProcessInfo
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup ProcessId
pid Map ProcessId LogicalProcessInfo
m of
       Maybe LogicalProcessInfo
Nothing ->
         Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
         do TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
WARNING (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
              String
"Time Server: unknown process identifier " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ProcessId -> String
forall a. Show a => a -> String
show ProcessId
pid
            TimeServer -> TimeServerMessage -> Process ()
processTimeServerMessage TimeServer
server (ProcessId -> TimeServerMessage
RegisterLogicalProcessMessage ProcessId
pid)
            TimeServer -> TimeServerMessage -> Process ()
processTimeServerMessage TimeServer
server (ProcessId -> Double -> TimeServerMessage
LocalTimeMessage ProcessId
pid Double
t')
       Just LogicalProcessInfo
x  ->
         do UTCTime
utc <- IO UTCTime
getCurrentTime
            IORef (Maybe Double) -> Maybe Double -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (LogicalProcessInfo -> IORef (Maybe Double)
lpLocalTime LogicalProcessInfo
x) (Double -> Maybe Double
forall a. a -> Maybe a
Just Double
t')
            IORef UTCTime -> UTCTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (LogicalProcessInfo -> IORef UTCTime
lpTimestamp LogicalProcessInfo
x) UTCTime
utc
            IORef (Set ProcessId) -> (Set ProcessId -> Set ProcessId) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (TimeServer -> IORef (Set ProcessId)
tsProcessesInFind TimeServer
server) ((Set ProcessId -> Set ProcessId) -> IO ())
-> (Set ProcessId -> Set ProcessId) -> IO ()
forall a b. (a -> b) -> a -> b
$
              ProcessId -> Set ProcessId -> Set ProcessId
forall a. Ord a => a -> Set a -> Set a
S.delete ProcessId
pid
            Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
              TimeServer -> Process ()
tryProvideTimeServerGlobalTime TimeServer
server
processTimeServerMessage TimeServer
server (ComputeLocalTimeAcknowledgementMessage ProcessId
pid) =
  Process (Process ()) -> Process ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (Process (Process ()) -> Process ())
-> Process (Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ IO (Process ()) -> Process (Process ())
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Process ()) -> Process (Process ()))
-> IO (Process ()) -> Process (Process ())
forall a b. (a -> b) -> a -> b
$
  do Map ProcessId LogicalProcessInfo
m <- IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
     case ProcessId
-> Map ProcessId LogicalProcessInfo -> Maybe LogicalProcessInfo
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup ProcessId
pid Map ProcessId LogicalProcessInfo
m of
       Maybe LogicalProcessInfo
Nothing ->
         Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
         do TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
WARNING (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
              String
"Time Server: unknown process identifier " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ProcessId -> String
forall a. Show a => a -> String
show ProcessId
pid
            TimeServer -> TimeServerMessage -> Process ()
processTimeServerMessage TimeServer
server (ProcessId -> TimeServerMessage
RegisterLogicalProcessMessage ProcessId
pid)
            TimeServer -> TimeServerMessage -> Process ()
processTimeServerMessage TimeServer
server (ProcessId -> TimeServerMessage
ComputeLocalTimeAcknowledgementMessage ProcessId
pid)
       Just LogicalProcessInfo
x  ->
         do UTCTime
utc <- IO UTCTime
getCurrentTime
            IORef UTCTime -> UTCTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (LogicalProcessInfo -> IORef UTCTime
lpTimestamp LogicalProcessInfo
x) UTCTime
utc
            Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
              () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
processTimeServerMessage TimeServer
server (ProvideTimeServerStateMessage ProcessId
pid) =
  do let ps :: TimeServerParams
ps   = TimeServer -> TimeServerParams
tsParams TimeServer
server
         name :: String
name = TimeServerParams -> String
tsName TimeServerParams
ps
     ProcessId
serverId <- Process ProcessId
DP.getSelfPid
     Maybe Double
t <- IO (Maybe Double) -> Process (Maybe Double)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe Double) -> Process (Maybe Double))
-> IO (Maybe Double) -> Process (Maybe Double)
forall a b. (a -> b) -> a -> b
$ IORef (Maybe Double) -> IO (Maybe Double)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Maybe Double)
tsGlobalTime TimeServer
server)
     Map ProcessId LogicalProcessInfo
m <- IO (Map ProcessId LogicalProcessInfo)
-> Process (Map ProcessId LogicalProcessInfo)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Map ProcessId LogicalProcessInfo)
 -> Process (Map ProcessId LogicalProcessInfo))
-> IO (Map ProcessId LogicalProcessInfo)
-> Process (Map ProcessId LogicalProcessInfo)
forall a b. (a -> b) -> a -> b
$ IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
     let msg :: TimeServerState
msg = TimeServerState { tsStateId :: ProcessId
tsStateId = ProcessId
serverId,
                                 tsStateName :: String
tsStateName = String
name,
                                 tsStateGlobalVirtualTime :: Maybe Double
tsStateGlobalVirtualTime = Maybe Double
t,
                                 tsStateLogicalProcesses :: [ProcessId]
tsStateLogicalProcesses = Map ProcessId LogicalProcessInfo -> [ProcessId]
forall k a. Map k a -> [k]
M.keys Map ProcessId LogicalProcessInfo
m }
     if TimeServerParams -> Bool
tsProcessMonitoringEnabled (TimeServer -> TimeServerParams
tsParams TimeServer
server)
       then ProcessId -> TimeServerState -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
pid TimeServerState
msg
       else ProcessId -> TimeServerState -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
pid TimeServerState
msg

-- | Whether the both values are defined and the first is greater than or equaled to the second.
(.>=.) :: Maybe Double -> Maybe Double -> Bool
.>=. :: Maybe Double -> Maybe Double -> Bool
(.>=.) (Just Double
x) (Just Double
y) = Double
x Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
>= Double
y
(.>=.) Maybe Double
_ Maybe Double
_ = Bool
False

-- | Whether the both values are defined and the first is greater than the second.
(.>.) :: Maybe Double -> Maybe Double -> Bool
.>. :: Maybe Double -> Maybe Double -> Bool
(.>.) (Just Double
x) (Just Double
y) = Double
x Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
> Double
y
(.>.) Maybe Double
_ Maybe Double
_ = Bool
False

-- | Try to start synchronizing the global time.
tryStartTimeServer :: TimeServer -> DP.Process ()
tryStartTimeServer :: TimeServer -> Process ()
tryStartTimeServer TimeServer
server =
  Process (Process ()) -> Process ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (Process (Process ()) -> Process ())
-> Process (Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ IO (Process ()) -> Process (Process ())
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Process ()) -> Process (Process ()))
-> IO (Process ()) -> Process (Process ())
forall a b. (a -> b) -> a -> b
$
  do Bool
f <- IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef Bool
tsInInit TimeServer
server)
     if Bool -> Bool
not Bool
f
       then Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
            () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
       else do Map ProcessId LogicalProcessInfo
m <- IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
               if Map ProcessId LogicalProcessInfo -> Int
forall k a. Map k a -> Int
M.size Map ProcessId LogicalProcessInfo
m Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< TimeServer -> Int
tsInitQuorum TimeServer
server
                 then Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
                      () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                 else do IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (TimeServer -> IORef Bool
tsInInit TimeServer
server) Bool
False
                         Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
                           do TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
INFO (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
                                String
"Time Server: starting"
                              TimeServer -> Process ()
tryComputeTimeServerGlobalTime TimeServer
server
  
-- | Try to compute the global time and provide the logical processes with it.
tryComputeTimeServerGlobalTime :: TimeServer -> DP.Process ()
tryComputeTimeServerGlobalTime :: TimeServer -> Process ()
tryComputeTimeServerGlobalTime TimeServer
server =
  Process (Process ()) -> Process ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (Process (Process ()) -> Process ())
-> Process (Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ IO (Process ()) -> Process (Process ())
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Process ()) -> Process (Process ()))
-> IO (Process ()) -> Process (Process ())
forall a b. (a -> b) -> a -> b
$
  do Bool
f <- IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef Bool
tsInInit TimeServer
server)
     if Bool
f
       then Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
            () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
       else do Set ProcessId
s <- IORef (Set ProcessId) -> IO (Set ProcessId)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Set ProcessId)
tsProcessesInFind TimeServer
server)
               if Set ProcessId -> Int
forall a. Set a -> Int
S.size Set ProcessId
s Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                 then Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
                      () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                 else Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
                      TimeServer -> Process ()
computeTimeServerGlobalTime TimeServer
server

-- | Reset computing the time server global time.
resetComputingTimeServerGlobalTime :: TimeServer -> DP.Process ()
resetComputingTimeServerGlobalTime :: TimeServer -> Process ()
resetComputingTimeServerGlobalTime TimeServer
server =
  do TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
NOTICE (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
       String
"Time Server: reset computing the global time"
     IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
       do UTCTime
utc <- IO UTCTime
getCurrentTime
          IORef (Set ProcessId) -> Set ProcessId -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (TimeServer -> IORef (Set ProcessId)
tsProcessesInFind TimeServer
server) Set ProcessId
forall a. Set a
S.empty
          IORef (Maybe UTCTime) -> Maybe UTCTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (TimeServer -> IORef (Maybe UTCTime)
tsGlobalTimeTimestamp TimeServer
server) (UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
utc)

-- | Try to provide the logical processes wth the global time. 
tryProvideTimeServerGlobalTime :: TimeServer -> DP.Process ()
tryProvideTimeServerGlobalTime :: TimeServer -> Process ()
tryProvideTimeServerGlobalTime TimeServer
server =
  Process (Process ()) -> Process ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (Process (Process ()) -> Process ())
-> Process (Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ IO (Process ()) -> Process (Process ())
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Process ()) -> Process (Process ()))
-> IO (Process ()) -> Process (Process ())
forall a b. (a -> b) -> a -> b
$
  do Bool
f <- IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef Bool
tsInInit TimeServer
server)
     if Bool
f
       then Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
            () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
       else do Set ProcessId
s <- IORef (Set ProcessId) -> IO (Set ProcessId)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Set ProcessId)
tsProcessesInFind TimeServer
server)
               if Set ProcessId -> Int
forall a. Set a -> Int
S.size Set ProcessId
s Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                 then Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
                      () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                 else Process () -> IO (Process ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process () -> IO (Process ())) -> Process () -> IO (Process ())
forall a b. (a -> b) -> a -> b
$
                      TimeServer -> Process ()
provideTimeServerGlobalTime TimeServer
server

-- | Initiate computing the global time.
computeTimeServerGlobalTime :: TimeServer -> DP.Process ()
computeTimeServerGlobalTime :: TimeServer -> Process ()
computeTimeServerGlobalTime TimeServer
server =
  do TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
DEBUG (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
       String
"Time Server: computing the global time..."
     [(ProcessId, LogicalProcessInfo)]
zs <- IO [(ProcessId, LogicalProcessInfo)]
-> Process [(ProcessId, LogicalProcessInfo)]
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [(ProcessId, LogicalProcessInfo)]
 -> Process [(ProcessId, LogicalProcessInfo)])
-> IO [(ProcessId, LogicalProcessInfo)]
-> Process [(ProcessId, LogicalProcessInfo)]
forall a b. (a -> b) -> a -> b
$ (Map ProcessId LogicalProcessInfo
 -> [(ProcessId, LogicalProcessInfo)])
-> IO (Map ProcessId LogicalProcessInfo)
-> IO [(ProcessId, LogicalProcessInfo)]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Map ProcessId LogicalProcessInfo
-> [(ProcessId, LogicalProcessInfo)]
forall k a. Map k a -> [(k, a)]
M.assocs (IO (Map ProcessId LogicalProcessInfo)
 -> IO [(ProcessId, LogicalProcessInfo)])
-> IO (Map ProcessId LogicalProcessInfo)
-> IO [(ProcessId, LogicalProcessInfo)]
forall a b. (a -> b) -> a -> b
$ IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
     [(ProcessId, LogicalProcessInfo)]
-> ((ProcessId, LogicalProcessInfo) -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(ProcessId, LogicalProcessInfo)]
zs (((ProcessId, LogicalProcessInfo) -> Process ()) -> Process ())
-> ((ProcessId, LogicalProcessInfo) -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \(ProcessId
pid, LogicalProcessInfo
x) ->
       IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
       IORef (Set ProcessId) -> (Set ProcessId -> Set ProcessId) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (TimeServer -> IORef (Set ProcessId)
tsProcessesInFind TimeServer
server) ((Set ProcessId -> Set ProcessId) -> IO ())
-> (Set ProcessId -> Set ProcessId) -> IO ()
forall a b. (a -> b) -> a -> b
$
       ProcessId -> Set ProcessId -> Set ProcessId
forall a. Ord a => a -> Set a -> Set a
S.insert ProcessId
pid
     [(ProcessId, LogicalProcessInfo)]
-> ((ProcessId, LogicalProcessInfo) -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(ProcessId, LogicalProcessInfo)]
zs (((ProcessId, LogicalProcessInfo) -> Process ()) -> Process ())
-> ((ProcessId, LogicalProcessInfo) -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \(ProcessId
pid, LogicalProcessInfo
x) ->
       if TimeServerParams -> Bool
tsProcessMonitoringEnabled (TimeServer -> TimeServerParams
tsParams TimeServer
server)
       then ProcessId -> LogicalProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
pid LogicalProcessMessage
ComputeLocalTimeMessage
       else ProcessId -> LogicalProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
pid LogicalProcessMessage
ComputeLocalTimeMessage

-- | Provide the logical processes with the global time.
provideTimeServerGlobalTime :: TimeServer -> DP.Process ()
provideTimeServerGlobalTime :: TimeServer -> Process ()
provideTimeServerGlobalTime TimeServer
server =
  do Maybe Double
t0 <- IO (Maybe Double) -> Process (Maybe Double)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe Double) -> Process (Maybe Double))
-> IO (Maybe Double) -> Process (Maybe Double)
forall a b. (a -> b) -> a -> b
$ TimeServer -> IO (Maybe Double)
timeServerGlobalTime TimeServer
server
     TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
INFO (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
       String
"Time Server: providing the global time = " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Maybe Double -> String
forall a. Show a => a -> String
show Maybe Double
t0
     case Maybe Double
t0 of
       Maybe Double
Nothing -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
       Just Double
t0 ->
         do Maybe Double
t' <- IO (Maybe Double) -> Process (Maybe Double)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe Double) -> Process (Maybe Double))
-> IO (Maybe Double) -> Process (Maybe Double)
forall a b. (a -> b) -> a -> b
$ IORef (Maybe Double) -> IO (Maybe Double)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Maybe Double)
tsGlobalTime TimeServer
server)
            Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe Double
t' Maybe Double -> Maybe Double -> Bool
.>. Double -> Maybe Double
forall a. a -> Maybe a
Just Double
t0) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
              TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
NOTICE
              String
"Time Server: the global time has decreased"
            UTCTime
timestamp <- IO UTCTime -> Process UTCTime
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
            IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe Double) -> Maybe Double -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (TimeServer -> IORef (Maybe Double)
tsGlobalTime TimeServer
server) (Double -> Maybe Double
forall a. a -> Maybe a
Just Double
t0)
            IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe UTCTime) -> Maybe UTCTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (TimeServer -> IORef (Maybe UTCTime)
tsGlobalTimeTimestamp TimeServer
server) (UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
timestamp)
            [(ProcessId, LogicalProcessInfo)]
zs <- IO [(ProcessId, LogicalProcessInfo)]
-> Process [(ProcessId, LogicalProcessInfo)]
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [(ProcessId, LogicalProcessInfo)]
 -> Process [(ProcessId, LogicalProcessInfo)])
-> IO [(ProcessId, LogicalProcessInfo)]
-> Process [(ProcessId, LogicalProcessInfo)]
forall a b. (a -> b) -> a -> b
$ (Map ProcessId LogicalProcessInfo
 -> [(ProcessId, LogicalProcessInfo)])
-> IO (Map ProcessId LogicalProcessInfo)
-> IO [(ProcessId, LogicalProcessInfo)]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Map ProcessId LogicalProcessInfo
-> [(ProcessId, LogicalProcessInfo)]
forall k a. Map k a -> [(k, a)]
M.assocs (IO (Map ProcessId LogicalProcessInfo)
 -> IO [(ProcessId, LogicalProcessInfo)])
-> IO (Map ProcessId LogicalProcessInfo)
-> IO [(ProcessId, LogicalProcessInfo)]
forall a b. (a -> b) -> a -> b
$ IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
            [(ProcessId, LogicalProcessInfo)]
-> ((ProcessId, LogicalProcessInfo) -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(ProcessId, LogicalProcessInfo)]
zs (((ProcessId, LogicalProcessInfo) -> Process ()) -> Process ())
-> ((ProcessId, LogicalProcessInfo) -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \(ProcessId
pid, LogicalProcessInfo
x) ->
              if TimeServerParams -> Bool
tsProcessMonitoringEnabled (TimeServer -> TimeServerParams
tsParams TimeServer
server)
              then ProcessId -> LogicalProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
pid (Double -> LogicalProcessMessage
GlobalTimeMessage Double
t0)
              else ProcessId -> LogicalProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
pid (Double -> LogicalProcessMessage
GlobalTimeMessage Double
t0)

-- | Return the time server global time.
timeServerGlobalTime :: TimeServer -> IO (Maybe Double)
timeServerGlobalTime :: TimeServer -> IO (Maybe Double)
timeServerGlobalTime TimeServer
server =
  do [(ProcessId, LogicalProcessInfo)]
zs <- (Map ProcessId LogicalProcessInfo
 -> [(ProcessId, LogicalProcessInfo)])
-> IO (Map ProcessId LogicalProcessInfo)
-> IO [(ProcessId, LogicalProcessInfo)]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Map ProcessId LogicalProcessInfo
-> [(ProcessId, LogicalProcessInfo)]
forall k a. Map k a -> [(k, a)]
M.assocs (IO (Map ProcessId LogicalProcessInfo)
 -> IO [(ProcessId, LogicalProcessInfo)])
-> IO (Map ProcessId LogicalProcessInfo)
-> IO [(ProcessId, LogicalProcessInfo)]
forall a b. (a -> b) -> a -> b
$ IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
     case [(ProcessId, LogicalProcessInfo)]
zs of
       [] -> Maybe Double -> IO (Maybe Double)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Double
forall a. Maybe a
Nothing
       ((ProcessId
pid, LogicalProcessInfo
x) : [(ProcessId, LogicalProcessInfo)]
zs') ->
         do Maybe Double
t <- IORef (Maybe Double) -> IO (Maybe Double)
forall a. IORef a -> IO a
readIORef (LogicalProcessInfo -> IORef (Maybe Double)
lpLocalTime LogicalProcessInfo
x)
            [(ProcessId, LogicalProcessInfo)]
-> Maybe Double -> IO (Maybe Double)
forall {a}.
[(a, LogicalProcessInfo)] -> Maybe Double -> IO (Maybe Double)
loop [(ProcessId, LogicalProcessInfo)]
zs Maybe Double
t
              where loop :: [(a, LogicalProcessInfo)] -> Maybe Double -> IO (Maybe Double)
loop [] Maybe Double
acc = Maybe Double -> IO (Maybe Double)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Double
acc
                    loop ((a
pid, LogicalProcessInfo
x) : [(a, LogicalProcessInfo)]
zs') Maybe Double
acc =
                      do Maybe Double
t <- IORef (Maybe Double) -> IO (Maybe Double)
forall a. IORef a -> IO a
readIORef (LogicalProcessInfo -> IORef (Maybe Double)
lpLocalTime LogicalProcessInfo
x)
                         case Maybe Double
t of
                           Maybe Double
Nothing ->
                             [(a, LogicalProcessInfo)] -> Maybe Double -> IO (Maybe Double)
loop [(a, LogicalProcessInfo)]
zs' Maybe Double
forall a. Maybe a
Nothing
                           Just Double
_  ->
                             [(a, LogicalProcessInfo)] -> Maybe Double -> IO (Maybe Double)
loop [(a, LogicalProcessInfo)]
zs' ((Double -> Double -> Double)
-> Maybe Double -> Maybe Double -> Maybe Double
forall (m :: * -> *) a1 a2 r.
Monad m =>
(a1 -> a2 -> r) -> m a1 -> m a2 -> m r
liftM2 Double -> Double -> Double
forall a. Ord a => a -> a -> a
min Maybe Double
t Maybe Double
acc)

-- | Return a logical process with the minimal timestamp.
minTimestampLogicalProcess :: TimeServer -> IO (Maybe LogicalProcessInfo)
minTimestampLogicalProcess :: TimeServer -> IO (Maybe LogicalProcessInfo)
minTimestampLogicalProcess TimeServer
server =
  do [(ProcessId, LogicalProcessInfo)]
zs <- (Map ProcessId LogicalProcessInfo
 -> [(ProcessId, LogicalProcessInfo)])
-> IO (Map ProcessId LogicalProcessInfo)
-> IO [(ProcessId, LogicalProcessInfo)]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Map ProcessId LogicalProcessInfo
-> [(ProcessId, LogicalProcessInfo)]
forall k a. Map k a -> [(k, a)]
M.assocs (IO (Map ProcessId LogicalProcessInfo)
 -> IO [(ProcessId, LogicalProcessInfo)])
-> IO (Map ProcessId LogicalProcessInfo)
-> IO [(ProcessId, LogicalProcessInfo)]
forall a b. (a -> b) -> a -> b
$ IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
     case [(ProcessId, LogicalProcessInfo)]
zs of
       [] -> Maybe LogicalProcessInfo -> IO (Maybe LogicalProcessInfo)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe LogicalProcessInfo
forall a. Maybe a
Nothing
       ((ProcessId
pid, LogicalProcessInfo
x) : [(ProcessId, LogicalProcessInfo)]
zs') -> [(ProcessId, LogicalProcessInfo)]
-> LogicalProcessInfo -> IO (Maybe LogicalProcessInfo)
forall {a}.
[(a, LogicalProcessInfo)]
-> LogicalProcessInfo -> IO (Maybe LogicalProcessInfo)
loop [(ProcessId, LogicalProcessInfo)]
zs LogicalProcessInfo
x
         where loop :: [(a, LogicalProcessInfo)]
-> LogicalProcessInfo -> IO (Maybe LogicalProcessInfo)
loop [] LogicalProcessInfo
acc = Maybe LogicalProcessInfo -> IO (Maybe LogicalProcessInfo)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (LogicalProcessInfo -> Maybe LogicalProcessInfo
forall a. a -> Maybe a
Just LogicalProcessInfo
acc)
               loop ((a
pid, LogicalProcessInfo
x) : [(a, LogicalProcessInfo)]
zs') LogicalProcessInfo
acc =
                 do UTCTime
t0 <- IORef UTCTime -> IO UTCTime
forall a. IORef a -> IO a
readIORef (LogicalProcessInfo -> IORef UTCTime
lpTimestamp LogicalProcessInfo
acc)
                    UTCTime
t  <- IORef UTCTime -> IO UTCTime
forall a. IORef a -> IO a
readIORef (LogicalProcessInfo -> IORef UTCTime
lpTimestamp LogicalProcessInfo
x)
                    if UTCTime
t0 UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
<= UTCTime
t
                      then [(a, LogicalProcessInfo)]
-> LogicalProcessInfo -> IO (Maybe LogicalProcessInfo)
loop [(a, LogicalProcessInfo)]
zs' LogicalProcessInfo
acc
                      else [(a, LogicalProcessInfo)]
-> LogicalProcessInfo -> IO (Maybe LogicalProcessInfo)
loop [(a, LogicalProcessInfo)]
zs' LogicalProcessInfo
x

-- | Filter the logical processes.
filterLogicalProcesses :: TimeServer -> [DP.ProcessId] -> IO [DP.ProcessId]
filterLogicalProcesses :: TimeServer -> [ProcessId] -> IO [ProcessId]
filterLogicalProcesses TimeServer
server [ProcessId]
pids =
  do Map ProcessId LogicalProcessInfo
xs <- IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
     [ProcessId] -> IO [ProcessId]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([ProcessId] -> IO [ProcessId]) -> [ProcessId] -> IO [ProcessId]
forall a b. (a -> b) -> a -> b
$ (ProcessId -> Bool) -> [ProcessId] -> [ProcessId]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ProcessId
pid -> ProcessId -> Map ProcessId LogicalProcessInfo -> Bool
forall k a. Ord k => k -> Map k a -> Bool
M.member ProcessId
pid Map ProcessId LogicalProcessInfo
xs) [ProcessId]
pids

-- | Start terminating the time server.
startTerminatingTimeServer :: TimeServer -> DP.Process ()
startTerminatingTimeServer :: TimeServer -> Process ()
startTerminatingTimeServer TimeServer
server =
  do TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
INFO String
"Time Server: start terminating..."
     IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
       IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (TimeServer -> IORef Bool
tsTerminating TimeServer
server) Bool
True
     TimeServer -> Process ()
tryTerminateTimeServer TimeServer
server

-- | Try to terminate the time server.
tryTerminateTimeServer :: TimeServer -> DP.Process ()
tryTerminateTimeServer :: TimeServer -> Process ()
tryTerminateTimeServer TimeServer
server =
  do Bool
f <- IO Bool -> Process Bool
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> Process Bool) -> IO Bool -> Process Bool
forall a b. (a -> b) -> a -> b
$ IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef Bool
tsTerminating TimeServer
server)
     Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
f (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
       do Map ProcessId LogicalProcessInfo
m <- IO (Map ProcessId LogicalProcessInfo)
-> Process (Map ProcessId LogicalProcessInfo)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Map ProcessId LogicalProcessInfo)
 -> Process (Map ProcessId LogicalProcessInfo))
-> IO (Map ProcessId LogicalProcessInfo)
-> Process (Map ProcessId LogicalProcessInfo)
forall a b. (a -> b) -> a -> b
$ IORef (Map ProcessId LogicalProcessInfo)
-> IO (Map ProcessId LogicalProcessInfo)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Map ProcessId LogicalProcessInfo)
tsProcesses TimeServer
server)
          Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Map ProcessId LogicalProcessInfo -> Bool
forall k a. Map k a -> Bool
M.null Map ProcessId LogicalProcessInfo
m) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
            do TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
INFO String
"Time Server: terminate"
               Process ()
forall a. Process a
DP.terminate

-- | Convert seconds to microseconds.
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds Double
x = Integer -> Int
forall a. Num a => Integer -> a
fromInteger (Integer -> Int) -> Integer -> Int
forall a b. (a -> b) -> a -> b
$ Integer -> Integer
forall a. Integral a => a -> Integer
toInteger (Integer -> Integer) -> Integer -> Integer
forall a b. (a -> b) -> a -> b
$ Double -> Integer
forall b. Integral b => Double -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
1000000 Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
x)

-- | The internal time server message.
data InternalTimeServerMessage = InternalTimeServerMessage TimeServerMessage
                                 -- ^ the time server message
                               | InternalProcessMonitorNotification DP.ProcessMonitorNotification
                                 -- ^ the process monitor notification
                               | InternalGeneralMessage GeneralMessage
                                 -- ^ the general message

-- | Handle the time server exception
handleTimeServerException :: TimeServer -> SomeException -> DP.Process ()
handleTimeServerException :: TimeServer -> SomeException -> Process ()
handleTimeServerException TimeServer
server SomeException
e =
  do ---
     TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
ERROR (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$ String
"Exception occurred: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ SomeException -> String
forall a. Show a => a -> String
show SomeException
e
     ---
     SomeException -> Process ()
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
C.throwM SomeException
e

-- | Start the time server by the specified initial quorum and parameters.
-- The quorum defines the number of logical processes that must be registered in
-- the time server before the global time synchronization is started.
timeServer :: Int -> TimeServerParams -> DP.Process ()
timeServer :: Int -> TimeServerParams -> Process ()
timeServer Int
n TimeServerParams
ps = Int -> TimeServerParams -> TimeServerEnv -> Process ()
timeServerWithEnv Int
n TimeServerParams
ps TimeServerEnv
defaultTimeServerEnv

-- | A full version of 'timeServer' that allows specifying the environment parameters.
timeServerWithEnv :: Int -> TimeServerParams -> TimeServerEnv -> DP.Process ()
timeServerWithEnv :: Int -> TimeServerParams -> TimeServerEnv -> Process ()
timeServerWithEnv Int
n TimeServerParams
ps TimeServerEnv
env =
  do TimeServer
server <- IO TimeServer -> Process TimeServer
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO TimeServer -> Process TimeServer)
-> IO TimeServer -> Process TimeServer
forall a b. (a -> b) -> a -> b
$ Int -> TimeServerParams -> IO TimeServer
newTimeServer Int
n TimeServerParams
ps
     ProcessId
serverId <- Process ProcessId
DP.getSelfPid
     TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
INFO String
"Time Server: starting..."
     let loop :: UTCTime -> Process b
loop UTCTime
utc0 =
           do let f1 :: TimeServerMessage -> DP.Process InternalTimeServerMessage
                  f1 :: TimeServerMessage -> Process InternalTimeServerMessage
f1 TimeServerMessage
x = InternalTimeServerMessage -> Process InternalTimeServerMessage
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (TimeServerMessage -> InternalTimeServerMessage
InternalTimeServerMessage TimeServerMessage
x)
                  f2 :: DP.ProcessMonitorNotification -> DP.Process InternalTimeServerMessage
                  f2 :: ProcessMonitorNotification -> Process InternalTimeServerMessage
f2 ProcessMonitorNotification
x = InternalTimeServerMessage -> Process InternalTimeServerMessage
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ProcessMonitorNotification -> InternalTimeServerMessage
InternalProcessMonitorNotification ProcessMonitorNotification
x)
                  f3 :: GeneralMessage -> DP.Process InternalTimeServerMessage
                  f3 :: GeneralMessage -> Process InternalTimeServerMessage
f3 GeneralMessage
x = InternalTimeServerMessage -> Process InternalTimeServerMessage
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (GeneralMessage -> InternalTimeServerMessage
InternalGeneralMessage GeneralMessage
x)
              Maybe InternalTimeServerMessage
a <- Int
-> [Match InternalTimeServerMessage]
-> Process (Maybe InternalTimeServerMessage)
forall b. Int -> [Match b] -> Process (Maybe b)
DP.receiveTimeout (TimeServerParams -> Int
tsReceiveTimeout TimeServerParams
ps) [(TimeServerMessage -> Process InternalTimeServerMessage)
-> Match InternalTimeServerMessage
forall a b. Serializable a => (a -> Process b) -> Match b
DP.match TimeServerMessage -> Process InternalTimeServerMessage
f1, (ProcessMonitorNotification -> Process InternalTimeServerMessage)
-> Match InternalTimeServerMessage
forall a b. Serializable a => (a -> Process b) -> Match b
DP.match ProcessMonitorNotification -> Process InternalTimeServerMessage
f2, (GeneralMessage -> Process InternalTimeServerMessage)
-> Match InternalTimeServerMessage
forall a b. Serializable a => (a -> Process b) -> Match b
DP.match GeneralMessage -> Process InternalTimeServerMessage
f3]
              case Maybe InternalTimeServerMessage
a of
                Maybe InternalTimeServerMessage
Nothing -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                Just (InternalTimeServerMessage TimeServerMessage
m) ->
                  do ---
                     TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
DEBUG (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
                       String
"Time Server: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ TimeServerMessage -> String
forall a. Show a => a -> String
show TimeServerMessage
m
                     ---
                     TimeServer -> TimeServerMessage -> Process ()
processTimeServerMessage TimeServer
server TimeServerMessage
m
                Just (InternalProcessMonitorNotification ProcessMonitorNotification
m) ->
                  ProcessMonitorNotification -> TimeServer -> Process ()
handleProcessMonitorNotification ProcessMonitorNotification
m TimeServer
server
                Just (InternalGeneralMessage GeneralMessage
m) ->
                  GeneralMessage -> TimeServer -> Process ()
handleGeneralMessage GeneralMessage
m TimeServer
server
              UTCTime
utc <- IO UTCTime -> Process UTCTime
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
              UTCTime
validation <- IO UTCTime -> Process UTCTime
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO UTCTime -> Process UTCTime) -> IO UTCTime -> Process UTCTime
forall a b. (a -> b) -> a -> b
$ IORef UTCTime -> IO UTCTime
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef UTCTime
tsLogicalProcessValidationTimestamp TimeServer
server)
              Maybe UTCTime
timestamp <- IO (Maybe UTCTime) -> Process (Maybe UTCTime)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe UTCTime) -> Process (Maybe UTCTime))
-> IO (Maybe UTCTime) -> Process (Maybe UTCTime)
forall a b. (a -> b) -> a -> b
$ IORef (Maybe UTCTime) -> IO (Maybe UTCTime)
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef (Maybe UTCTime)
tsGlobalTimeTimestamp TimeServer
server)
              Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (TimeServer -> UTCTime -> UTCTime -> Bool
timeSyncTimeoutExceeded TimeServer
server UTCTime
validation UTCTime
utc) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
                TimeServer -> UTCTime -> Process ()
validateLogicalProcesses TimeServer
server UTCTime
utc
              case Maybe UTCTime
timestamp of
                Just UTCTime
x | TimeServer -> UTCTime -> UTCTime -> Bool
timeSyncTimeoutExceeded TimeServer
server UTCTime
x UTCTime
utc ->
                  TimeServer -> Process ()
resetComputingTimeServerGlobalTime TimeServer
server
                Maybe UTCTime
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
              if TimeServer -> UTCTime -> UTCTime -> Bool
timeSyncDelayExceeded TimeServer
server UTCTime
utc0 UTCTime
utc
                then do TimeServer -> Process ()
tryComputeTimeServerGlobalTime TimeServer
server
                        UTCTime -> Process b
loop UTCTime
utc
                else UTCTime -> Process b
loop UTCTime
utc0
         loop' :: UTCTime -> Process a
loop' UTCTime
utc0 =
           Process a -> Process () -> Process a
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
C.finally
           (UTCTime -> Process a
forall {b}. UTCTime -> Process b
loop UTCTime
utc0)
           (do IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
                 IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
atomicWriteIORef (TimeServer -> IORef Bool
tsTerminated TimeServer
server) Bool
True
               ConnectionManager -> Process ()
clearMessageReceivers (TimeServer -> ConnectionManager
tsConnectionManager TimeServer
server))
     case TimeServerEnv -> Maybe (TimeServerState -> Process ())
tsSimulationMonitoringAction TimeServerEnv
env of
       Maybe (TimeServerState -> Process ())
Nothing  -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
       Just TimeServerState -> Process ()
act ->
         do ProcessId
monitorId <-
              Process () -> Process ProcessId
DP.spawnLocal (Process () -> Process ProcessId)
-> Process () -> Process ProcessId
forall a b. (a -> b) -> a -> b
$
              let loop :: Process ()
loop =
                    do Bool
f <- IO Bool -> Process Bool
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> Process Bool) -> IO Bool -> Process Bool
forall a b. (a -> b) -> a -> b
$ IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef Bool
tsTerminated TimeServer
server)
                       Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
f (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
                         do Maybe TimeServerState
x <- Int -> Process (Maybe TimeServerState)
forall a. Serializable a => Int -> Process (Maybe a)
DP.expectTimeout (TimeServerParams -> Int
tsSimulationMonitoringTimeout TimeServerParams
ps)
                            case Maybe TimeServerState
x of
                              Maybe TimeServerState
Nothing -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                              Just TimeServerState
st -> TimeServerState -> Process ()
act TimeServerState
st
                            Process ()
loop
              in Process () -> (SomeException -> Process ()) -> Process ()
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
C.catch Process ()
loop (TimeServer -> SomeException -> Process ()
handleTimeServerException TimeServer
server)
            Process () -> Process ProcessId
DP.spawnLocal (Process () -> Process ProcessId)
-> Process () -> Process ProcessId
forall a b. (a -> b) -> a -> b
$
              let loop :: Process ()
loop =
                    do Bool
f <- IO Bool -> Process Bool
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> Process Bool) -> IO Bool -> Process Bool
forall a b. (a -> b) -> a -> b
$ IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef (TimeServer -> IORef Bool
tsTerminated TimeServer
server)
                       Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
f (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
                         do IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
                              Int -> IO ()
threadDelay (TimeServerParams -> Int
tsSimulationMonitoringInterval TimeServerParams
ps)
                            ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
serverId (ProcessId -> TimeServerMessage
ProvideTimeServerStateMessage ProcessId
monitorId)
                            Process ()
loop
              in Process () -> (SomeException -> Process ()) -> Process ()
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
C.catch Process ()
loop (TimeServer -> SomeException -> Process ()
handleTimeServerException TimeServer
server)
            () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
     Process () -> (SomeException -> Process ()) -> Process ()
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
C.catch (IO UTCTime -> Process UTCTime
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime Process UTCTime -> (UTCTime -> Process ()) -> Process ()
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= UTCTime -> Process ()
forall {b}. UTCTime -> Process b
loop') (TimeServer -> SomeException -> Process ()
handleTimeServerException TimeServer
server) 

-- | Handle the process monitor notification.
handleProcessMonitorNotification :: DP.ProcessMonitorNotification -> TimeServer -> DP.Process ()
handleProcessMonitorNotification :: ProcessMonitorNotification -> TimeServer -> Process ()
handleProcessMonitorNotification m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
pid0 DiedReason
reason) TimeServer
server =
  do let ps :: TimeServerParams
ps = TimeServer -> TimeServerParams
tsParams TimeServer
server
         recv :: ProcessMonitorNotification -> Process ProcessMonitorNotification
recv m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
_ DiedReason
_) = 
           do ---
              TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
WARNING (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
                String
"Time Server: received a process monitor notification " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ProcessMonitorNotification -> String
forall a. Show a => a -> String
show ProcessMonitorNotification
m
              ---
              ProcessMonitorNotification -> Process ProcessMonitorNotification
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ProcessMonitorNotification
m
     ProcessMonitorNotification -> Process ProcessMonitorNotification
recv ProcessMonitorNotification
m
     Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (TimeServerParams -> Bool
tsProcessReconnectingEnabled TimeServerParams
ps Bool -> Bool -> Bool
&& DiedReason
reason DiedReason -> DiedReason -> Bool
forall a. Eq a => a -> a -> Bool
== DiedReason
DP.DiedDisconnect) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
       do IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
            Int -> IO ()
threadDelay (TimeServerParams -> Int
tsProcessReconnectingDelay TimeServerParams
ps)
          let pred :: ProcessMonitorNotification -> Bool
pred m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
_ DiedReason
reason) = DiedReason
reason DiedReason -> DiedReason -> Bool
forall a. Eq a => a -> a -> Bool
== DiedReason
DP.DiedDisconnect
              loop :: [DP.ProcessMonitorNotification] -> DP.Process [DP.ProcessMonitorNotification]
              loop :: [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
loop [ProcessMonitorNotification]
acc =
                do Maybe ProcessMonitorNotification
y <- Int
-> [Match ProcessMonitorNotification]
-> Process (Maybe ProcessMonitorNotification)
forall b. Int -> [Match b] -> Process (Maybe b)
DP.receiveTimeout Int
0 [(ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification
    -> Process ProcessMonitorNotification)
-> Match ProcessMonitorNotification
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
DP.matchIf ProcessMonitorNotification -> Bool
pred ProcessMonitorNotification -> Process ProcessMonitorNotification
recv]
                   case Maybe ProcessMonitorNotification
y of
                     Maybe ProcessMonitorNotification
Nothing -> [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ([ProcessMonitorNotification]
 -> Process [ProcessMonitorNotification])
-> [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
forall a b. (a -> b) -> a -> b
$ [ProcessMonitorNotification] -> [ProcessMonitorNotification]
forall a. [a] -> [a]
reverse [ProcessMonitorNotification]
acc
                     Just m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
_ DiedReason
_) -> [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
loop (ProcessMonitorNotification
m ProcessMonitorNotification
-> [ProcessMonitorNotification] -> [ProcessMonitorNotification]
forall a. a -> [a] -> [a]
: [ProcessMonitorNotification]
acc)
          [ProcessMonitorNotification]
ms <- [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
loop [ProcessMonitorNotification
m]
          [ProcessId]
pids <- ConnectionManager
-> [ProcessMonitorNotification] -> Process [ProcessId]
filterMessageReceivers (TimeServer -> ConnectionManager
tsConnectionManager TimeServer
server) [ProcessMonitorNotification]
ms Process [ProcessId]
-> ([ProcessId] -> Process [ProcessId]) -> Process [ProcessId]
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
                  (IO [ProcessId] -> Process [ProcessId]
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [ProcessId] -> Process [ProcessId])
-> ([ProcessId] -> IO [ProcessId])
-> [ProcessId]
-> Process [ProcessId]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TimeServer -> [ProcessId] -> IO [ProcessId]
filterLogicalProcesses TimeServer
server)
          ConnectionManager -> [ProcessId] -> Process ()
reconnectMessageReceivers (TimeServer -> ConnectionManager
tsConnectionManager TimeServer
server) [ProcessId]
pids
          TimeServer -> Process ()
resetComputingTimeServerGlobalTime TimeServer
server
          TimeServer -> Process ()
tryComputeTimeServerGlobalTime TimeServer
server

-- | Handle the general message.
handleGeneralMessage :: GeneralMessage -> TimeServer -> DP.Process ()
handleGeneralMessage :: GeneralMessage -> TimeServer -> Process ()
handleGeneralMessage m :: GeneralMessage
m@GeneralMessage
KeepAliveMessage TimeServer
server =
  do ---
     TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
DEBUG (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
       String
"Time Server: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ GeneralMessage -> String
forall a. Show a => a -> String
show GeneralMessage
m
     ---
     () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Test whether the sychronization delay has been exceeded.
timeSyncDelayExceeded :: TimeServer -> UTCTime -> UTCTime -> Bool
timeSyncDelayExceeded :: TimeServer -> UTCTime -> UTCTime -> Bool
timeSyncDelayExceeded TimeServer
server UTCTime
utc0 UTCTime
utc =
  let dt :: Double
dt = Rational -> Double
forall a. Fractional a => Rational -> a
fromRational (Rational -> Double) -> Rational -> Double
forall a b. (a -> b) -> a -> b
$ NominalDiffTime -> Rational
forall a. Real a => a -> Rational
toRational (UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
utc UTCTime
utc0)
  in Double -> Int
secondsToMicroseconds Double
dt Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> (TimeServerParams -> Int
tsTimeSyncDelay (TimeServerParams -> Int) -> TimeServerParams -> Int
forall a b. (a -> b) -> a -> b
$ TimeServer -> TimeServerParams
tsParams TimeServer
server)

-- | Test whether the synchronization timeout has been exceeded.
timeSyncTimeoutExceeded :: TimeServer -> UTCTime -> UTCTime -> Bool
timeSyncTimeoutExceeded :: TimeServer -> UTCTime -> UTCTime -> Bool
timeSyncTimeoutExceeded TimeServer
server UTCTime
utc0 UTCTime
utc =
  let dt :: Double
dt = Rational -> Double
forall a. Fractional a => Rational -> a
fromRational (Rational -> Double) -> Rational -> Double
forall a b. (a -> b) -> a -> b
$ NominalDiffTime -> Rational
forall a. Real a => a -> Rational
toRational (UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
utc UTCTime
utc0)
  in Double -> Int
secondsToMicroseconds Double
dt Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> (TimeServerParams -> Int
tsTimeSyncTimeout (TimeServerParams -> Int) -> TimeServerParams -> Int
forall a b. (a -> b) -> a -> b
$ TimeServer -> TimeServerParams
tsParams TimeServer
server)

-- | Get the difference between the specified time and the logical process timestamp.
diffLogicalProcessTimestamp :: UTCTime -> LogicalProcessInfo -> IO Int
diffLogicalProcessTimestamp :: UTCTime -> LogicalProcessInfo -> IO Int
diffLogicalProcessTimestamp UTCTime
utc LogicalProcessInfo
lp =
  do UTCTime
utc0 <- IORef UTCTime -> IO UTCTime
forall a. IORef a -> IO a
readIORef (LogicalProcessInfo -> IORef UTCTime
lpTimestamp LogicalProcessInfo
lp)
     let dt :: Double
dt = Rational -> Double
forall a. Fractional a => Rational -> a
fromRational (Rational -> Double) -> Rational -> Double
forall a b. (a -> b) -> a -> b
$ NominalDiffTime -> Rational
forall a. Real a => a -> Rational
toRational (UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
utc UTCTime
utc0)
     Int -> IO Int
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> IO Int) -> Int -> IO Int
forall a b. (a -> b) -> a -> b
$ Double -> Int
secondsToMicroseconds Double
dt

-- | Validate the logical processes.
validateLogicalProcesses :: TimeServer -> UTCTime -> DP.Process ()
validateLogicalProcesses :: TimeServer -> UTCTime -> Process ()
validateLogicalProcesses TimeServer
server UTCTime
utc =
  do TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
NOTICE (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
       String
"Time Server: validating the logical processes"
     IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
       IORef UTCTime -> UTCTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (TimeServer -> IORef UTCTime
tsLogicalProcessValidationTimestamp TimeServer
server) UTCTime
utc
     case TimeServerParams -> TimeServerStrategy
tsStrategy (TimeServer -> TimeServerParams
tsParams TimeServer
server) of
       TimeServerStrategy
WaitIndefinitelyForLogicalProcess ->
         () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
       TerminateDueToLogicalProcessTimeout Int
timeout ->
         do Maybe LogicalProcessInfo
x <- IO (Maybe LogicalProcessInfo) -> Process (Maybe LogicalProcessInfo)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe LogicalProcessInfo)
 -> Process (Maybe LogicalProcessInfo))
-> IO (Maybe LogicalProcessInfo)
-> Process (Maybe LogicalProcessInfo)
forall a b. (a -> b) -> a -> b
$ TimeServer -> IO (Maybe LogicalProcessInfo)
minTimestampLogicalProcess TimeServer
server
            case Maybe LogicalProcessInfo
x of
              Just LogicalProcessInfo
lp ->
                do Int
diff <- IO Int -> Process Int
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int -> Process Int) -> IO Int -> Process Int
forall a b. (a -> b) -> a -> b
$ UTCTime -> LogicalProcessInfo -> IO Int
diffLogicalProcessTimestamp UTCTime
utc LogicalProcessInfo
lp
                   Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
diff Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
timeout) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
                     do TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
WARNING (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
                          String
"Time Server: terminating due to the exceeded logical process timeout"
                        Process ()
forall a. Process a
DP.terminate
              Maybe LogicalProcessInfo
Nothing ->
                () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
       UnregisterLogicalProcessDueToTimeout Int
timeout ->
         do Maybe LogicalProcessInfo
x <- IO (Maybe LogicalProcessInfo) -> Process (Maybe LogicalProcessInfo)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe LogicalProcessInfo)
 -> Process (Maybe LogicalProcessInfo))
-> IO (Maybe LogicalProcessInfo)
-> Process (Maybe LogicalProcessInfo)
forall a b. (a -> b) -> a -> b
$ TimeServer -> IO (Maybe LogicalProcessInfo)
minTimestampLogicalProcess TimeServer
server
            case Maybe LogicalProcessInfo
x of
              Just LogicalProcessInfo
lp ->
                do Int
diff <- IO Int -> Process Int
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int -> Process Int) -> IO Int -> Process Int
forall a b. (a -> b) -> a -> b
$ UTCTime -> LogicalProcessInfo -> IO Int
diffLogicalProcessTimestamp UTCTime
utc LogicalProcessInfo
lp
                   Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
diff Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
timeout) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
                     do TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
WARNING (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
                          String
"Time Server: unregistering the logical process due to the exceeded timeout"
                        TimeServer -> TimeServerMessage -> Process ()
processTimeServerMessage TimeServer
server (ProcessId -> TimeServerMessage
UnregisterLogicalProcessMessage (ProcessId -> TimeServerMessage) -> ProcessId -> TimeServerMessage
forall a b. (a -> b) -> a -> b
$ LogicalProcessInfo -> ProcessId
lpId LogicalProcessInfo
lp)
              Maybe LogicalProcessInfo
Nothing ->
                () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | A curried version of 'timeServer' for starting the time server on remote node.
curryTimeServer :: (Int, TimeServerParams) -> DP.Process ()
curryTimeServer :: (Int, TimeServerParams) -> Process ()
curryTimeServer (Int
n, TimeServerParams
ps) = Int -> TimeServerParams -> Process ()
timeServer Int
n TimeServerParams
ps

-- | Log the message with the specified priority.
logTimeServer :: TimeServer -> Priority -> String -> DP.Process ()
{-# INLINE logTimeServer #-}
logTimeServer :: TimeServer -> Priority -> String -> Process ()
logTimeServer TimeServer
server Priority
p String
message =
  Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (TimeServerParams -> Priority
tsLoggingPriority (TimeServer -> TimeServerParams
tsParams TimeServer
server) Priority -> Priority -> Bool
forall a. Ord a => a -> a -> Bool
<= Priority
p) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
  String -> Process ()
DP.say (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
  Priority -> String
embracePriority Priority
p String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
message