module Simulation.Aivika.Trans.Queue
(
FCFSQueue,
LCFSQueue,
SIROQueue,
PriorityQueue,
Queue,
newFCFSQueue,
newLCFSQueue,
newSIROQueue,
newPriorityQueue,
newQueue,
enqueueStrategy,
enqueueStoringStrategy,
dequeueStrategy,
queueNull,
queueFull,
queueMaxCount,
queueCount,
queueCountStats,
enqueueCount,
enqueueLostCount,
enqueueStoreCount,
dequeueCount,
dequeueExtractCount,
queueLoadFactor,
enqueueRate,
enqueueStoreRate,
dequeueRate,
dequeueExtractRate,
queueWaitTime,
queueTotalWaitTime,
enqueueWaitTime,
dequeueWaitTime,
queueRate,
dequeue,
dequeueWithOutputPriority,
tryDequeue,
enqueue,
enqueueWithInputPriority,
enqueueWithStoringPriority,
enqueueWithInputStoringPriorities,
tryEnqueue,
tryEnqueueWithStoringPriority,
enqueueOrLost,
enqueueOrLost_,
enqueueWithStoringPriorityOrLost,
enqueueWithStoringPriorityOrLost_,
waitWhileFullQueue,
queueSummary,
queueNullChanged,
queueNullChanged_,
queueFullChanged,
queueFullChanged_,
queueCountChanged,
queueCountChanged_,
enqueueCountChanged,
enqueueCountChanged_,
enqueueLostCountChanged,
enqueueLostCountChanged_,
enqueueStoreCountChanged,
enqueueStoreCountChanged_,
dequeueCountChanged,
dequeueCountChanged_,
dequeueExtractCountChanged,
dequeueExtractCountChanged_,
queueLoadFactorChanged,
queueLoadFactorChanged_,
queueWaitTimeChanged,
queueWaitTimeChanged_,
queueTotalWaitTimeChanged,
queueTotalWaitTimeChanged_,
enqueueWaitTimeChanged,
enqueueWaitTimeChanged_,
dequeueWaitTimeChanged,
dequeueWaitTimeChanged_,
queueRateChanged,
queueRateChanged_,
enqueueInitiated,
enqueueStored,
enqueueLost,
dequeueRequested,
dequeueExtracted,
queueChanged_) where
import Data.Monoid
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Trans.Session
import Simulation.Aivika.Trans.ProtoRef
import Simulation.Aivika.Trans.Comp
import Simulation.Aivika.Trans.Internal.Specs
import Simulation.Aivika.Trans.Internal.Parameter
import Simulation.Aivika.Trans.Internal.Simulation
import Simulation.Aivika.Trans.Internal.Dynamics
import Simulation.Aivika.Trans.Internal.Event
import Simulation.Aivika.Trans.Internal.Process
import Simulation.Aivika.Trans.Internal.Signal
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.Trans.Resource
import Simulation.Aivika.Trans.QueueStrategy
import Simulation.Aivika.Trans.Statistics
type FCFSQueue m a = Queue m FCFS FCFS FCFS a
type LCFSQueue m a = Queue m FCFS LCFS FCFS a
type SIROQueue m a = Queue m FCFS SIRO FCFS a
type PriorityQueue m a = Queue m FCFS StaticPriorities FCFS a
data Queue m si sm so a =
Queue { queueMaxCount :: Int,
enqueueStrategy :: si,
enqueueStoringStrategy :: sm,
dequeueStrategy :: so,
enqueueRes :: Resource m si,
queueStore :: StrategyQueue m sm (QueueItem a),
dequeueRes :: Resource m so,
queueCountRef :: ProtoRef m Int,
queueCountStatsRef :: ProtoRef m (TimingStats Int),
enqueueCountRef :: ProtoRef m Int,
enqueueLostCountRef :: ProtoRef m Int,
enqueueStoreCountRef :: ProtoRef m Int,
dequeueCountRef :: ProtoRef m Int,
dequeueExtractCountRef :: ProtoRef m Int,
queueWaitTimeRef :: ProtoRef m (SamplingStats Double),
queueTotalWaitTimeRef :: ProtoRef m (SamplingStats Double),
enqueueWaitTimeRef :: ProtoRef m (SamplingStats Double),
dequeueWaitTimeRef :: ProtoRef m (SamplingStats Double),
enqueueInitiatedSource :: SignalSource m a,
enqueueLostSource :: SignalSource m a,
enqueueStoredSource :: SignalSource m a,
dequeueRequestedSource :: SignalSource m (),
dequeueExtractedSource :: SignalSource m a }
data QueueItem a =
QueueItem { itemValue :: a,
itemInputTime :: Double,
itemStoringTime :: Double
}
newFCFSQueue :: MonadComp m => Int -> Event m (FCFSQueue m a)
newFCFSQueue = newQueue FCFS FCFS FCFS
newLCFSQueue :: MonadComp m => Int -> Event m (LCFSQueue m a)
newLCFSQueue = newQueue FCFS LCFS FCFS
newSIROQueue :: MonadComp m => Int -> Event m (SIROQueue m a)
newSIROQueue = newQueue FCFS SIRO FCFS
newPriorityQueue :: MonadComp m => Int -> Event m (PriorityQueue m a)
newPriorityQueue = newQueue FCFS StaticPriorities FCFS
newQueue :: (MonadComp m,
QueueStrategy m si,
QueueStrategy m sm,
QueueStrategy m so) =>
si
-> sm
-> so
-> Int
-> Event m (Queue m si sm so a)
newQueue si sm so count =
do t <- liftDynamics time
sn <- liftParameter simulationSession
i <- liftComp $ newProtoRef sn 0
is <- liftComp $ newProtoRef sn $ returnTimingStats t 0
ci <- liftComp $ newProtoRef sn 0
cl <- liftComp $ newProtoRef sn 0
cm <- liftComp $ newProtoRef sn 0
cr <- liftComp $ newProtoRef sn 0
co <- liftComp $ newProtoRef sn 0
ri <- liftSimulation $ newResourceWithMaxCount si count (Just count)
qm <- liftSimulation $ newStrategyQueue sm
ro <- liftSimulation $ newResourceWithMaxCount so 0 (Just count)
w <- liftComp $ newProtoRef sn mempty
wt <- liftComp $ newProtoRef sn mempty
wi <- liftComp $ newProtoRef sn mempty
wo <- liftComp $ newProtoRef sn mempty
s1 <- liftSimulation $ newSignalSource
s2 <- liftSimulation $ newSignalSource
s3 <- liftSimulation $ newSignalSource
s4 <- liftSimulation $ newSignalSource
s5 <- liftSimulation $ newSignalSource
return Queue { queueMaxCount = count,
enqueueStrategy = si,
enqueueStoringStrategy = sm,
dequeueStrategy = so,
enqueueRes = ri,
queueStore = qm,
dequeueRes = ro,
queueCountRef = i,
queueCountStatsRef = is,
enqueueCountRef = ci,
enqueueLostCountRef = cl,
enqueueStoreCountRef = cm,
dequeueCountRef = cr,
dequeueExtractCountRef = co,
queueWaitTimeRef = w,
queueTotalWaitTimeRef = wt,
enqueueWaitTimeRef = wi,
dequeueWaitTimeRef = wo,
enqueueInitiatedSource = s1,
enqueueLostSource = s2,
enqueueStoredSource = s3,
dequeueRequestedSource = s4,
dequeueExtractedSource = s5 }
queueNull :: MonadComp m => Queue m si sm so a -> Event m Bool
queueNull q =
Event $ \p ->
do n <- readProtoRef (queueCountRef q)
return (n == 0)
queueNullChanged :: MonadComp m => Queue m si sm so a -> Signal m Bool
queueNullChanged q =
mapSignalM (const $ queueNull q) (queueNullChanged_ q)
queueNullChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
queueNullChanged_ = queueCountChanged_
queueFull :: MonadComp m => Queue m si sm so a -> Event m Bool
queueFull q =
Event $ \p ->
do n <- readProtoRef (queueCountRef q)
return (n == queueMaxCount q)
queueFullChanged :: MonadComp m => Queue m si sm so a -> Signal m Bool
queueFullChanged q =
mapSignalM (const $ queueFull q) (queueFullChanged_ q)
queueFullChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
queueFullChanged_ = queueCountChanged_
queueCount :: MonadComp m => Queue m si sm so a -> Event m Int
queueCount q =
Event $ \p -> readProtoRef (queueCountRef q)
queueCountStats :: MonadComp m => Queue m si sm so a -> Event m (TimingStats Int)
queueCountStats q =
Event $ \p -> readProtoRef (queueCountStatsRef q)
queueCountChanged :: MonadComp m => Queue m si sm so a -> Signal m Int
queueCountChanged q =
mapSignalM (const $ queueCount q) (queueCountChanged_ q)
queueCountChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
queueCountChanged_ q =
mapSignal (const ()) (enqueueStored q) <>
mapSignal (const ()) (dequeueExtracted q)
enqueueCount :: MonadComp m => Queue m si sm so a -> Event m Int
enqueueCount q =
Event $ \p -> readProtoRef (enqueueCountRef q)
enqueueCountChanged :: MonadComp m => Queue m si sm so a -> Signal m Int
enqueueCountChanged q =
mapSignalM (const $ enqueueCount q) (enqueueCountChanged_ q)
enqueueCountChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
enqueueCountChanged_ q =
mapSignal (const ()) (enqueueInitiated q)
enqueueLostCount :: MonadComp m => Queue m si sm so a -> Event m Int
enqueueLostCount q =
Event $ \p -> readProtoRef (enqueueLostCountRef q)
enqueueLostCountChanged :: MonadComp m => Queue m si sm so a -> Signal m Int
enqueueLostCountChanged q =
mapSignalM (const $ enqueueLostCount q) (enqueueLostCountChanged_ q)
enqueueLostCountChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
enqueueLostCountChanged_ q =
mapSignal (const ()) (enqueueLost q)
enqueueStoreCount :: MonadComp m => Queue m si sm so a -> Event m Int
enqueueStoreCount q =
Event $ \p -> readProtoRef (enqueueStoreCountRef q)
enqueueStoreCountChanged :: MonadComp m => Queue m si sm so a -> Signal m Int
enqueueStoreCountChanged q =
mapSignalM (const $ enqueueStoreCount q) (enqueueStoreCountChanged_ q)
enqueueStoreCountChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
enqueueStoreCountChanged_ q =
mapSignal (const ()) (enqueueStored q)
dequeueCount :: MonadComp m => Queue m si sm so a -> Event m Int
dequeueCount q =
Event $ \p -> readProtoRef (dequeueCountRef q)
dequeueCountChanged :: MonadComp m => Queue m si sm so a -> Signal m Int
dequeueCountChanged q =
mapSignalM (const $ dequeueCount q) (dequeueCountChanged_ q)
dequeueCountChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
dequeueCountChanged_ q =
mapSignal (const ()) (dequeueRequested q)
dequeueExtractCount :: MonadComp m => Queue m si sm so a -> Event m Int
dequeueExtractCount q =
Event $ \p -> readProtoRef (dequeueExtractCountRef q)
dequeueExtractCountChanged :: MonadComp m => Queue m si sm so a -> Signal m Int
dequeueExtractCountChanged q =
mapSignalM (const $ dequeueExtractCount q) (dequeueExtractCountChanged_ q)
dequeueExtractCountChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
dequeueExtractCountChanged_ q =
mapSignal (const ()) (dequeueExtracted q)
queueLoadFactor :: MonadComp m => Queue m si sm so a -> Event m Double
queueLoadFactor q =
Event $ \p ->
do x <- readProtoRef (queueCountRef q)
let y = queueMaxCount q
return (fromIntegral x / fromIntegral y)
queueLoadFactorChanged :: MonadComp m => Queue m si sm so a -> Signal m Double
queueLoadFactorChanged q =
mapSignalM (const $ queueLoadFactor q) (queueLoadFactorChanged_ q)
queueLoadFactorChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
queueLoadFactorChanged_ q =
mapSignal (const ()) (enqueueStored q) <>
mapSignal (const ()) (dequeueExtracted q)
enqueueRate :: MonadComp m => Queue m si sm so a -> Event m Double
enqueueRate q =
Event $ \p ->
do x <- readProtoRef (enqueueCountRef q)
let t0 = spcStartTime $ pointSpecs p
t = pointTime p
return (fromIntegral x / (t t0))
enqueueStoreRate :: MonadComp m => Queue m si sm so a -> Event m Double
enqueueStoreRate q =
Event $ \p ->
do x <- readProtoRef (enqueueStoreCountRef q)
let t0 = spcStartTime $ pointSpecs p
t = pointTime p
return (fromIntegral x / (t t0))
dequeueRate :: MonadComp m => Queue m si sm so a -> Event m Double
dequeueRate q =
Event $ \p ->
do x <- readProtoRef (dequeueCountRef q)
let t0 = spcStartTime $ pointSpecs p
t = pointTime p
return (fromIntegral x / (t t0))
dequeueExtractRate :: MonadComp m => Queue m si sm so a -> Event m Double
dequeueExtractRate q =
Event $ \p ->
do x <- readProtoRef (dequeueExtractCountRef q)
let t0 = spcStartTime $ pointSpecs p
t = pointTime p
return (fromIntegral x / (t t0))
queueWaitTime :: MonadComp m => Queue m si sm so a -> Event m (SamplingStats Double)
queueWaitTime q =
Event $ \p -> readProtoRef (queueWaitTimeRef q)
queueWaitTimeChanged :: MonadComp m => Queue m si sm so a -> Signal m (SamplingStats Double)
queueWaitTimeChanged q =
mapSignalM (const $ queueWaitTime q) (queueWaitTimeChanged_ q)
queueWaitTimeChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
queueWaitTimeChanged_ q =
mapSignal (const ()) (dequeueExtracted q)
queueTotalWaitTime :: MonadComp m => Queue m si sm so a -> Event m (SamplingStats Double)
queueTotalWaitTime q =
Event $ \p -> readProtoRef (queueTotalWaitTimeRef q)
queueTotalWaitTimeChanged :: MonadComp m => Queue m si sm so a -> Signal m (SamplingStats Double)
queueTotalWaitTimeChanged q =
mapSignalM (const $ queueTotalWaitTime q) (queueTotalWaitTimeChanged_ q)
queueTotalWaitTimeChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
queueTotalWaitTimeChanged_ q =
mapSignal (const ()) (dequeueExtracted q)
enqueueWaitTime :: MonadComp m => Queue m si sm so a -> Event m (SamplingStats Double)
enqueueWaitTime q =
Event $ \p -> readProtoRef (enqueueWaitTimeRef q)
enqueueWaitTimeChanged :: MonadComp m => Queue m si sm so a -> Signal m (SamplingStats Double)
enqueueWaitTimeChanged q =
mapSignalM (const $ enqueueWaitTime q) (enqueueWaitTimeChanged_ q)
enqueueWaitTimeChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
enqueueWaitTimeChanged_ q =
mapSignal (const ()) (enqueueStored q)
dequeueWaitTime :: MonadComp m => Queue m si sm so a -> Event m (SamplingStats Double)
dequeueWaitTime q =
Event $ \p -> readProtoRef (dequeueWaitTimeRef q)
dequeueWaitTimeChanged :: MonadComp m => Queue m si sm so a -> Signal m (SamplingStats Double)
dequeueWaitTimeChanged q =
mapSignalM (const $ dequeueWaitTime q) (dequeueWaitTimeChanged_ q)
dequeueWaitTimeChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
dequeueWaitTimeChanged_ q =
mapSignal (const ()) (dequeueExtracted q)
queueRate :: MonadComp m => Queue m si sm so a -> Event m Double
queueRate q =
Event $ \p ->
do x <- readProtoRef (queueCountStatsRef q)
y <- readProtoRef (queueWaitTimeRef q)
return (timingStatsMean x / samplingStatsMean y)
queueRateChanged :: MonadComp m => Queue m si sm so a -> Signal m Double
queueRateChanged q =
mapSignalM (const $ queueRate q) (queueRateChanged_ q)
queueRateChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
queueRateChanged_ q =
mapSignal (const ()) (enqueueStored q) <>
mapSignal (const ()) (dequeueExtracted q)
dequeue :: (MonadComp m,
DequeueStrategy m si,
DequeueStrategy m sm,
EnqueueStrategy m so)
=> Queue m si sm so a
-> Process m a
dequeue q =
do t <- liftEvent $ dequeueRequest q
requestResource (dequeueRes q)
liftEvent $ dequeueExtract q t
dequeueWithOutputPriority :: (MonadComp m,
DequeueStrategy m si,
DequeueStrategy m sm,
PriorityQueueStrategy m so po)
=> Queue m si sm so a
-> po
-> Process m a
dequeueWithOutputPriority q po =
do t <- liftEvent $ dequeueRequest q
requestResourceWithPriority (dequeueRes q) po
liftEvent $ dequeueExtract q t
tryDequeue :: (MonadComp m,
DequeueStrategy m si,
DequeueStrategy m sm)
=> Queue m si sm so a
-> Event m (Maybe a)
tryDequeue q =
do x <- tryRequestResourceWithinEvent (dequeueRes q)
if x
then do t <- dequeueRequest q
fmap Just $ dequeueExtract q t
else return Nothing
enqueue :: (MonadComp m,
EnqueueStrategy m si,
EnqueueStrategy m sm,
DequeueStrategy m so)
=> Queue m si sm so a
-> a
-> Process m ()
enqueue q a =
do i <- liftEvent $ enqueueInitiate q a
requestResource (enqueueRes q)
liftEvent $ enqueueStore q i
enqueueWithInputPriority :: (MonadComp m,
PriorityQueueStrategy m si pi,
EnqueueStrategy m sm,
DequeueStrategy m so)
=> Queue m si sm so a
-> pi
-> a
-> Process m ()
enqueueWithInputPriority q pi a =
do i <- liftEvent $ enqueueInitiate q a
requestResourceWithPriority (enqueueRes q) pi
liftEvent $ enqueueStore q i
enqueueWithStoringPriority :: (MonadComp m,
EnqueueStrategy m si,
PriorityQueueStrategy m sm pm,
DequeueStrategy m so)
=> Queue m si sm so a
-> pm
-> a
-> Process m ()
enqueueWithStoringPriority q pm a =
do i <- liftEvent $ enqueueInitiate q a
requestResource (enqueueRes q)
liftEvent $ enqueueStoreWithPriority q pm i
enqueueWithInputStoringPriorities :: (MonadComp m,
PriorityQueueStrategy m si pi,
PriorityQueueStrategy m sm pm,
DequeueStrategy m so)
=> Queue m si sm so a
-> pi
-> pm
-> a
-> Process m ()
enqueueWithInputStoringPriorities q pi pm a =
do i <- liftEvent $ enqueueInitiate q a
requestResourceWithPriority (enqueueRes q) pi
liftEvent $ enqueueStoreWithPriority q pm i
tryEnqueue :: (MonadComp m,
EnqueueStrategy m sm,
DequeueStrategy m so)
=> Queue m si sm so a
-> a
-> Event m Bool
tryEnqueue q a =
do x <- tryRequestResourceWithinEvent (enqueueRes q)
if x
then do enqueueInitiate q a >>= enqueueStore q
return True
else return False
tryEnqueueWithStoringPriority :: (MonadComp m,
PriorityQueueStrategy m sm pm,
DequeueStrategy m so)
=> Queue m si sm so a
-> pm
-> a
-> Event m Bool
tryEnqueueWithStoringPriority q pm a =
do x <- tryRequestResourceWithinEvent (enqueueRes q)
if x
then do enqueueInitiate q a >>= enqueueStoreWithPriority q pm
return True
else return False
enqueueOrLost :: (MonadComp m,
EnqueueStrategy m sm,
DequeueStrategy m so)
=> Queue m si sm so a
-> a
-> Event m Bool
enqueueOrLost q a =
do x <- tryRequestResourceWithinEvent (enqueueRes q)
if x
then do enqueueInitiate q a >>= enqueueStore q
return True
else do enqueueDeny q a
return False
enqueueWithStoringPriorityOrLost :: (MonadComp m,
PriorityQueueStrategy m sm pm,
DequeueStrategy m so)
=> Queue m si sm so a
-> pm
-> a
-> Event m Bool
enqueueWithStoringPriorityOrLost q pm a =
do x <- tryRequestResourceWithinEvent (enqueueRes q)
if x
then do enqueueInitiate q a >>= enqueueStoreWithPriority q pm
return True
else do enqueueDeny q a
return False
enqueueOrLost_ :: (MonadComp m,
EnqueueStrategy m sm,
DequeueStrategy m so)
=> Queue m si sm so a
-> a
-> Event m ()
enqueueOrLost_ q a =
do x <- enqueueOrLost q a
return ()
enqueueWithStoringPriorityOrLost_ :: (MonadComp m,
PriorityQueueStrategy m sm pm,
DequeueStrategy m so)
=> Queue m si sm so a
-> pm
-> a
-> Event m ()
enqueueWithStoringPriorityOrLost_ q pm a =
do x <- enqueueWithStoringPriorityOrLost q pm a
return ()
enqueueInitiated :: MonadComp m => Queue m si sm so a -> Signal m a
enqueueInitiated q = publishSignal (enqueueInitiatedSource q)
enqueueStored :: MonadComp m => Queue m si sm so a -> Signal m a
enqueueStored q = publishSignal (enqueueStoredSource q)
enqueueLost :: MonadComp m => Queue m si sm so a -> Signal m a
enqueueLost q = publishSignal (enqueueLostSource q)
dequeueRequested :: MonadComp m => Queue m si sm so a -> Signal m ()
dequeueRequested q = publishSignal (dequeueRequestedSource q)
dequeueExtracted :: MonadComp m => Queue m si sm so a -> Signal m a
dequeueExtracted q = publishSignal (dequeueExtractedSource q)
enqueueInitiate :: MonadComp m
=> Queue m si sm so a
-> a
-> Event m (QueueItem a)
enqueueInitiate q a =
Event $ \p ->
do let t = pointTime p
modifyProtoRef' (enqueueCountRef q) (+ 1)
invokeEvent p $
triggerSignal (enqueueInitiatedSource q) a
return QueueItem { itemValue = a,
itemInputTime = t,
itemStoringTime = t
}
enqueueStore :: (MonadComp m,
EnqueueStrategy m sm,
DequeueStrategy m so)
=> Queue m si sm so a
-> QueueItem a
-> Event m ()
enqueueStore q i =
Event $ \p ->
do let i' = i { itemStoringTime = pointTime p }
invokeEvent p $
strategyEnqueue (queueStore q) i'
c <- readProtoRef (queueCountRef q)
let c' = c + 1
t = pointTime p
c' `seq` writeProtoRef (queueCountRef q) c'
modifyProtoRef' (queueCountStatsRef q) (addTimingStats t c')
modifyProtoRef' (enqueueStoreCountRef q) (+ 1)
invokeEvent p $
enqueueStat q i'
invokeEvent p $
releaseResourceWithinEvent (dequeueRes q)
invokeEvent p $
triggerSignal (enqueueStoredSource q) (itemValue i')
enqueueStoreWithPriority :: (MonadComp m,
PriorityQueueStrategy m sm pm,
DequeueStrategy m so)
=> Queue m si sm so a
-> pm
-> QueueItem a
-> Event m ()
enqueueStoreWithPriority q pm i =
Event $ \p ->
do let i' = i { itemStoringTime = pointTime p }
invokeEvent p $
strategyEnqueueWithPriority (queueStore q) pm i'
c <- readProtoRef (queueCountRef q)
let c' = c + 1
t = pointTime p
c' `seq` writeProtoRef (queueCountRef q) c'
modifyProtoRef' (queueCountStatsRef q) (addTimingStats t c')
modifyProtoRef' (enqueueStoreCountRef q) (+ 1)
invokeEvent p $
enqueueStat q i'
invokeEvent p $
releaseResourceWithinEvent (dequeueRes q)
invokeEvent p $
triggerSignal (enqueueStoredSource q) (itemValue i')
enqueueDeny :: MonadComp m
=> Queue m si sm so a
-> a
-> Event m ()
enqueueDeny q a =
Event $ \p ->
do modifyProtoRef' (enqueueLostCountRef q) $ (+) 1
invokeEvent p $
triggerSignal (enqueueLostSource q) a
enqueueStat :: MonadComp m
=> Queue m si sm so a
-> QueueItem a
-> Event m ()
enqueueStat q i =
Event $ \p ->
do let t0 = itemInputTime i
t1 = itemStoringTime i
modifyProtoRef' (enqueueWaitTimeRef q) $
addSamplingStats (t1 t0)
dequeueRequest :: MonadComp m
=> Queue m si sm so a
-> Event m Double
dequeueRequest q =
Event $ \p ->
do modifyProtoRef' (dequeueCountRef q) (+ 1)
invokeEvent p $
triggerSignal (dequeueRequestedSource q) ()
return $ pointTime p
dequeueExtract :: (MonadComp m,
DequeueStrategy m si,
DequeueStrategy m sm)
=> Queue m si sm so a
-> Double
-> Event m a
dequeueExtract q t' =
Event $ \p ->
do i <- invokeEvent p $
strategyDequeue (queueStore q)
c <- readProtoRef (queueCountRef q)
let c' = c 1
t = pointTime p
c' `seq` writeProtoRef (queueCountRef q) c'
modifyProtoRef' (queueCountStatsRef q) (addTimingStats t c')
modifyProtoRef' (dequeueExtractCountRef q) (+ 1)
invokeEvent p $
dequeueStat q t' i
invokeEvent p $
releaseResourceWithinEvent (enqueueRes q)
invokeEvent p $
triggerSignal (dequeueExtractedSource q) (itemValue i)
return $ itemValue i
dequeueStat :: MonadComp m
=> Queue m si sm so a
-> Double
-> QueueItem a
-> Event m ()
dequeueStat q t' i =
Event $ \p ->
do let t0 = itemInputTime i
t1 = itemStoringTime i
t = pointTime p
modifyProtoRef' (dequeueWaitTimeRef q) $
addSamplingStats (t t')
modifyProtoRef' (queueTotalWaitTimeRef q) $
addSamplingStats (t t0)
modifyProtoRef' (queueWaitTimeRef q) $
addSamplingStats (t t1)
waitWhileFullQueue :: MonadComp m => Queue m si sm so a -> Process m ()
waitWhileFullQueue q =
do x <- liftEvent (queueFull q)
when x $
do processAwait (dequeueExtracted q)
waitWhileFullQueue q
queueChanged_ :: MonadComp m => Queue m si sm so a -> Signal m ()
queueChanged_ q =
mapSignal (const ()) (enqueueInitiated q) <>
mapSignal (const ()) (enqueueStored q) <>
mapSignal (const ()) (enqueueLost q) <>
dequeueRequested q <>
mapSignal (const ()) (dequeueExtracted q)
queueSummary :: (MonadComp m, Show si, Show sm, Show so) => Queue m si sm so a -> Int -> Event m ShowS
queueSummary q indent =
do let si = enqueueStrategy q
sm = enqueueStoringStrategy q
so = dequeueStrategy q
null <- queueNull q
full <- queueFull q
let maxCount = queueMaxCount q
count <- queueCount q
countStats <- queueCountStats q
enqueueCount <- enqueueCount q
enqueueLostCount <- enqueueLostCount q
enqueueStoreCount <- enqueueStoreCount q
dequeueCount <- dequeueCount q
dequeueExtractCount <- dequeueExtractCount q
loadFactor <- queueLoadFactor q
enqueueRate <- enqueueRate q
enqueueStoreRate <- enqueueStoreRate q
dequeueRate <- dequeueRate q
dequeueExtractRate <- dequeueExtractRate q
waitTime <- queueWaitTime q
totalWaitTime <- queueTotalWaitTime q
enqueueWaitTime <- enqueueWaitTime q
dequeueWaitTime <- dequeueWaitTime q
let tab = replicate indent ' '
return $
showString tab .
showString "the enqueueing (input) strategy = " .
shows si .
showString "\n" .
showString tab .
showString "the storing (memory) strategy = " .
shows sm .
showString "\n" .
showString tab .
showString "the dequeueing (output) strategy = " .
shows so .
showString "\n" .
showString tab .
showString "empty? = " .
shows null .
showString "\n" .
showString tab .
showString "full? = " .
shows full .
showString "\n" .
showString tab .
showString "max. capacity = " .
shows maxCount .
showString "\n" .
showString tab .
showString "size = " .
shows count .
showString "\n" .
showString tab .
showString "the size statistics = \n\n" .
timingStatsSummary countStats (2 + indent) .
showString "\n\n" .
showString tab .
showString "the enqueue count (number of the input items that were enqueued) = " .
shows enqueueCount .
showString "\n" .
showString tab .
showString "the enqueue lost count (number of the lost items) = " .
shows enqueueLostCount .
showString "\n" .
showString tab .
showString "the enqueue store count (number of the input items that were stored) = " .
shows enqueueStoreCount .
showString "\n" .
showString tab .
showString "the dequeue count (number of requests for dequeueing an item) = " .
shows dequeueCount .
showString "\n" .
showString tab .
showString "the dequeue extract count (number of the output items that were dequeued) = " .
shows dequeueExtractCount .
showString "\n" .
showString tab .
showString "the load factor (size / max. capacity) = " .
shows loadFactor .
showString "\n" .
showString tab .
showString "the enqueue rate (how many input items were enqueued per time) = " .
shows enqueueRate .
showString "\n" .
showString tab .
showString "the enqueue store rate (how many input items were stored per time) = " .
shows enqueueStoreRate .
showString "\n" .
showString tab .
showString "the dequeue rate (how many requests for dequeueing per time) = " .
shows dequeueRate .
showString "\n" .
showString tab .
showString "the dequeue extract rate (how many output items were dequeued per time) = " .
shows dequeueExtractRate .
showString "\n" .
showString tab .
showString "the wait time (when was stored -> when was dequeued) = \n\n" .
samplingStatsSummary waitTime (2 + indent) .
showString "\n\n" .
showString tab .
showString "the total wait time (when the enqueueing was initiated -> when was dequeued) = \n\n" .
samplingStatsSummary totalWaitTime (2 + indent) .
showString "\n\n" .
showString tab .
showString "the enqueue wait time (when the enqueueing was initiated -> when was stored) = \n\n" .
samplingStatsSummary enqueueWaitTime (2 + indent) .
showString "\n\n" .
showString tab .
showString "the dequeue wait time (when was requested for dequeueing -> when was dequeued) = \n\n" .
samplingStatsSummary dequeueWaitTime (2 + indent)