module Simulation.Aivika.Resource.Preemption
(
Resource,
newResource,
newResourceWithMaxCount,
resourceMaxCount,
resourceCount,
resourceCountStats,
resourceUtilisationCount,
resourceUtilisationCountStats,
resourceQueueCount,
resourceQueueCountStats,
resourceTotalWaitTime,
resourceWaitTime,
requestResourceWithPriority,
releaseResource,
usingResourceWithPriority,
incResourceCount,
decResourceCount,
alterResourceCount,
resourceCountChanged,
resourceCountChanged_,
resourceUtilisationCountChanged,
resourceUtilisationCountChanged_,
resourceQueueCountChanged,
resourceQueueCountChanged_,
resourceWaitTimeChanged,
resourceWaitTimeChanged_,
resourceChanged_) where
import Data.IORef
import Data.Monoid
import Data.Maybe
import Control.Monad
import Control.Monad.Trans
import Control.Exception
import Simulation.Aivika.Internal.Specs
import Simulation.Aivika.Internal.Simulation
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Internal.Cont
import Simulation.Aivika.Internal.Process
import Simulation.Aivika.QueueStrategy
import Simulation.Aivika.Statistics
import Simulation.Aivika.Signal
import qualified Simulation.Aivika.PriorityQueue as PQ
data Resource =
Resource { resourceMaxCount :: Maybe Int,
resourceCountRef :: IORef Int,
resourceCountStatsRef :: IORef (TimingStats Int),
resourceCountSource :: SignalSource Int,
resourceUtilisationCountRef :: IORef Int,
resourceUtilisationCountStatsRef :: IORef (TimingStats Int),
resourceUtilisationCountSource :: SignalSource Int,
resourceQueueCountRef :: IORef Int,
resourceQueueCountStatsRef :: IORef (TimingStats Int),
resourceQueueCountSource :: SignalSource Int,
resourceTotalWaitTimeRef :: IORef Double,
resourceWaitTimeRef :: IORef (SamplingStats Double),
resourceWaitTimeSource :: SignalSource (),
resourceActingQueue :: PQ.PriorityQueue ResourceActingItem,
resourceWaitQueue :: PQ.PriorityQueue ResourceAwaitingItem }
data ResourceActingItem =
ResourceActingItem { actingItemPriority :: Double,
actingItemId :: ProcessId }
type ResourceAwaitingItem = Either ResourceRequestingItem ResourcePreemptedItem
data ResourceRequestingItem =
ResourceRequestingItem { requestingItemPriority :: Double,
requestingItemTime :: Double,
requestingItemId :: ProcessId,
requestingItemCont :: FrozenCont () }
data ResourcePreemptedItem =
ResourcePreemptedItem { preemptedItemPriority :: Double,
preemptedItemTime :: Double,
preemptedItemId :: ProcessId }
instance Eq Resource where
x == y = resourceCountRef x == resourceCountRef y
instance Eq ResourceActingItem where
x == y = actingItemId x == actingItemId y
newResource :: Int
-> Event Resource
newResource count =
newResourceWithMaxCount count (Just count)
newResourceWithMaxCount :: Int
-> Maybe Int
-> Event Resource
newResourceWithMaxCount count maxCount =
Event $ \p ->
do let r = pointRun p
t = pointTime p
when (count < 0) $
error $
"The resource count cannot be negative: " ++
"newResourceWithMaxCount."
case maxCount of
Just maxCount | count > maxCount ->
error $
"The resource count cannot be greater than " ++
"its maximum value: newResourceWithMaxCount."
_ ->
return ()
countRef <- newIORef count
countStatsRef <- newIORef $ returnTimingStats t count
countSource <- invokeSimulation r newSignalSource
utilCountRef <- newIORef 0
utilCountStatsRef <- newIORef $ returnTimingStats t 0
utilCountSource <- invokeSimulation r newSignalSource
queueCountRef <- newIORef 0
queueCountStatsRef <- newIORef $ returnTimingStats t 0
queueCountSource <- invokeSimulation r newSignalSource
totalWaitTimeRef <- newIORef 0
waitTimeRef <- newIORef emptySamplingStats
waitTimeSource <- invokeSimulation r newSignalSource
actingQueue <- PQ.newQueue
waitQueue <- PQ.newQueue
return Resource { resourceMaxCount = maxCount,
resourceCountRef = countRef,
resourceCountStatsRef = countStatsRef,
resourceCountSource = countSource,
resourceUtilisationCountRef = utilCountRef,
resourceUtilisationCountStatsRef = utilCountStatsRef,
resourceUtilisationCountSource = utilCountSource,
resourceQueueCountRef = queueCountRef,
resourceQueueCountStatsRef = queueCountStatsRef,
resourceQueueCountSource = queueCountSource,
resourceTotalWaitTimeRef = totalWaitTimeRef,
resourceWaitTimeRef = waitTimeRef,
resourceWaitTimeSource = waitTimeSource,
resourceActingQueue = actingQueue,
resourceWaitQueue = waitQueue }
resourceCount :: Resource -> Event Int
resourceCount r =
Event $ \p -> readIORef (resourceCountRef r)
resourceCountStats :: Resource -> Event (TimingStats Int)
resourceCountStats r =
Event $ \p -> readIORef (resourceCountStatsRef r)
resourceCountChanged :: Resource -> Signal Int
resourceCountChanged r =
publishSignal $ resourceCountSource r
resourceCountChanged_ :: Resource -> Signal ()
resourceCountChanged_ r =
mapSignal (const ()) $ resourceCountChanged r
resourceUtilisationCount :: Resource -> Event Int
resourceUtilisationCount r =
Event $ \p -> readIORef (resourceUtilisationCountRef r)
resourceUtilisationCountStats :: Resource -> Event (TimingStats Int)
resourceUtilisationCountStats r =
Event $ \p -> readIORef (resourceUtilisationCountStatsRef r)
resourceUtilisationCountChanged :: Resource -> Signal Int
resourceUtilisationCountChanged r =
publishSignal $ resourceUtilisationCountSource r
resourceUtilisationCountChanged_ :: Resource -> Signal ()
resourceUtilisationCountChanged_ r =
mapSignal (const ()) $ resourceUtilisationCountChanged r
resourceQueueCount :: Resource -> Event Int
resourceQueueCount r =
Event $ \p -> readIORef (resourceQueueCountRef r)
resourceQueueCountStats :: Resource -> Event (TimingStats Int)
resourceQueueCountStats r =
Event $ \p -> readIORef (resourceQueueCountStatsRef r)
resourceQueueCountChanged :: Resource -> Signal Int
resourceQueueCountChanged r =
publishSignal $ resourceQueueCountSource r
resourceQueueCountChanged_ :: Resource -> Signal ()
resourceQueueCountChanged_ r =
mapSignal (const ()) $ resourceQueueCountChanged r
resourceTotalWaitTime :: Resource -> Event Double
resourceTotalWaitTime r =
Event $ \p -> readIORef (resourceTotalWaitTimeRef r)
resourceWaitTime :: Resource -> Event (SamplingStats Double)
resourceWaitTime r =
Event $ \p -> readIORef (resourceWaitTimeRef r)
resourceWaitTimeChanged :: Resource -> Signal (SamplingStats Double)
resourceWaitTimeChanged r =
mapSignalM (\() -> resourceWaitTime r) $ resourceWaitTimeChanged_ r
resourceWaitTimeChanged_ :: Resource -> Signal ()
resourceWaitTimeChanged_ r =
publishSignal $ resourceWaitTimeSource r
requestResourceWithPriority :: Resource
-> Double
-> Process ()
requestResourceWithPriority r priority =
Process $ \pid ->
Cont $ \c ->
Event $ \p ->
do let t = pointTime p
a <- readIORef (resourceCountRef r)
if a == 0
then do f <- PQ.queueNull (resourceActingQueue r)
if f
then do c <- invokeEvent p $
freezeContReentering c () $
invokeCont c $
invokeProcess pid $
requestResourceWithPriority r priority
PQ.enqueue (resourceWaitQueue r) priority (Left $ ResourceRequestingItem priority t pid c)
invokeEvent p $ updateResourceQueueCount r 1
else do (p0', item0) <- PQ.queueFront (resourceActingQueue r)
let p0 = p0'
pid0 = actingItemId item0
if priority < p0
then do PQ.dequeue (resourceActingQueue r)
PQ.enqueue (resourceActingQueue r) ( priority) $ ResourceActingItem priority pid
PQ.enqueue (resourceWaitQueue r) p0 (Right $ ResourcePreemptedItem p0 t pid0)
invokeEvent p $ updateResourceQueueCount r 1
invokeEvent p $ processPreemptionBegin pid0
invokeEvent p $ resumeCont c ()
else do c <- invokeEvent p $
freezeContReentering c () $
invokeCont c $
invokeProcess pid $
requestResourceWithPriority r priority
PQ.enqueue (resourceWaitQueue r) priority (Left $ ResourceRequestingItem priority t pid c)
invokeEvent p $ updateResourceQueueCount r 1
else do PQ.enqueue (resourceActingQueue r) ( priority) $ ResourceActingItem priority pid
invokeEvent p $ updateResourceWaitTime r 0
invokeEvent p $ updateResourceCount r (1)
invokeEvent p $ updateResourceUtilisationCount r 1
invokeEvent p $ resumeCont c ()
releaseResource :: Resource
-> Process ()
releaseResource r =
Process $ \pid ->
Cont $ \c ->
Event $ \p ->
do f <- fmap isJust $ PQ.queueDeleteBy (resourceActingQueue r) (\item -> actingItemId item == pid)
if f
then do invokeEvent p $ updateResourceUtilisationCount r (1)
invokeEvent p $ releaseResource' r
invokeEvent p $ resumeCont c ()
else error $
"The resource was not acquired by this process: releaseResource"
releaseResource' :: Resource
-> Event ()
releaseResource' r =
Event $ \p ->
do a <- readIORef (resourceCountRef r)
let a' = a + 1
case resourceMaxCount r of
Just maxCount | a' > maxCount ->
error $
"The resource count cannot be greater than " ++
"its maximum value: releaseResource'."
_ ->
return ()
f <- PQ.queueNull (resourceWaitQueue r)
if f
then invokeEvent p $ updateResourceCount r 1
else do (priority', item) <- PQ.queueFront (resourceWaitQueue r)
PQ.dequeue (resourceWaitQueue r)
invokeEvent p $ updateResourceQueueCount r (1)
case item of
Left (ResourceRequestingItem priority t pid c) ->
do c <- invokeEvent p $ unfreezeCont c
case c of
Nothing ->
invokeEvent p $ releaseResource' r
Just c ->
do PQ.enqueue (resourceActingQueue r) ( priority) $ ResourceActingItem priority pid
invokeEvent p $ updateResourceWaitTime r (pointTime p t)
invokeEvent p $ updateResourceUtilisationCount r 1
invokeEvent p $ enqueueEvent (pointTime p) $ reenterCont c ()
Right (ResourcePreemptedItem priority t pid) ->
do f <- invokeEvent p $ processCancelled pid
case f of
True ->
invokeEvent p $ releaseResource' r
False ->
do PQ.enqueue (resourceActingQueue r) ( priority) $ ResourceActingItem priority pid
invokeEvent p $ updateResourceWaitTime r (pointTime p t)
invokeEvent p $ updateResourceUtilisationCount r 1
invokeEvent p $ processPreemptionEnd pid
usingResourceWithPriority :: Resource
-> Double
-> Process a
-> Process a
usingResourceWithPriority r priority m =
do requestResourceWithPriority r priority
finallyProcess m $ releaseResource r
decResourceCount' :: Resource -> Event ()
decResourceCount' r =
Event $ \p ->
do let t = pointTime p
a <- readIORef (resourceCountRef r)
when (a == 0) $
error $
"The resource exceeded and its count is zero: decResourceCount'"
f <- PQ.queueNull (resourceActingQueue r)
unless f $
do (p0', item0) <- PQ.queueFront (resourceActingQueue r)
let p0 = p0'
pid0 = actingItemId item0
PQ.dequeue (resourceActingQueue r)
PQ.enqueue (resourceWaitQueue r) p0 (Right $ ResourcePreemptedItem p0 t pid0)
invokeEvent p $ processPreemptionBegin pid0
invokeEvent p $ updateResourceUtilisationCount r (1)
invokeEvent p $ updateResourceQueueCount r 1
invokeEvent p $ updateResourceCount r (1)
incResourceCount :: Resource
-> Int
-> Event ()
incResourceCount r n
| n < 0 = error "The increment cannot be negative: incResourceCount"
| n == 0 = return ()
| otherwise =
do releaseResource' r
incResourceCount r (n 1)
decResourceCount :: Resource
-> Int
-> Event ()
decResourceCount r n
| n < 0 = error "The decrement cannot be negative: decResourceCount"
| n == 0 = return ()
| otherwise =
do decResourceCount' r
decResourceCount r (n 1)
alterResourceCount :: Resource
-> Int
-> Event ()
alterResourceCount r n
| n < 0 = decResourceCount r ( n)
| n > 0 = incResourceCount r n
| n == 0 = return ()
resourceChanged_ :: Resource -> Signal ()
resourceChanged_ r =
resourceCountChanged_ r <>
resourceUtilisationCountChanged_ r <>
resourceQueueCountChanged_ r
updateResourceCount :: Resource -> Int -> Event ()
updateResourceCount r delta =
Event $ \p ->
do a <- readIORef (resourceCountRef r)
let a' = a + delta
a' `seq` writeIORef (resourceCountRef r) a'
modifyIORef' (resourceCountStatsRef r) $
addTimingStats (pointTime p) a'
invokeEvent p $
triggerSignal (resourceCountSource r) a'
updateResourceQueueCount :: Resource -> Int -> Event ()
updateResourceQueueCount r delta =
Event $ \p ->
do a <- readIORef (resourceQueueCountRef r)
let a' = a + delta
a' `seq` writeIORef (resourceQueueCountRef r) a'
modifyIORef' (resourceQueueCountStatsRef r) $
addTimingStats (pointTime p) a'
invokeEvent p $
triggerSignal (resourceQueueCountSource r) a'
updateResourceUtilisationCount :: Resource -> Int -> Event ()
updateResourceUtilisationCount r delta =
Event $ \p ->
do a <- readIORef (resourceUtilisationCountRef r)
let a' = a + delta
a' `seq` writeIORef (resourceUtilisationCountRef r) a'
modifyIORef' (resourceUtilisationCountStatsRef r) $
addTimingStats (pointTime p) a'
invokeEvent p $
triggerSignal (resourceUtilisationCountSource r) a'
updateResourceWaitTime :: Resource -> Double -> Event ()
updateResourceWaitTime r delta =
Event $ \p ->
do a <- readIORef (resourceTotalWaitTimeRef r)
let a' = a + delta
a' `seq` writeIORef (resourceTotalWaitTimeRef r) a'
modifyIORef' (resourceWaitTimeRef r) $
addSamplingStats delta
invokeEvent p $
triggerSignal (resourceWaitTimeSource r) ()