-- |
-- Module     : Simulation.Aivika.Server
--
-- This module defines the 'Server' type: a stateful worker that takes tasks
-- from an input stream one at a time, processes each of them and accumulates
-- timing statistics (input wait, processing, output wait and preemption time)
-- together with signals that notify about every stage.
module Simulation.Aivika.Server
(
-- Server type and constructors
Server,
newServer,
newStateServer,
newPreemptibleServer,
newPreemptibleStateServer,
-- Processing
serverProcessor,
-- Server properties
serverInitState,
serverState,
serverTotalInputWaitTime,
serverTotalProcessingTime,
serverTotalOutputWaitTime,
serverTotalPreemptionTime,
serverInputWaitTime,
serverProcessingTime,
serverOutputWaitTime,
serverPreemptionTime,
serverInputWaitFactor,
serverProcessingFactor,
serverOutputWaitFactor,
serverPreemptionFactor,
-- Statistics reset
resetServer,
-- Summary
serverSummary,
-- Derived signals notifying about property changes
serverStateChanged,
serverStateChanged_,
serverTotalInputWaitTimeChanged,
serverTotalInputWaitTimeChanged_,
serverTotalProcessingTimeChanged,
serverTotalProcessingTimeChanged_,
serverTotalOutputWaitTimeChanged,
serverTotalOutputWaitTimeChanged_,
serverTotalPreemptionTimeChanged,
serverTotalPreemptionTimeChanged_,
serverInputWaitTimeChanged,
serverInputWaitTimeChanged_,
serverProcessingTimeChanged,
serverProcessingTimeChanged_,
serverOutputWaitTimeChanged,
serverOutputWaitTimeChanged_,
serverPreemptionTimeChanged,
serverPreemptionTimeChanged_,
serverInputWaitFactorChanged,
serverInputWaitFactorChanged_,
serverProcessingFactorChanged,
serverProcessingFactorChanged_,
serverOutputWaitFactorChanged,
serverOutputWaitFactorChanged_,
serverPreemptionFactorChanged,
serverPreemptionFactorChanged_,
-- Basic signals
serverInputReceived,
serverTaskPreemptionBeginning,
serverTaskPreemptionEnding,
serverTaskProcessed,
serverOutputProvided,
-- Overall signal
serverChanged_) where
import Data.IORef
import Data.Monoid
import Control.Monad
import Control.Monad.Trans
import Control.Arrow
import Simulation.Aivika.Simulation
import Simulation.Aivika.Dynamics
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Signal
import Simulation.Aivika.Cont
import Simulation.Aivika.Process
import Simulation.Aivika.Processor
import Simulation.Aivika.Stream
import Simulation.Aivika.Statistics
-- | A server with internal state @s@ that consumes tasks of type @a@ and
-- produces results of type @b@, while accumulating timing statistics.
data Server s a b =
  Server { serverInitState :: s,
           -- ^ The state the server was created with; 'serverProcessor'
           -- starts its loop from this value.
           serverStateRef :: IORef s,
           -- ^ The current state, overwritten after every processed task.
           serverProcess :: s -> a -> Process (s, b),
           -- ^ Computes the new state and the output for the given state and input.
           serverProcessPreemptible :: Bool,
           -- ^ Whether tasks are run through 'serverProcessPreempting',
           -- which tracks preemption time.
           serverTotalInputWaitTimeRef :: IORef Double,
           -- ^ Accumulated time spent waiting for input.
           serverTotalProcessingTimeRef :: IORef Double,
           -- ^ Accumulated processing time (preemption time excluded).
           serverTotalOutputWaitTimeRef :: IORef Double,
           -- ^ Accumulated time spent waiting for the output to be taken.
           serverTotalPreemptionTimeRef :: IORef Double,
           -- ^ Accumulated time spent preempted.
           serverInputWaitTimeRef :: IORef (SamplingStats Double),
           -- ^ Per-task statistics of the input wait time.
           serverProcessingTimeRef :: IORef (SamplingStats Double),
           -- ^ Per-task statistics of the processing time.
           serverOutputWaitTimeRef :: IORef (SamplingStats Double),
           -- ^ Per-task statistics of the output wait time.
           serverPreemptionTimeRef :: IORef (SamplingStats Double),
           -- ^ Per-preemption statistics of the preemption time.
           serverInputReceivedSource :: SignalSource a,
           -- ^ Triggered when a new input task has been received.
           serverTaskPreemptionBeginningSource :: SignalSource a,
           -- ^ Triggered when the current task starts being preempted.
           serverTaskPreemptionEndingSource :: SignalSource a,
           -- ^ Triggered when the current task stops being preempted.
           serverTaskProcessedSource :: SignalSource (a, b),
           -- ^ Triggered when the task has been processed.
           serverOutputProvidedSource :: SignalSource (a, b)
           -- ^ Triggered when the previous output has been delivered,
           -- i.e. when the next item is requested from the server.
         }
-- | Create a new server that holds no state and is not preemptible.
--
-- The argument computes the output for every input task; it is forwarded to
-- 'newPreemptibleServer' with the preemption flag set to 'False'.
newServer :: (a -> Process b)
             -> Simulation (Server () a b)
newServer provide = newPreemptibleServer False provide
-- | Create a new non-preemptible server with an explicit state.
--
-- The first argument computes the new state and the output from the current
-- state and the input; the second argument is the initial state. Both are
-- forwarded to 'newPreemptibleStateServer' with the preemption flag set to 'False'.
newStateServer :: (s -> a -> Process (s, b))
                  -> s
                  -> Simulation (Server s a b)
newStateServer provide state = newPreemptibleStateServer False provide state
-- | Create a new stateless server, choosing whether it tracks preemption.
--
-- The server state is the unit value, which is threaded through unchanged
-- while the supplied computation produces the output for each input.
newPreemptibleServer :: Bool
                        -> (a -> Process b)
                        -> Simulation (Server () a b)
newPreemptibleServer preemptible provide =
  newPreemptibleStateServer preemptible keepState ()
  where
    -- Adapt the stateless computation to the state-passing interface.
    keepState st task =
      do result <- provide task
         return (st, result)
-- | Create a new server with an explicit state, choosing whether it tracks
-- preemption.
--
-- All accumulated totals start at zero, all sampling statistics start empty,
-- and a fresh signal source is created for every kind of notification.
newPreemptibleStateServer :: Bool
                             -> (s -> a -> Process (s, b))
                             -> s
                             -> Simulation (Server s a b)
newPreemptibleStateServer preemptible provide state =
  do stateRef            <- liftIO $ newIORef state
     totalInputWaitRef   <- liftIO $ newIORef 0
     totalProcessingRef  <- liftIO $ newIORef 0
     totalOutputWaitRef  <- liftIO $ newIORef 0
     totalPreemptionRef  <- liftIO $ newIORef 0
     inputWaitStatsRef   <- liftIO $ newIORef emptySamplingStats
     processingStatsRef  <- liftIO $ newIORef emptySamplingStats
     outputWaitStatsRef  <- liftIO $ newIORef emptySamplingStats
     preemptionStatsRef  <- liftIO $ newIORef emptySamplingStats
     inputReceived       <- newSignalSource
     preemptionBeginning <- newSignalSource
     preemptionEnding    <- newSignalSource
     taskProcessed       <- newSignalSource
     outputProvided      <- newSignalSource
     return Server { serverInitState = state,
                     serverStateRef = stateRef,
                     serverProcess = provide,
                     serverProcessPreemptible = preemptible,
                     serverTotalInputWaitTimeRef = totalInputWaitRef,
                     serverTotalProcessingTimeRef = totalProcessingRef,
                     serverTotalOutputWaitTimeRef = totalOutputWaitRef,
                     serverTotalPreemptionTimeRef = totalPreemptionRef,
                     serverInputWaitTimeRef = inputWaitStatsRef,
                     serverProcessingTimeRef = processingStatsRef,
                     serverOutputWaitTimeRef = outputWaitStatsRef,
                     serverPreemptionTimeRef = preemptionStatsRef,
                     serverInputReceivedSource = inputReceived,
                     serverTaskPreemptionBeginningSource = preemptionBeginning,
                     serverTaskPreemptionEndingSource = preemptionEnding,
                     serverTaskProcessedSource = taskProcessed,
                     serverOutputProvidedSource = outputProvided }
-- | Return a processor that serves the input stream with the given server.
--
-- Every requested output item goes through the same cycle:
--
--   1. if a previous result exists, the time elapsed since it was produced is
--      recorded as output wait time and 'serverOutputProvided' is triggered;
--
--   2. the next task is taken from the input stream; the time spent on that
--      is recorded as input wait time and 'serverInputReceived' is triggered;
--
--   3. the task is processed (with preemption tracking if the server is
--      preemptible); the new state is stored, the elapsed time minus the
--      preemption time is recorded as processing time and
--      'serverTaskProcessed' is triggered.
--
-- The loop always starts from 'serverInitState'.
serverProcessor :: Server s a b -> Processor a b
serverProcessor server =
  Processor (serve (serverInitState server) Nothing)
  where
    -- One iteration of the server loop; @previous@ carries the time, the task
    -- and the result of the preceding iteration, if any.
    serve state previous input =
      Cons $
      do startTime <- liftDynamics time
         liftEvent $ reportDelivered startTime previous
         (task, rest) <- runStream input
         receivedTime <- liftDynamics time
         liftEvent $ reportReceived (receivedTime - startTime) task
         (state', result, preempted) <- runTask state task
         finishedTime <- liftDynamics time
         liftEvent $
           reportProcessed state' (finishedTime - receivedTime - preempted) task result
         return (result, serve state' (Just (finishedTime, task, result)) rest)

    -- Account for the time the previous result waited to be taken.
    reportDelivered _ Nothing = return ()
    reportDelivered now (Just (since, task, result)) =
      do liftIO $
           do modifyIORef' (serverTotalOutputWaitTimeRef server) (+ (now - since))
              modifyIORef' (serverOutputWaitTimeRef server)
                (addSamplingStats (now - since))
         triggerSignal (serverOutputProvidedSource server) (task, result)

    -- Account for the time spent waiting for the next task.
    reportReceived waited task =
      do liftIO $
           do modifyIORef' (serverTotalInputWaitTimeRef server) (+ waited)
              modifyIORef' (serverInputWaitTimeRef server)
                (addSamplingStats waited)
         triggerSignal (serverInputReceivedSource server) task

    -- Run the task, returning the new state, the result and the preemption time.
    runTask state task
      | serverProcessPreemptible server =
          serverProcessPreempting server state task
      | otherwise =
          do (state', result) <- serverProcess server state task
             return (state', result, 0)

    -- Store the new state and account for the net processing time.
    reportProcessed state' busy task result =
      do liftIO $
           do writeIORef (serverStateRef server) $! state'
              modifyIORef' (serverTotalProcessingTimeRef server) (+ busy)
              modifyIORef' (serverProcessingTimeRef server)
                (addSamplingStats busy)
         triggerSignal (serverTaskProcessedSource server) (task, result)
-- | Process the task while tracking the time during which the current
-- process is preempted.
--
-- Handlers are subscribed to the preemption signals of the current process:
-- the beginning handler remembers the time and triggers
-- 'serverTaskPreemptionBeginning'; the ending handler adds the elapsed time
-- to the local sum and to the server preemption statistics and triggers
-- 'serverTaskPreemptionEnding'. Both handlers are disposed in the finalizer
-- passed to 'finallyProcess'.
--
-- Returns the new state, the result and the preemption time summed over
-- this task.
serverProcessPreempting :: Server s a b -> s -> a -> Process (s, b, Double)
serverProcessPreempting server s a =
  do pid <- processId
     startTime <- liftDynamics time
     preemptedTotalRef <- liftIO $ newIORef 0
     preemptedSinceRef <- liftIO $ newIORef startTime
     let -- Remember when the preemption began.
         onBeginning () =
           do now <- liftDynamics time
              liftIO $ writeIORef preemptedSinceRef now
              triggerSignal (serverTaskPreemptionBeginningSource server) a
         -- Account for the duration of the preemption that has just ended.
         onEnding () =
           do since <- liftIO $ readIORef preemptedSinceRef
              now <- liftDynamics time
              let elapsed = now - since
              liftIO $
                do modifyIORef' preemptedTotalRef (+ elapsed)
                   modifyIORef' (serverTotalPreemptionTimeRef server) (+ elapsed)
                   modifyIORef' (serverPreemptionTimeRef server)
                     (addSamplingStats elapsed)
              triggerSignal (serverTaskPreemptionEndingSource server) a
     beginningHandler <-
       liftEvent $ handleSignal (processPreemptionBeginning pid) onBeginning
     endingHandler <-
       liftEvent $ handleSignal (processPreemptionEnding pid) onEnding
     let runTask =
           do (s', b) <- serverProcess server s a
              preempted <- liftIO $ readIORef preemptedTotalRef
              return (s', b, preempted)
         unsubscribe =
           liftEvent $
           do disposeEvent beginningHandler
              disposeEvent endingHandler
     finallyProcess runTask unsubscribe
-- | Return the current state of the server.
--
-- See also 'serverStateChanged' and 'serverStateChanged_'.
serverState :: Server s a b -> Event s
serverState server = Event (const (readIORef stateRef))
  where
    stateRef = serverStateRef server
-- | Signal carrying the current value of 'serverState', emitted whenever
-- 'serverStateChanged_' fires.
serverStateChanged :: Server s a b -> Signal s
serverStateChanged server = mapSignalM currentValue (serverStateChanged_ server)
  where
    currentValue _ = serverState server
-- | Signal without payload, derived from 'serverTaskProcessed', notifying
-- that 'serverState' may have changed.
serverStateChanged_ :: Server s a b -> Signal ()
serverStateChanged_ server = mapSignal (\_ -> ()) taskProcessed
  where
    taskProcessed = serverTaskProcessed server
-- | Return the accumulated time the server spent waiting for input.
--
-- See also 'serverTotalInputWaitTimeChanged' and 'serverTotalInputWaitTimeChanged_'.
serverTotalInputWaitTime :: Server s a b -> Event Double
serverTotalInputWaitTime server = Event (const (readIORef totalRef))
  where
    totalRef = serverTotalInputWaitTimeRef server
-- | Signal carrying the current value of 'serverTotalInputWaitTime', emitted
-- whenever 'serverTotalInputWaitTimeChanged_' fires.
serverTotalInputWaitTimeChanged :: Server s a b -> Signal Double
serverTotalInputWaitTimeChanged server =
  mapSignalM currentValue (serverTotalInputWaitTimeChanged_ server)
  where
    currentValue _ = serverTotalInputWaitTime server
-- | Signal without payload, derived from 'serverInputReceived', notifying
-- that 'serverTotalInputWaitTime' has been updated.
serverTotalInputWaitTimeChanged_ :: Server s a b -> Signal ()
serverTotalInputWaitTimeChanged_ server = mapSignal (\_ -> ()) inputReceived
  where
    inputReceived = serverInputReceived server
-- | Return the accumulated time the server spent processing tasks
-- (preemption time excluded).
--
-- See also 'serverTotalProcessingTimeChanged' and 'serverTotalProcessingTimeChanged_'.
serverTotalProcessingTime :: Server s a b -> Event Double
serverTotalProcessingTime server = Event (const (readIORef totalRef))
  where
    totalRef = serverTotalProcessingTimeRef server
-- | Signal carrying the current value of 'serverTotalProcessingTime', emitted
-- whenever 'serverTotalProcessingTimeChanged_' fires.
serverTotalProcessingTimeChanged :: Server s a b -> Signal Double
serverTotalProcessingTimeChanged server =
  mapSignalM currentValue (serverTotalProcessingTimeChanged_ server)
  where
    currentValue _ = serverTotalProcessingTime server
-- | Signal without payload, derived from 'serverTaskProcessed', notifying
-- that 'serverTotalProcessingTime' has been updated.
serverTotalProcessingTimeChanged_ :: Server s a b -> Signal ()
serverTotalProcessingTimeChanged_ server = mapSignal (\_ -> ()) taskProcessed
  where
    taskProcessed = serverTaskProcessed server
-- | Return the accumulated time the server spent waiting for its output to
-- be taken.
--
-- See also 'serverTotalOutputWaitTimeChanged' and 'serverTotalOutputWaitTimeChanged_'.
serverTotalOutputWaitTime :: Server s a b -> Event Double
serverTotalOutputWaitTime server = Event (const (readIORef totalRef))
  where
    totalRef = serverTotalOutputWaitTimeRef server
-- | Signal carrying the current value of 'serverTotalOutputWaitTime', emitted
-- whenever 'serverTotalOutputWaitTimeChanged_' fires.
serverTotalOutputWaitTimeChanged :: Server s a b -> Signal Double
serverTotalOutputWaitTimeChanged server =
  mapSignalM currentValue (serverTotalOutputWaitTimeChanged_ server)
  where
    currentValue _ = serverTotalOutputWaitTime server
-- | Signal without payload, derived from 'serverOutputProvided', notifying
-- that 'serverTotalOutputWaitTime' has been updated.
serverTotalOutputWaitTimeChanged_ :: Server s a b -> Signal ()
serverTotalOutputWaitTimeChanged_ server = mapSignal (\_ -> ()) outputProvided
  where
    outputProvided = serverOutputProvided server
-- | Return the accumulated time the server spent being preempted.
--
-- See also 'serverTotalPreemptionTimeChanged' and 'serverTotalPreemptionTimeChanged_'.
serverTotalPreemptionTime :: Server s a b -> Event Double
serverTotalPreemptionTime server = Event (const (readIORef totalRef))
  where
    totalRef = serverTotalPreemptionTimeRef server
-- | Signal carrying the current value of 'serverTotalPreemptionTime', emitted
-- whenever 'serverTotalPreemptionTimeChanged_' fires.
serverTotalPreemptionTimeChanged :: Server s a b -> Signal Double
serverTotalPreemptionTimeChanged server =
  mapSignalM currentValue (serverTotalPreemptionTimeChanged_ server)
  where
    currentValue _ = serverTotalPreemptionTime server
-- | Signal without payload, derived from 'serverTaskPreemptionEnding',
-- notifying that 'serverTotalPreemptionTime' has been updated.
serverTotalPreemptionTimeChanged_ :: Server s a b -> Signal ()
serverTotalPreemptionTimeChanged_ server = mapSignal (\_ -> ()) preemptionEnding
  where
    preemptionEnding = serverTaskPreemptionEnding server
-- | Return the statistics of the time the server spent waiting for input.
--
-- See also 'serverInputWaitTimeChanged' and 'serverInputWaitTimeChanged_'.
serverInputWaitTime :: Server s a b -> Event (SamplingStats Double)
serverInputWaitTime server = Event (const (readIORef statsRef))
  where
    statsRef = serverInputWaitTimeRef server
-- | Signal carrying the current value of 'serverInputWaitTime', emitted
-- whenever 'serverInputWaitTimeChanged_' fires.
serverInputWaitTimeChanged :: Server s a b -> Signal (SamplingStats Double)
serverInputWaitTimeChanged server =
  mapSignalM currentValue (serverInputWaitTimeChanged_ server)
  where
    currentValue _ = serverInputWaitTime server
-- | Signal without payload, derived from 'serverInputReceived', notifying
-- that 'serverInputWaitTime' has been updated.
serverInputWaitTimeChanged_ :: Server s a b -> Signal ()
serverInputWaitTimeChanged_ server = mapSignal (\_ -> ()) inputReceived
  where
    inputReceived = serverInputReceived server
-- | Return the statistics of the time the server spent processing tasks.
--
-- See also 'serverProcessingTimeChanged' and 'serverProcessingTimeChanged_'.
serverProcessingTime :: Server s a b -> Event (SamplingStats Double)
serverProcessingTime server = Event (const (readIORef statsRef))
  where
    statsRef = serverProcessingTimeRef server
-- | Signal carrying the current value of 'serverProcessingTime', emitted
-- whenever 'serverProcessingTimeChanged_' fires.
serverProcessingTimeChanged :: Server s a b -> Signal (SamplingStats Double)
serverProcessingTimeChanged server =
  mapSignalM currentValue (serverProcessingTimeChanged_ server)
  where
    currentValue _ = serverProcessingTime server
-- | Signal without payload, derived from 'serverTaskProcessed', notifying
-- that 'serverProcessingTime' has been updated.
serverProcessingTimeChanged_ :: Server s a b -> Signal ()
serverProcessingTimeChanged_ server = mapSignal (\_ -> ()) taskProcessed
  where
    taskProcessed = serverTaskProcessed server
-- | Return the statistics of the time the server spent waiting for its
-- output to be taken.
--
-- See also 'serverOutputWaitTimeChanged' and 'serverOutputWaitTimeChanged_'.
serverOutputWaitTime :: Server s a b -> Event (SamplingStats Double)
serverOutputWaitTime server = Event (const (readIORef statsRef))
  where
    statsRef = serverOutputWaitTimeRef server
-- | Signal carrying the current value of 'serverOutputWaitTime', emitted
-- whenever 'serverOutputWaitTimeChanged_' fires.
serverOutputWaitTimeChanged :: Server s a b -> Signal (SamplingStats Double)
serverOutputWaitTimeChanged server =
  mapSignalM currentValue (serverOutputWaitTimeChanged_ server)
  where
    currentValue _ = serverOutputWaitTime server
-- | Signal without payload, derived from 'serverOutputProvided', notifying
-- that 'serverOutputWaitTime' has been updated.
serverOutputWaitTimeChanged_ :: Server s a b -> Signal ()
serverOutputWaitTimeChanged_ server = mapSignal (\_ -> ()) outputProvided
  where
    outputProvided = serverOutputProvided server
-- | Return the statistics of the time the server spent being preempted.
--
-- See also 'serverPreemptionTimeChanged' and 'serverPreemptionTimeChanged_'.
serverPreemptionTime :: Server s a b -> Event (SamplingStats Double)
serverPreemptionTime server = Event (const (readIORef statsRef))
  where
    statsRef = serverPreemptionTimeRef server
-- | Signal carrying the current value of 'serverPreemptionTime', emitted
-- whenever 'serverPreemptionTimeChanged_' fires.
serverPreemptionTimeChanged :: Server s a b -> Signal (SamplingStats Double)
serverPreemptionTimeChanged server =
  mapSignalM currentValue (serverPreemptionTimeChanged_ server)
  where
    currentValue _ = serverPreemptionTime server
-- | Signal without payload, derived from 'serverTaskPreemptionEnding',
-- notifying that 'serverPreemptionTime' has been updated.
serverPreemptionTimeChanged_ :: Server s a b -> Signal ()
serverPreemptionTimeChanged_ server = mapSignal (\_ -> ()) preemptionEnding
  where
    preemptionEnding = serverTaskPreemptionEnding server
-- | Return the share of the total input wait time in the sum of all four
-- accumulated totals (input wait, processing, output wait, preemption).
--
-- While all totals are still zero the division is @0 / 0@, which yields NaN
-- for 'Double'.
--
-- See also 'serverInputWaitFactorChanged' and 'serverInputWaitFactorChanged_'.
serverInputWaitFactor :: Server s a b -> Event Double
serverInputWaitFactor server =
  Event $ \_ ->
  do inputWait  <- readIORef (serverTotalInputWaitTimeRef server)
     processing <- readIORef (serverTotalProcessingTimeRef server)
     outputWait <- readIORef (serverTotalOutputWaitTimeRef server)
     preemption <- readIORef (serverTotalPreemptionTimeRef server)
     let total = inputWait + processing + outputWait + preemption
     return (inputWait / total)
-- | Signal carrying the current value of 'serverInputWaitFactor', emitted
-- whenever 'serverInputWaitFactorChanged_' fires.
serverInputWaitFactorChanged :: Server s a b -> Signal Double
serverInputWaitFactorChanged server =
  mapSignalM currentValue (serverInputWaitFactorChanged_ server)
  where
    currentValue _ = serverInputWaitFactor server
-- | Signal without payload notifying that 'serverInputWaitFactor' may have
-- changed.
--
-- The factor depends on all four accumulated totals, so this is the same
-- combination of signals as 'serverChanged_': input received, task
-- processed, output provided and preemption ending.
serverInputWaitFactorChanged_ :: Server s a b -> Signal ()
serverInputWaitFactorChanged_ server = serverChanged_ server
-- | Return the share of the total processing time in the sum of all four
-- accumulated totals (input wait, processing, output wait, preemption).
--
-- While all totals are still zero the division is @0 / 0@, which yields NaN
-- for 'Double'.
--
-- See also 'serverProcessingFactorChanged' and 'serverProcessingFactorChanged_'.
serverProcessingFactor :: Server s a b -> Event Double
serverProcessingFactor server =
  Event $ \_ ->
  do inputWait  <- readIORef (serverTotalInputWaitTimeRef server)
     processing <- readIORef (serverTotalProcessingTimeRef server)
     outputWait <- readIORef (serverTotalOutputWaitTimeRef server)
     preemption <- readIORef (serverTotalPreemptionTimeRef server)
     let total = inputWait + processing + outputWait + preemption
     return (processing / total)
-- | Signal carrying the current value of 'serverProcessingFactor', emitted
-- whenever 'serverProcessingFactorChanged_' fires.
serverProcessingFactorChanged :: Server s a b -> Signal Double
serverProcessingFactorChanged server =
  mapSignalM currentValue (serverProcessingFactorChanged_ server)
  where
    currentValue _ = serverProcessingFactor server
-- | Signal without payload notifying that 'serverProcessingFactor' may have
-- changed.
--
-- The factor depends on all four accumulated totals, so this is the same
-- combination of signals as 'serverChanged_': input received, task
-- processed, output provided and preemption ending.
serverProcessingFactorChanged_ :: Server s a b -> Signal ()
serverProcessingFactorChanged_ server = serverChanged_ server
-- | Return the share of the total output wait time in the sum of all four
-- accumulated totals (input wait, processing, output wait, preemption).
--
-- While all totals are still zero the division is @0 / 0@, which yields NaN
-- for 'Double'.
--
-- See also 'serverOutputWaitFactorChanged' and 'serverOutputWaitFactorChanged_'.
serverOutputWaitFactor :: Server s a b -> Event Double
serverOutputWaitFactor server =
  Event $ \_ ->
  do inputWait  <- readIORef (serverTotalInputWaitTimeRef server)
     processing <- readIORef (serverTotalProcessingTimeRef server)
     outputWait <- readIORef (serverTotalOutputWaitTimeRef server)
     preemption <- readIORef (serverTotalPreemptionTimeRef server)
     let total = inputWait + processing + outputWait + preemption
     return (outputWait / total)
-- | Signal carrying the current value of 'serverOutputWaitFactor', emitted
-- whenever 'serverOutputWaitFactorChanged_' fires.
serverOutputWaitFactorChanged :: Server s a b -> Signal Double
serverOutputWaitFactorChanged server =
  mapSignalM currentValue (serverOutputWaitFactorChanged_ server)
  where
    currentValue _ = serverOutputWaitFactor server
-- | Signal without payload notifying that 'serverOutputWaitFactor' may have
-- changed.
--
-- The factor depends on all four accumulated totals, so this is the same
-- combination of signals as 'serverChanged_': input received, task
-- processed, output provided and preemption ending.
serverOutputWaitFactorChanged_ :: Server s a b -> Signal ()
serverOutputWaitFactorChanged_ server = serverChanged_ server
-- | Return the share of the total preemption time in the sum of all four
-- accumulated totals (input wait, processing, output wait, preemption).
--
-- While all totals are still zero the division is @0 / 0@, which yields NaN
-- for 'Double'.
--
-- See also 'serverPreemptionFactorChanged' and 'serverPreemptionFactorChanged_'.
serverPreemptionFactor :: Server s a b -> Event Double
serverPreemptionFactor server =
  Event $ \_ ->
  do inputWait  <- readIORef (serverTotalInputWaitTimeRef server)
     processing <- readIORef (serverTotalProcessingTimeRef server)
     outputWait <- readIORef (serverTotalOutputWaitTimeRef server)
     preemption <- readIORef (serverTotalPreemptionTimeRef server)
     let total = inputWait + processing + outputWait + preemption
     return (preemption / total)
-- | Signal carrying the current value of 'serverPreemptionFactor', emitted
-- whenever 'serverPreemptionFactorChanged_' fires.
serverPreemptionFactorChanged :: Server s a b -> Signal Double
serverPreemptionFactorChanged server =
  mapSignalM currentValue (serverPreemptionFactorChanged_ server)
  where
    currentValue _ = serverPreemptionFactor server
-- | Signal without payload notifying that 'serverPreemptionFactor' may have
-- changed.
--
-- The factor depends on all four accumulated totals, so this is the same
-- combination of signals as 'serverChanged_': input received, task
-- processed, output provided and preemption ending.
serverPreemptionFactorChanged_ :: Server s a b -> Signal ()
serverPreemptionFactorChanged_ server = serverChanged_ server
-- | Signal raised by 'serverProcessor' when the server has received a new
-- input task; the payload is that task.
serverInputReceived :: Server s a b -> Signal a
serverInputReceived server = publishSignal (serverInputReceivedSource server)
-- | Signal raised when the task being processed starts being preempted; the
-- payload is that task. Only raised by preemptible servers.
serverTaskPreemptionBeginning :: Server s a b -> Signal a
serverTaskPreemptionBeginning server =
  publishSignal (serverTaskPreemptionBeginningSource server)
-- | Signal raised when the task being processed stops being preempted; the
-- payload is that task. Only raised by preemptible servers.
serverTaskPreemptionEnding :: Server s a b -> Signal a
serverTaskPreemptionEnding server =
  publishSignal (serverTaskPreemptionEndingSource server)
-- | Signal raised by 'serverProcessor' when a task has been processed; the
-- payload is the task together with its result.
serverTaskProcessed :: Server s a b -> Signal (a, b)
serverTaskProcessed server = publishSignal (serverTaskProcessedSource server)
-- | Signal raised by 'serverProcessor' at the start of the next iteration,
-- once the previous result has been delivered; the payload is the previous
-- task together with its result.
serverOutputProvided :: Server s a b -> Signal (a, b)
serverOutputProvided server = publishSignal (serverOutputProvidedSource server)
-- | Signal without payload raised whenever any accumulated statistic of the
-- server is updated: on input received, task processed, output provided and
-- preemption ending.
serverChanged_ :: Server s a b -> Signal ()
serverChanged_ server =
  inputReceived <> taskProcessed <> outputProvided <> preemptionEnding
  where
    inputReceived    = discard (serverInputReceived server)
    taskProcessed    = discard (serverTaskProcessed server)
    outputProvided   = discard (serverOutputProvided server)
    preemptionEnding = discard (serverTaskPreemptionEnding server)

    -- Drop the payload, keeping only the notification.
    discard :: Signal x -> Signal ()
    discard = mapSignal (const ())
-- | Return a textual summary of the server statistics.
--
-- The second argument is the number of spaces every heading line is indented
-- by; the nested sampling statistics are indented by two more.
--
-- The summary lists the four accumulated totals, the four factors (each
-- total divided by the sum of all totals) and the four sampling statistics.
-- While all totals are still zero the factors are @0 / 0@, i.e. NaN.
serverSummary :: Server s a b -> Int -> Event ShowS
serverSummary server indent =
  Event $ \p ->
  do tx1 <- readIORef (serverTotalInputWaitTimeRef server)
     tx2 <- readIORef (serverTotalProcessingTimeRef server)
     tx3 <- readIORef (serverTotalOutputWaitTimeRef server)
     tx4 <- readIORef (serverTotalPreemptionTimeRef server)
     -- A single shared denominator keeps the four factors consistent with
     -- each other and with 'serverPreemptionFactor' and its siblings: the
     -- preemption factor used to count tx3 twice and omit tx2.
     let total = tx1 + tx2 + tx3 + tx4
         xf1 = tx1 / total
         xf2 = tx2 / total
         xf3 = tx3 / total
         xf4 = tx4 / total
     xs1 <- readIORef (serverInputWaitTimeRef server)
     xs2 <- readIORef (serverProcessingTimeRef server)
     xs3 <- readIORef (serverOutputWaitTimeRef server)
     xs4 <- readIORef (serverPreemptionTimeRef server)
     let tab = replicate indent ' '
     return $
       showString tab .
       showString "total input wait time (locked while awaiting the input) = " . shows tx1 .
       showString "\n" .
       showString tab .
       showString "total processing time = " . shows tx2 .
       showString "\n" .
       showString tab .
       showString "total output wait time (locked while delivering the output) = " . shows tx3 .
       showString "\n\n" .
       showString tab .
       showString "total preemption time = " . shows tx4 .
       showString "\n" .
       showString tab .
       showString "input wait factor (from 0 to 1) = " . shows xf1 .
       showString "\n" .
       showString tab .
       showString "processing factor (from 0 to 1) = " . shows xf2 .
       showString "\n" .
       showString tab .
       showString "output wait factor (from 0 to 1) = " . shows xf3 .
       showString "\n\n" .
       showString tab .
       showString "output preemption factor (from 0 to 1) = " . shows xf4 .
       showString "\n\n" .
       showString tab .
       showString "input wait time (locked while awaiting the input):\n\n" .
       samplingStatsSummary xs1 (2 + indent) .
       showString "\n\n" .
       showString tab .
       showString "processing time:\n\n" .
       samplingStatsSummary xs2 (2 + indent) .
       showString "\n\n" .
       showString tab .
       showString "output wait time (locked while delivering the output):\n\n" .
       samplingStatsSummary xs3 (2 + indent) .
       showString "\n\n" .
       showString tab .
       showString "preemption time (waiting for the proceeding after preemption):\n\n" .
       samplingStatsSummary xs4 (2 + indent)
-- | Reset the accumulated statistics of the server.
--
-- The four totals are set back to zero and the four sampling statistics are
-- set back to 'mempty'. The current state of the server is left untouched.
resetServer :: Server s a b -> Event ()
resetServer server =
  Event $ \_ ->
  do forM_ totalRefs $ \ref -> writeIORef ref 0
     forM_ statsRefs $ \ref -> writeIORef ref mempty
  where
    totalRefs =
      [ serverTotalInputWaitTimeRef server
      , serverTotalProcessingTimeRef server
      , serverTotalOutputWaitTimeRef server
      , serverTotalPreemptionTimeRef server
      ]
    statsRefs =
      [ serverInputWaitTimeRef server
      , serverProcessingTimeRef server
      , serverOutputWaitTimeRef server
      , serverPreemptionTimeRef server
      ]