module Simulation.Aivika.Distributed.Optimistic.Internal.DIO
(DIO(..),
DIOParams(..),
invokeDIO,
runDIO,
defaultDIOParams,
terminateDIO,
unregisterDIO,
dioParams,
messageChannel,
messageInboxId,
timeServerId,
logDIO,
liftDistributedUnsafe) where
import Data.Typeable
import Data.Binary
import GHC.Generics
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Control.Exception (throw)
import Control.Monad.Catch as C
import qualified Control.Distributed.Process as DP
import Simulation.Aivika.Trans.Exception
import Simulation.Aivika.Trans.Internal.Types
import Simulation.Aivika.Distributed.Optimistic.Internal.Channel
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
data DIOParams =
DIOParams { dioLoggingPriority :: Priority,
dioUndoableLogSizeThreshold :: Int,
dioOutputMessageQueueSizeThreshold :: Int,
dioSyncTimeout :: Int,
dioAllowPrematureIO :: Bool,
dioAllowProcessingOutdatedMessage :: Bool
} deriving (Eq, Ord, Show, Typeable, Generic)
instance Binary DIOParams
newtype DIO a = DIO { unDIO :: DIOContext -> DP.Process a
}
data DIOContext =
DIOContext { dioChannel :: Channel LocalProcessMessage,
dioInboxId :: DP.ProcessId,
dioTimeServerId :: DP.ProcessId,
dioParams0 :: DIOParams
}
instance Monad DIO where
return = DIO . const . return
(DIO m) >>= k = DIO $ \ps ->
m ps >>= \a ->
let m' = unDIO (k a) in m' ps
instance Applicative DIO where
pure = return
(<*>) = ap
instance Functor DIO where
fmap f (DIO m) = DIO $ fmap f . m
instance MonadException DIO where
catchComp (DIO m) h = DIO $ \ps ->
C.catch (m ps) (\e -> unDIO (h e) ps)
finallyComp (DIO m1) (DIO m2) = DIO $ \ps ->
C.finally (m1 ps) (m2 ps)
throwComp e = DIO $ \ps ->
throw e
invokeDIO :: DIOContext -> DIO a -> DP.Process a
invokeDIO ps (DIO m) = m ps
liftDistributedUnsafe :: DP.Process a -> DIO a
liftDistributedUnsafe = DIO . const
defaultDIOParams :: DIOParams
defaultDIOParams =
DIOParams { dioLoggingPriority = DEBUG,
dioUndoableLogSizeThreshold = 500000,
dioOutputMessageQueueSizeThreshold = 10000,
dioSyncTimeout = 5000000,
dioAllowPrematureIO = False,
dioAllowProcessingOutdatedMessage = False
}
dioParams :: DIO DIOParams
dioParams = DIO $ return . dioParams0
messageChannel :: DIO (Channel LocalProcessMessage)
messageChannel = DIO $ return . dioChannel
messageInboxId :: DIO DP.ProcessId
messageInboxId = DIO $ return . dioInboxId
timeServerId :: DIO DP.ProcessId
timeServerId = DIO $ return . dioTimeServerId
terminateDIO :: DIO ()
terminateDIO =
do logDIO INFO "Terminating the simulation..."
sender <- messageInboxId
receiver <- timeServerId
liftDistributedUnsafe $
DP.send receiver (TerminateTimeServerMessage sender)
unregisterDIO :: DIO ()
unregisterDIO =
do logDIO INFO "Unregistering the simulation process..."
sender <- messageInboxId
receiver <- timeServerId
liftDistributedUnsafe $
DP.send receiver (UnregisterLocalProcessMessage sender)
runDIO :: DIO a -> DIOParams -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIO m ps serverId =
do ch <- liftIO newChannel
inboxId <-
DP.spawnLocal $
forever $
do m <- DP.expect :: DP.Process LocalProcessMessage
liftIO $
writeChannel ch m
when (m == TerminateLocalProcessMessage) $
do
logProcess ps INFO "Terminating the inbox process..."
DP.terminate
logProcess ps INFO "Registering the simulation process..."
DP.send serverId (RegisterLocalProcessMessage inboxId)
return (inboxId, unDIO m DIOContext { dioChannel = ch,
dioInboxId = inboxId,
dioTimeServerId = serverId,
dioParams0 = ps })
logDIO :: Priority -> String -> DIO ()
logDIO p message =
do ps <- dioParams
when (dioLoggingPriority ps <= p) $
liftDistributedUnsafe $
DP.say $
embracePriority p ++ " " ++ message
logProcess :: DIOParams -> Priority -> String -> DP.Process ()
logProcess ps p message =
when (dioLoggingPriority ps <= p) $
DP.say $
embracePriority p ++ " " ++ message