module Simulation.Aivika.Queue
(
FCFSQueue,
LCFSQueue,
SIROQueue,
PriorityQueue,
Queue,
newFCFSQueue,
newLCFSQueue,
newSIROQueue,
newPriorityQueue,
newQueue,
enqueueStrategy,
enqueueStoringStrategy,
dequeueStrategy,
queueNull,
queueFull,
queueMaxCount,
queueCount,
queueCountStats,
enqueueCount,
enqueueLostCount,
enqueueStoreCount,
dequeueCount,
dequeueExtractCount,
queueLoadFactor,
enqueueRate,
enqueueStoreRate,
dequeueRate,
dequeueExtractRate,
queueWaitTime,
queueTotalWaitTime,
enqueueWaitTime,
dequeueWaitTime,
queueRate,
dequeue,
dequeueWithOutputPriority,
tryDequeue,
enqueue,
enqueueWithInputPriority,
enqueueWithStoringPriority,
enqueueWithInputStoringPriorities,
tryEnqueue,
tryEnqueueWithStoringPriority,
enqueueOrLost,
enqueueOrLost_,
enqueueWithStoringPriorityOrLost,
enqueueWithStoringPriorityOrLost_,
queueDelete,
queueDelete_,
queueDeleteBy,
queueDeleteBy_,
queueContains,
queueContainsBy,
clearQueue,
resetQueue,
waitWhileFullQueue,
queueSummary,
queueNullChanged,
queueNullChanged_,
queueFullChanged,
queueFullChanged_,
queueCountChanged,
queueCountChanged_,
enqueueCountChanged,
enqueueCountChanged_,
enqueueLostCountChanged,
enqueueLostCountChanged_,
enqueueStoreCountChanged,
enqueueStoreCountChanged_,
dequeueCountChanged,
dequeueCountChanged_,
dequeueExtractCountChanged,
dequeueExtractCountChanged_,
queueLoadFactorChanged,
queueLoadFactorChanged_,
queueWaitTimeChanged,
queueWaitTimeChanged_,
queueTotalWaitTimeChanged,
queueTotalWaitTimeChanged_,
enqueueWaitTimeChanged,
enqueueWaitTimeChanged_,
dequeueWaitTimeChanged,
dequeueWaitTimeChanged_,
queueRateChanged,
queueRateChanged_,
enqueueInitiated,
enqueueStored,
enqueueLost,
dequeueRequested,
dequeueExtracted,
queueChanged_) where
import Data.IORef
import Data.Monoid
import Data.Maybe
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Internal.Specs
import Simulation.Aivika.Internal.Simulation
import Simulation.Aivika.Internal.Dynamics
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Internal.Process
import Simulation.Aivika.Signal
import Simulation.Aivika.Resource.Base
import Simulation.Aivika.QueueStrategy
import Simulation.Aivika.Statistics
import qualified Simulation.Aivika.DoubleLinkedList as DLL
import qualified Simulation.Aivika.Vector as V
import qualified Simulation.Aivika.PriorityQueue as PQ
-- | A queue that uses 'FCFS' for all three strategies
-- (enqueueing, storing and dequeueing).
type FCFSQueue a = Queue FCFS FCFS FCFS a
-- | A queue that uses 'FCFS' for enqueueing and dequeueing
-- and 'LCFS' for storing.
type LCFSQueue a = Queue FCFS LCFS FCFS a
-- | A queue that uses 'FCFS' for enqueueing and dequeueing
-- and 'SIRO' for storing.
type SIROQueue a = Queue FCFS SIRO FCFS a
-- | A queue that uses 'FCFS' for enqueueing and dequeueing
-- and 'StaticPriorities' for storing.
type PriorityQueue a = Queue FCFS StaticPriorities FCFS a
-- | A queue with a maximum count, parameterised by the enqueueing
-- strategy @si@, the storing strategy @sm@, the dequeueing strategy @so@
-- and the item type @a@.
data Queue si sm so a = Queue
  { queueMaxCount :: Int
    -- ^ The queue capacity.
  , enqueueStrategy :: si
    -- ^ The strategy applied when enqueueing (input).
  , enqueueStoringStrategy :: sm
    -- ^ The strategy applied when storing (memory).
  , dequeueStrategy :: so
    -- ^ The strategy applied when dequeueing (output).
  , enqueueRes :: Resource si
    -- Resource requested before an item is stored.
  , queueStore :: StrategyQueue sm (QueueItem a)
    -- The underlying storage of the items.
  , dequeueRes :: Resource so
    -- Resource requested before an item is extracted.
  , queueCountRef :: IORef Int
    -- Current number of stored items.
  , queueCountStatsRef :: IORef (TimingStats Int)
    -- Time-weighted statistics of the queue size.
  , enqueueCountRef :: IORef Int
    -- Number of initiated enqueue operations.
  , enqueueLostCountRef :: IORef Int
    -- Number of lost (denied) items.
  , enqueueStoreCountRef :: IORef Int
    -- Number of stored items.
  , dequeueCountRef :: IORef Int
    -- Number of dequeue requests.
  , dequeueExtractCountRef :: IORef Int
    -- Number of extracted items.
  , queueWaitTimeRef :: IORef (SamplingStats Double)
    -- Time from storing to extraction.
  , queueTotalWaitTimeRef :: IORef (SamplingStats Double)
    -- Time from enqueue initiation to extraction.
  , enqueueWaitTimeRef :: IORef (SamplingStats Double)
    -- Time from enqueue initiation to storing.
  , dequeueWaitTimeRef :: IORef (SamplingStats Double)
    -- Time from dequeue request to extraction.
  , enqueueInitiatedSource :: SignalSource a
  , enqueueLostSource :: SignalSource a
  , enqueueStoredSource :: SignalSource a
  , dequeueRequestedSource :: SignalSource ()
  , dequeueExtractedSource :: SignalSource a
  }
-- | An item as kept in the queue store, together with its timestamps.
data QueueItem a = QueueItem
  { itemValue :: a
    -- ^ The value carried by the item.
  , itemInputTime :: Double
    -- ^ The time at which the enqueueing was initiated.
  , itemStoringTime :: Double
    -- ^ The time at which the item was stored.
  }
-- | Create a new 'FCFSQueue'; the argument is passed to 'newQueue'
-- as the capacity.
newFCFSQueue :: Int -> Event (FCFSQueue a)
newFCFSQueue capacity = newQueue FCFS FCFS FCFS capacity
-- | Create a new 'LCFSQueue'; the argument is passed to 'newQueue'
-- as the capacity.
newLCFSQueue :: Int -> Event (LCFSQueue a)
newLCFSQueue capacity = newQueue FCFS LCFS FCFS capacity
-- | Create a new 'SIROQueue'; the argument is passed to 'newQueue'
-- as the capacity.
newSIROQueue :: Int -> Event (SIROQueue a)
newSIROQueue capacity = newQueue FCFS SIRO FCFS capacity
-- | Create a new 'PriorityQueue'; the argument is passed to 'newQueue'
-- as the capacity.
newPriorityQueue :: Int -> Event (PriorityQueue a)
newPriorityQueue capacity = newQueue FCFS StaticPriorities FCFS capacity
-- | Create a new queue from the enqueueing, storing and dequeueing
-- strategies and the capacity (kept as 'queueMaxCount').
--
-- All counters start at zero, the wait-time statistics start as 'mempty'
-- and the size statistics are seeded with a zero size at the current time.
newQueue :: (QueueStrategy si, QueueStrategy sm, QueueStrategy so)
         => si
         -- ^ the enqueueing (input) strategy
         -> sm
         -- ^ the storing (memory) strategy
         -> so
         -- ^ the dequeueing (output) strategy
         -> Int
         -- ^ the capacity
         -> Event (Queue si sm so a)
newQueue si sm so count =
  do now              <- liftDynamics time
     sizeRef          <- newRef 0
     sizeStatsRef     <- newRef (returnTimingStats now 0)
     initiatedRef     <- newRef 0
     lostRef          <- newRef 0
     storedRef        <- newRef 0
     requestedRef     <- newRef 0
     extractedRef     <- newRef 0
     -- the input resource starts with every slot available ...
     inputRes         <- liftSimulation (newResourceWithMaxCount si count (Just count))
     store            <- liftSimulation (newStrategyQueue sm)
     -- ... while the output resource starts with nothing to take
     outputRes        <- liftSimulation (newResourceWithMaxCount so 0 (Just count))
     waitRef          <- newRef mempty
     totalWaitRef     <- newRef mempty
     inputWaitRef     <- newRef mempty
     outputWaitRef    <- newRef mempty
     initiatedSource  <- liftSimulation newSignalSource
     lostSource       <- liftSimulation newSignalSource
     storedSource     <- liftSimulation newSignalSource
     requestedSource  <- liftSimulation newSignalSource
     extractedSource  <- liftSimulation newSignalSource
     return Queue
       { queueMaxCount          = count
       , enqueueStrategy        = si
       , enqueueStoringStrategy = sm
       , dequeueStrategy        = so
       , enqueueRes             = inputRes
       , queueStore             = store
       , dequeueRes             = outputRes
       , queueCountRef          = sizeRef
       , queueCountStatsRef     = sizeStatsRef
       , enqueueCountRef        = initiatedRef
       , enqueueLostCountRef    = lostRef
       , enqueueStoreCountRef   = storedRef
       , dequeueCountRef        = requestedRef
       , dequeueExtractCountRef = extractedRef
       , queueWaitTimeRef       = waitRef
       , queueTotalWaitTimeRef  = totalWaitRef
       , enqueueWaitTimeRef     = inputWaitRef
       , dequeueWaitTimeRef     = outputWaitRef
       , enqueueInitiatedSource = initiatedSource
       , enqueueLostSource      = lostSource
       , enqueueStoredSource    = storedSource
       , dequeueRequestedSource = requestedSource
       , dequeueExtractedSource = extractedSource
       }
  where
    -- allocate a mutable reference inside the Event computation
    newRef :: b -> Event (IORef b)
    newRef = liftIO . newIORef
-- | Test whether the queue currently holds no items.
queueNull :: Queue si sm so a -> Event Bool
queueNull q =
  Event $ \_ -> fmap (== 0) (readIORef (queueCountRef q))
-- | Signal carrying the value of 'queueNull', re-read each time
-- 'queueNullChanged_' fires.
queueNullChanged :: Queue si sm so a -> Signal Bool
queueNullChanged q = mapSignalM refresh (queueNullChanged_ q)
  where
    refresh _ = queueNull q
-- | Signal fired when the 'queueNull' property may have changed;
-- it is the same signal as 'queueCountChanged_'.
queueNullChanged_ :: Queue si sm so a -> Signal ()
queueNullChanged_ q = queueCountChanged_ q
-- | Test whether the current size equals 'queueMaxCount'.
queueFull :: Queue si sm so a -> Event Bool
queueFull q =
  Event $ \_ -> fmap (== queueMaxCount q) (readIORef (queueCountRef q))
-- | Signal carrying the value of 'queueFull', re-read each time
-- 'queueFullChanged_' fires.
queueFullChanged :: Queue si sm so a -> Signal Bool
queueFullChanged q = mapSignalM refresh (queueFullChanged_ q)
  where
    refresh _ = queueFull q
-- | Signal fired when the 'queueFull' property may have changed;
-- it is the same signal as 'queueCountChanged_'.
queueFullChanged_ :: Queue si sm so a -> Signal ()
queueFullChanged_ q = queueCountChanged_ q
-- | Return the current queue size.
queueCount :: Queue si sm so a -> Event Int
queueCount q = Event (const (readIORef (queueCountRef q)))
-- | Return the timing statistics accumulated for the queue size.
queueCountStats :: Queue si sm so a -> Event (TimingStats Int)
queueCountStats q = Event (const (readIORef (queueCountStatsRef q)))
-- | Signal carrying the value of 'queueCount', re-read each time
-- 'queueCountChanged_' fires.
queueCountChanged :: Queue si sm so a -> Signal Int
queueCountChanged q = mapSignalM refresh (queueCountChanged_ q)
  where
    refresh _ = queueCount q
-- | Signal fired when the queue size changes: the merge of
-- 'enqueueStored' and 'dequeueExtracted' with their payloads dropped.
queueCountChanged_ :: Queue si sm so a -> Signal ()
queueCountChanged_ q = stored <> extracted
  where
    stored    = mapSignal (\_ -> ()) (enqueueStored q)
    extracted = mapSignal (\_ -> ()) (dequeueExtracted q)
-- | Return the number of initiated enqueue operations.
enqueueCount :: Queue si sm so a -> Event Int
enqueueCount q = Event (const (readIORef (enqueueCountRef q)))
-- | Signal carrying the value of 'enqueueCount', re-read each time
-- 'enqueueCountChanged_' fires.
enqueueCountChanged :: Queue si sm so a -> Signal Int
enqueueCountChanged q = mapSignalM refresh (enqueueCountChanged_ q)
  where
    refresh _ = enqueueCount q
-- | Signal fired when 'enqueueCount' changes: 'enqueueInitiated'
-- with its payload dropped.
enqueueCountChanged_ :: Queue si sm so a -> Signal ()
enqueueCountChanged_ = mapSignal (\_ -> ()) . enqueueInitiated
-- | Return the number of items that were lost instead of being enqueued.
enqueueLostCount :: Queue si sm so a -> Event Int
enqueueLostCount q = Event (const (readIORef (enqueueLostCountRef q)))
-- | Signal carrying the value of 'enqueueLostCount', re-read each time
-- 'enqueueLostCountChanged_' fires.
enqueueLostCountChanged :: Queue si sm so a -> Signal Int
enqueueLostCountChanged q = mapSignalM refresh (enqueueLostCountChanged_ q)
  where
    refresh _ = enqueueLostCount q
-- | Signal fired when 'enqueueLostCount' changes: 'enqueueLost'
-- with its payload dropped.
enqueueLostCountChanged_ :: Queue si sm so a -> Signal ()
enqueueLostCountChanged_ = mapSignal (\_ -> ()) . enqueueLost
-- | Return the number of items that were stored in the queue.
enqueueStoreCount :: Queue si sm so a -> Event Int
enqueueStoreCount q = Event (const (readIORef (enqueueStoreCountRef q)))
-- | Signal carrying the value of 'enqueueStoreCount', re-read each time
-- 'enqueueStoreCountChanged_' fires.
enqueueStoreCountChanged :: Queue si sm so a -> Signal Int
enqueueStoreCountChanged q = mapSignalM refresh (enqueueStoreCountChanged_ q)
  where
    refresh _ = enqueueStoreCount q
-- | Signal fired when 'enqueueStoreCount' changes: 'enqueueStored'
-- with its payload dropped.
enqueueStoreCountChanged_ :: Queue si sm so a -> Signal ()
enqueueStoreCountChanged_ = mapSignal (\_ -> ()) . enqueueStored
-- | Return the number of requests for dequeueing an item.
dequeueCount :: Queue si sm so a -> Event Int
dequeueCount q = Event (const (readIORef (dequeueCountRef q)))
-- | Signal carrying the value of 'dequeueCount', re-read each time
-- 'dequeueCountChanged_' fires.
dequeueCountChanged :: Queue si sm so a -> Signal Int
dequeueCountChanged q = mapSignalM refresh (dequeueCountChanged_ q)
  where
    refresh _ = dequeueCount q
-- | Signal fired when 'dequeueCount' changes; derived from
-- 'dequeueRequested'.
dequeueCountChanged_ :: Queue si sm so a -> Signal ()
dequeueCountChanged_ = mapSignal (\_ -> ()) . dequeueRequested
-- | Return the number of items that were extracted from the queue.
dequeueExtractCount :: Queue si sm so a -> Event Int
dequeueExtractCount q = Event (const (readIORef (dequeueExtractCountRef q)))
-- | Signal carrying the value of 'dequeueExtractCount', re-read each
-- time 'dequeueExtractCountChanged_' fires.
dequeueExtractCountChanged :: Queue si sm so a -> Signal Int
dequeueExtractCountChanged q =
  mapSignalM refresh (dequeueExtractCountChanged_ q)
  where
    refresh _ = dequeueExtractCount q
-- | Signal fired when 'dequeueExtractCount' changes: 'dequeueExtracted'
-- with its payload dropped.
dequeueExtractCountChanged_ :: Queue si sm so a -> Signal ()
dequeueExtractCountChanged_ = mapSignal (\_ -> ()) . dequeueExtracted
-- | Return the load factor: the current size divided by 'queueMaxCount'.
queueLoadFactor :: Queue si sm so a -> Event Double
queueLoadFactor q =
  Event $ \_ -> fmap ratio (readIORef (queueCountRef q))
  where
    ratio size = fromIntegral size / fromIntegral (queueMaxCount q)
-- | Signal carrying the value of 'queueLoadFactor', re-read each time
-- 'queueLoadFactorChanged_' fires.
queueLoadFactorChanged :: Queue si sm so a -> Signal Double
queueLoadFactorChanged q = mapSignalM refresh (queueLoadFactorChanged_ q)
  where
    refresh _ = queueLoadFactor q
-- | Signal fired when the load factor changes: the merge of
-- 'enqueueStored' and 'dequeueExtracted' with their payloads dropped.
queueLoadFactorChanged_ :: Queue si sm so a -> Signal ()
queueLoadFactorChanged_ q = stored <> extracted
  where
    stored    = mapSignal (\_ -> ()) (enqueueStored q)
    extracted = mapSignal (\_ -> ()) (dequeueExtracted q)
-- | Return 'enqueueCount' divided by the time elapsed since the start
-- time of the simulation specs.
enqueueRate :: Queue si sm so a -> Event Double
enqueueRate q =
  Event $ \p ->
    do n <- readIORef (enqueueCountRef q)
       let elapsed = pointTime p - spcStartTime (pointSpecs p)
       return (fromIntegral n / elapsed)
-- | Return 'enqueueStoreCount' divided by the time elapsed since the
-- start time of the simulation specs.
enqueueStoreRate :: Queue si sm so a -> Event Double
enqueueStoreRate q =
  Event $ \p ->
    do n <- readIORef (enqueueStoreCountRef q)
       let elapsed = pointTime p - spcStartTime (pointSpecs p)
       return (fromIntegral n / elapsed)
-- | Return 'dequeueCount' divided by the time elapsed since the start
-- time of the simulation specs.
dequeueRate :: Queue si sm so a -> Event Double
dequeueRate q =
  Event $ \p ->
    do n <- readIORef (dequeueCountRef q)
       let elapsed = pointTime p - spcStartTime (pointSpecs p)
       return (fromIntegral n / elapsed)
-- | Return 'dequeueExtractCount' divided by the time elapsed since the
-- start time of the simulation specs.
dequeueExtractRate :: Queue si sm so a -> Event Double
dequeueExtractRate q =
  Event $ \p ->
    do n <- readIORef (dequeueExtractCountRef q)
       let elapsed = pointTime p - spcStartTime (pointSpecs p)
       return (fromIntegral n / elapsed)
-- | Return the statistics of the wait time measured from the moment an
-- item was stored to the moment it was dequeued.
queueWaitTime :: Queue si sm so a -> Event (SamplingStats Double)
queueWaitTime q = Event (const (readIORef (queueWaitTimeRef q)))
-- | Signal carrying the value of 'queueWaitTime', re-read each time
-- 'queueWaitTimeChanged_' fires.
queueWaitTimeChanged :: Queue si sm so a -> Signal (SamplingStats Double)
queueWaitTimeChanged q = mapSignalM refresh (queueWaitTimeChanged_ q)
  where
    refresh _ = queueWaitTime q
-- | Signal fired when 'queueWaitTime' changes: 'dequeueExtracted'
-- with its payload dropped.
queueWaitTimeChanged_ :: Queue si sm so a -> Signal ()
queueWaitTimeChanged_ = mapSignal (\_ -> ()) . dequeueExtracted
-- | Return the statistics of the total wait time measured from the
-- moment the enqueueing was initiated to the moment of dequeueing.
queueTotalWaitTime :: Queue si sm so a -> Event (SamplingStats Double)
queueTotalWaitTime q = Event (const (readIORef (queueTotalWaitTimeRef q)))
-- | Signal carrying the value of 'queueTotalWaitTime', re-read each time
-- 'queueTotalWaitTimeChanged_' fires.
queueTotalWaitTimeChanged :: Queue si sm so a -> Signal (SamplingStats Double)
queueTotalWaitTimeChanged q =
  mapSignalM refresh (queueTotalWaitTimeChanged_ q)
  where
    refresh _ = queueTotalWaitTime q
-- | Signal fired when 'queueTotalWaitTime' changes: 'dequeueExtracted'
-- with its payload dropped.
queueTotalWaitTimeChanged_ :: Queue si sm so a -> Signal ()
queueTotalWaitTimeChanged_ = mapSignal (\_ -> ()) . dequeueExtracted
-- | Return the statistics of the enqueue wait time measured from the
-- moment the enqueueing was initiated to the moment of storing.
enqueueWaitTime :: Queue si sm so a -> Event (SamplingStats Double)
enqueueWaitTime q = Event (const (readIORef (enqueueWaitTimeRef q)))
-- | Signal carrying the value of 'enqueueWaitTime', re-read each time
-- 'enqueueWaitTimeChanged_' fires.
enqueueWaitTimeChanged :: Queue si sm so a -> Signal (SamplingStats Double)
enqueueWaitTimeChanged q = mapSignalM refresh (enqueueWaitTimeChanged_ q)
  where
    refresh _ = enqueueWaitTime q
-- | Signal fired when 'enqueueWaitTime' changes: 'enqueueStored'
-- with its payload dropped.
enqueueWaitTimeChanged_ :: Queue si sm so a -> Signal ()
enqueueWaitTimeChanged_ = mapSignal (\_ -> ()) . enqueueStored
-- | Return the statistics of the dequeue wait time measured from the
-- moment of the dequeue request to the moment of extraction.
dequeueWaitTime :: Queue si sm so a -> Event (SamplingStats Double)
dequeueWaitTime q = Event (const (readIORef (dequeueWaitTimeRef q)))
-- | Signal carrying the value of 'dequeueWaitTime', re-read each time
-- 'dequeueWaitTimeChanged_' fires.
dequeueWaitTimeChanged :: Queue si sm so a -> Signal (SamplingStats Double)
dequeueWaitTimeChanged q = mapSignalM refresh (dequeueWaitTimeChanged_ q)
  where
    refresh _ = dequeueWaitTime q
-- | Signal fired when 'dequeueWaitTime' changes: 'dequeueExtracted'
-- with its payload dropped.
dequeueWaitTimeChanged_ :: Queue si sm so a -> Signal ()
dequeueWaitTimeChanged_ = mapSignal (\_ -> ()) . dequeueExtracted
-- | Return the mean of the queue size statistics divided by the mean of
-- the wait time statistics ('queueWaitTime').
queueRate :: Queue si sm so a -> Event Double
queueRate q =
  Event $ \_ ->
    do sizeStats <- readIORef (queueCountStatsRef q)
       waitStats <- readIORef (queueWaitTimeRef q)
       return $ timingStatsMean sizeStats / samplingStatsMean waitStats
-- | Signal carrying the value of 'queueRate', re-read each time
-- 'queueRateChanged_' fires.
queueRateChanged :: Queue si sm so a -> Signal Double
queueRateChanged q = mapSignalM refresh (queueRateChanged_ q)
  where
    refresh _ = queueRate q
-- | Signal fired when 'queueRate' changes: the merge of 'enqueueStored'
-- and 'dequeueExtracted' with their payloads dropped.
queueRateChanged_ :: Queue si sm so a -> Signal ()
queueRateChanged_ q = stored <> extracted
  where
    stored    = mapSignal (\_ -> ()) (enqueueStored q)
    extracted = mapSignal (\_ -> ()) (dequeueExtracted q)
-- | Dequeue an item within a process: register the request, acquire the
-- dequeue resource and then extract the item.
dequeue :: (DequeueStrategy si, DequeueStrategy sm, EnqueueStrategy so)
        => Queue si sm so a
        -- ^ the queue
        -> Process a
        -- ^ the dequeued value
dequeue q =
  liftEvent (dequeueRequest q) >>= \requestedAt ->
  requestResource (dequeueRes q) >>
  liftEvent (dequeueExtract q requestedAt)
-- | Like 'dequeue', but the dequeue resource is requested with the
-- specified output priority.
dequeueWithOutputPriority :: (DequeueStrategy si,
                              DequeueStrategy sm,
                              PriorityQueueStrategy so po)
                          => Queue si sm so a
                          -- ^ the queue
                          -> po
                          -- ^ the priority for the output
                          -> Process a
                          -- ^ the dequeued value
dequeueWithOutputPriority q po =
  liftEvent (dequeueRequest q) >>= \requestedAt ->
  requestResourceWithPriority (dequeueRes q) po >>
  liftEvent (dequeueExtract q requestedAt)
-- | Try to dequeue immediately.  Returns 'Nothing' when the dequeue
-- resource cannot be acquired within the current event.
tryDequeue :: (DequeueStrategy si, DequeueStrategy sm)
           => Queue si sm so a
           -- ^ the queue
           -> Event (Maybe a)
           -- ^ the dequeued value, if any
tryDequeue q =
  do acquired <- tryRequestResourceWithinEvent (dequeueRes q)
     if not acquired
       then return Nothing
       else fmap Just (dequeueRequest q >>= dequeueExtract q)
-- | Remove an item equal to the given one via 'queueDeleteBy' and report
-- whether an item was actually removed.
queueDelete :: (Eq a,
                DequeueStrategy si,
                DeletingQueueStrategy sm,
                DequeueStrategy so)
            => Queue si sm so a
            -- ^ the queue
            -> a
            -- ^ the item to remove
            -> Event Bool
            -- ^ whether an item was removed
queueDelete q a =
  do removed <- queueDeleteBy q (\x -> x == a)
     return (isJust removed)
-- | Remove an item equal to the given one via 'queueDeleteBy',
-- discarding the result.
queueDelete_ :: (Eq a,
                 DequeueStrategy si,
                 DeletingQueueStrategy sm,
                 DequeueStrategy so)
             => Queue si sm so a
             -- ^ the queue
             -> a
             -- ^ the item to remove
             -> Event ()
queueDelete_ q a = void (queueDeleteBy q (\x -> x == a))
-- | Remove an item satisfying the predicate and return it.
--
-- The dequeue resource is acquired first; if that fails, the result is
-- 'Nothing'.  If no stored item matches, the resource is released again.
-- Otherwise the removal is accounted for as a dequeue request followed
-- by the regular post-extraction bookkeeping.
queueDeleteBy :: (DequeueStrategy si,
                  DeletingQueueStrategy sm,
                  DequeueStrategy so)
              => Queue si sm so a
              -- ^ the queue
              -> (a -> Bool)
              -- ^ the predicate
              -> Event (Maybe a)
              -- ^ the removed value, if any
queueDeleteBy q matches =
  do acquired <- tryRequestResourceWithinEvent (dequeueRes q)
     if acquired then deleteFromStore else return Nothing
  where
    deleteFromStore =
      strategyQueueDeleteBy (queueStore q) (matches . itemValue) >>= \found ->
      case found of
        Nothing ->
          -- nothing was removed, so give the resource back
          releaseResourceWithinEvent (dequeueRes q) >> return Nothing
        Just item ->
          do requestedAt <- dequeueRequest q
             fmap Just (dequeuePostExtract q requestedAt item)
-- | Remove an item satisfying the predicate via 'queueDeleteBy',
-- discarding the result.
queueDeleteBy_ :: (DequeueStrategy si,
                   DeletingQueueStrategy sm,
                   DequeueStrategy so)
               => Queue si sm so a
               -- ^ the queue
               -> (a -> Bool)
               -- ^ the predicate
               -> Event ()
queueDeleteBy_ q matches = void (queueDeleteBy q matches)
-- | Test whether the queue contains an item equal to the given one.
queueContains :: (Eq a, DeletingQueueStrategy sm)
              => Queue si sm so a
              -- ^ the queue
              -> a
              -- ^ the item to look for
              -> Event Bool
queueContains q a =
  do found <- queueContainsBy q (\x -> x == a)
     return (isJust found)
-- | Look up a stored item whose value satisfies the predicate and
-- return that value, if any.
queueContainsBy :: DeletingQueueStrategy sm
                => Queue si sm so a
                -- ^ the queue
                -> (a -> Bool)
                -- ^ the predicate
                -> Event (Maybe a)
                -- ^ the matching value, if any
queueContainsBy q matches =
  fmap (fmap itemValue) $
  strategyQueueContainsBy (queueStore q) (matches . itemValue)
-- | Repeatedly call 'tryDequeue' until it returns 'Nothing'.
clearQueue :: (DequeueStrategy si, DequeueStrategy sm)
           => Queue si sm so a
           -- ^ the queue
           -> Event ()
clearQueue q =
  tryDequeue q >>= maybe (return ()) (\_ -> clearQueue q)
-- | Enqueue an item within a process: initiate the enqueueing, acquire
-- the enqueue resource and then store the item.
enqueue :: (EnqueueStrategy si, EnqueueStrategy sm, DequeueStrategy so)
        => Queue si sm so a
        -- ^ the queue
        -> a
        -- ^ the item to enqueue
        -> Process ()
enqueue q a =
  liftEvent (enqueueInitiate q a) >>= \item ->
  requestResource (enqueueRes q) >>
  liftEvent (enqueueStore q item)
-- | Like 'enqueue', but the enqueue resource is requested with the
-- specified input priority.
enqueueWithInputPriority :: (PriorityQueueStrategy si pi,
                             EnqueueStrategy sm,
                             DequeueStrategy so)
                         => Queue si sm so a
                         -- ^ the queue
                         -> pi
                         -- ^ the priority for the input
                         -> a
                         -- ^ the item to enqueue
                         -> Process ()
enqueueWithInputPriority q inputPriority a =
  liftEvent (enqueueInitiate q a) >>= \item ->
  requestResourceWithPriority (enqueueRes q) inputPriority >>
  liftEvent (enqueueStore q item)
-- | Like 'enqueue', but the item is stored with the specified storing
-- priority.
enqueueWithStoringPriority :: (EnqueueStrategy si,
                               PriorityQueueStrategy sm pm,
                               DequeueStrategy so)
                           => Queue si sm so a
                           -- ^ the queue
                           -> pm
                           -- ^ the priority for storing
                           -> a
                           -- ^ the item to enqueue
                           -> Process ()
enqueueWithStoringPriority q pm a =
  liftEvent (enqueueInitiate q a) >>= \item ->
  requestResource (enqueueRes q) >>
  liftEvent (enqueueStoreWithPriority q pm item)
-- | Like 'enqueue', but the enqueue resource is requested with the input
-- priority and the item is stored with the storing priority.
enqueueWithInputStoringPriorities :: (PriorityQueueStrategy si pi,
                                      PriorityQueueStrategy sm pm,
                                      DequeueStrategy so)
                                  => Queue si sm so a
                                  -- ^ the queue
                                  -> pi
                                  -- ^ the priority for the input
                                  -> pm
                                  -- ^ the priority for storing
                                  -> a
                                  -- ^ the item to enqueue
                                  -> Process ()
enqueueWithInputStoringPriorities q inputPriority pm a =
  liftEvent (enqueueInitiate q a) >>= \item ->
  requestResourceWithPriority (enqueueRes q) inputPriority >>
  liftEvent (enqueueStoreWithPriority q pm item)
-- | Try to enqueue immediately.  Returns 'False' and does nothing else
-- when the enqueue resource cannot be acquired within the current event.
tryEnqueue :: (EnqueueStrategy sm, DequeueStrategy so)
           => Queue si sm so a
           -- ^ the queue
           -> a
           -- ^ the item to enqueue
           -> Event Bool
           -- ^ whether the item was stored
tryEnqueue q a =
  do acquired <- tryRequestResourceWithinEvent (enqueueRes q)
     when acquired $
       enqueueInitiate q a >>= enqueueStore q
     return acquired
-- | Like 'tryEnqueue', but the item is stored with the specified
-- storing priority.
tryEnqueueWithStoringPriority :: (PriorityQueueStrategy sm pm,
                                  DequeueStrategy so)
                              => Queue si sm so a
                              -- ^ the queue
                              -> pm
                              -- ^ the priority for storing
                              -> a
                              -- ^ the item to enqueue
                              -> Event Bool
                              -- ^ whether the item was stored
tryEnqueueWithStoringPriority q pm a =
  do acquired <- tryRequestResourceWithinEvent (enqueueRes q)
     when acquired $
       enqueueInitiate q a >>= enqueueStoreWithPriority q pm
     return acquired
-- | Try to enqueue immediately; when the enqueue resource cannot be
-- acquired, count the item as lost.  Returns whether it was stored.
enqueueOrLost :: (EnqueueStrategy sm, DequeueStrategy so)
              => Queue si sm so a
              -- ^ the queue
              -> a
              -- ^ the item to enqueue
              -> Event Bool
              -- ^ 'True' if stored, 'False' if lost
enqueueOrLost q a =
  do acquired <- tryRequestResourceWithinEvent (enqueueRes q)
     if acquired
       then enqueueInitiate q a >>= enqueueStore q
       else enqueueDeny q a
     return acquired
-- | Like 'enqueueOrLost', but the item is stored with the specified
-- storing priority.
enqueueWithStoringPriorityOrLost :: (PriorityQueueStrategy sm pm,
                                     DequeueStrategy so)
                                 => Queue si sm so a
                                 -- ^ the queue
                                 -> pm
                                 -- ^ the priority for storing
                                 -> a
                                 -- ^ the item to enqueue
                                 -> Event Bool
                                 -- ^ 'True' if stored, 'False' if lost
enqueueWithStoringPriorityOrLost q pm a =
  do acquired <- tryRequestResourceWithinEvent (enqueueRes q)
     if acquired
       then enqueueInitiate q a >>= enqueueStoreWithPriority q pm
       else enqueueDeny q a
     return acquired
-- | Run 'enqueueOrLost' and discard its result.
enqueueOrLost_ :: (EnqueueStrategy sm, DequeueStrategy so)
               => Queue si sm so a
               -- ^ the queue
               -> a
               -- ^ the item to enqueue
               -> Event ()
enqueueOrLost_ q a = void (enqueueOrLost q a)
-- | Run 'enqueueWithStoringPriorityOrLost' and discard its result.
enqueueWithStoringPriorityOrLost_ :: (PriorityQueueStrategy sm pm,
                                      DequeueStrategy so)
                                  => Queue si sm so a
                                  -- ^ the queue
                                  -> pm
                                  -- ^ the priority for storing
                                  -> a
                                  -- ^ the item to enqueue
                                  -> Event ()
enqueueWithStoringPriorityOrLost_ q pm a =
  void (enqueueWithStoringPriorityOrLost q pm a)
-- | Signal published from the source that is triggered whenever an
-- enqueueing operation is initiated.
enqueueInitiated :: Queue si sm so a -> Signal a
enqueueInitiated = publishSignal . enqueueInitiatedSource
-- | Signal published from the source that is triggered whenever an
-- item is stored in the queue.
enqueueStored :: Queue si sm so a -> Signal a
enqueueStored = publishSignal . enqueueStoredSource
-- | Signal published from the source that is triggered whenever an
-- item is lost instead of being enqueued.
enqueueLost :: Queue si sm so a -> Signal a
enqueueLost = publishSignal . enqueueLostSource
-- | Signal published from the source that is triggered whenever a
-- dequeue request is registered.
dequeueRequested :: Queue si sm so a -> Signal ()
dequeueRequested = publishSignal . dequeueRequestedSource
-- | Signal published from the source that is triggered whenever an
-- item is extracted from the queue.
dequeueExtracted :: Queue si sm so a -> Signal a
dequeueExtracted = publishSignal . dequeueExtractedSource
-- | Initiate the enqueueing of a value: bump the enqueue counter, fire
-- the initiation signal and wrap the value into a 'QueueItem' whose
-- input and storing times are both the current time.
enqueueInitiate :: Queue si sm so a
                -- ^ the queue
                -> a
                -- ^ the value to enqueue
                -> Event (QueueItem a)
enqueueInitiate q a =
  Event $ \p ->
    do modifyIORef' (enqueueCountRef q) (+ 1)
       invokeEvent p (triggerSignal (enqueueInitiatedSource q) a)
       let now = pointTime p
       -- the storing time is provisional until the item is really stored
       return QueueItem { itemValue       = a
                        , itemInputTime   = now
                        , itemStoringTime = now
                        }
-- | Store the item: stamp it with the current storing time, put it into
-- the store, update the size, the size statistics, the store counter and
-- the enqueue wait time, release the dequeue resource and finally fire
-- the 'enqueueStored' signal.
enqueueStore :: (EnqueueStrategy sm, DequeueStrategy so)
             => Queue si sm so a
             -- ^ the queue
             -> QueueItem a
             -- ^ the item to store
             -> Event ()
enqueueStore q i =
  Event $ \p ->
    do let now    = pointTime p
           stored = i { itemStoringTime = now }
       invokeEvent p (strategyEnqueue (queueStore q) stored)
       size <- fmap (+ 1) (readIORef (queueCountRef q))
       -- force the new size before writing to avoid building up thunks
       size `seq` writeIORef (queueCountRef q) size
       modifyIORef' (queueCountStatsRef q) (addTimingStats now size)
       modifyIORef' (enqueueStoreCountRef q) (+ 1)
       mapM_ (invokeEvent p)
         [ enqueueStat q stored
         , releaseResourceWithinEvent (dequeueRes q)
         , triggerSignal (enqueueStoredSource q) (itemValue stored)
         ]
-- | Store the item with the given storing priority: stamp it with the
-- current storing time, put it into the store, update the size, the size
-- statistics, the store counter and the enqueue wait time, release the
-- dequeue resource and finally fire the 'enqueueStored' signal.
enqueueStoreWithPriority :: (PriorityQueueStrategy sm pm,
                             DequeueStrategy so)
                         => Queue si sm so a
                         -- ^ the queue
                         -> pm
                         -- ^ the priority for storing
                         -> QueueItem a
                         -- ^ the item to store
                         -> Event ()
enqueueStoreWithPriority q pm i =
  Event $ \p ->
    do let now    = pointTime p
           stored = i { itemStoringTime = now }
       invokeEvent p (strategyEnqueueWithPriority (queueStore q) pm stored)
       size <- fmap (+ 1) (readIORef (queueCountRef q))
       -- force the new size before writing to avoid building up thunks
       size `seq` writeIORef (queueCountRef q) size
       modifyIORef' (queueCountStatsRef q) (addTimingStats now size)
       modifyIORef' (enqueueStoreCountRef q) (+ 1)
       mapM_ (invokeEvent p)
         [ enqueueStat q stored
         , releaseResourceWithinEvent (dequeueRes q)
         , triggerSignal (enqueueStoredSource q) (itemValue stored)
         ]
-- | Deny the enqueueing: bump the lost counter and fire the
-- 'enqueueLost' signal with the rejected value.
enqueueDeny :: Queue si sm so a
            -- ^ the queue
            -> a
            -- ^ the value that is lost
            -> Event ()
enqueueDeny q a =
  Event $ \p ->
    modifyIORef' (enqueueLostCountRef q) (1 +) >>
    invokeEvent p (triggerSignal (enqueueLostSource q) a)
-- | Add the item's enqueue wait time (storing time minus input time)
-- to the enqueue wait time statistics.
enqueueStat :: Queue si sm so a
            -- ^ the queue
            -> QueueItem a
            -- ^ the item that has just been stored
            -> Event ()
enqueueStat q i =
  Event $ \_ ->
    modifyIORef' (enqueueWaitTimeRef q) (addSamplingStats waited)
  where
    waited = itemStoringTime i - itemInputTime i
-- | Register a dequeue request: bump the dequeue counter, fire the
-- 'dequeueRequested' signal and return the current time.
dequeueRequest :: Queue si sm so a
               -- ^ the queue
               -> Event Double
               -- ^ the time of the request
dequeueRequest q =
  Event $ \p ->
    modifyIORef' (dequeueCountRef q) (+ 1) >>
    invokeEvent p (triggerSignal (dequeueRequestedSource q) ()) >>
    return (pointTime p)
-- | Take the next item out of the store and run the post-extraction
-- bookkeeping for it.
dequeueExtract :: (DequeueStrategy si, DequeueStrategy sm)
               => Queue si sm so a
               -- ^ the queue
               -> Double
               -- ^ the time of the dequeue request
               -> Event a
               -- ^ the dequeued value
dequeueExtract q t' =
  Event $ \p ->
    invokeEvent p (strategyDequeue (queueStore q)) >>=
    invokeEvent p . dequeuePostExtract q t'
-- | Bookkeeping after an item has left the store: decrease the size,
-- update the size statistics, the extract counter and the wait time
-- statistics, release the enqueue resource, fire the 'dequeueExtracted'
-- signal and return the item's value.
dequeuePostExtract :: (DequeueStrategy si, DequeueStrategy sm)
                   => Queue si sm so a
                   -- ^ the queue
                   -> Double
                   -- ^ the time of the dequeue request
                   -> QueueItem a
                   -- ^ the item that was taken out of the store
                   -> Event a
                   -- ^ the dequeued value
dequeuePostExtract q t' i =
  Event $ \p ->
    do let now = pointTime p
       size <- fmap (subtract 1) (readIORef (queueCountRef q))
       -- force the new size before writing to avoid building up thunks
       size `seq` writeIORef (queueCountRef q) size
       modifyIORef' (queueCountStatsRef q) (addTimingStats now size)
       modifyIORef' (dequeueExtractCountRef q) (+ 1)
       mapM_ (invokeEvent p)
         [ dequeueStat q t' i
         , releaseResourceWithinEvent (enqueueRes q)
         , triggerSignal (dequeueExtractedSource q) value
         ]
       return value
  where
    value = itemValue i
-- | Update the wait time statistics for a dequeued item, all measured up
-- to the current time: the dequeue wait time (since the request), the
-- total wait time (since the input time) and the queue wait time (since
-- the storing time).
dequeueStat :: Queue si sm so a
            -- ^ the queue
            -> Double
            -- ^ the time of the dequeue request
            -> QueueItem a
            -- ^ the item that was dequeued
            -> Event ()
dequeueStat q t' i =
  Event $ \p ->
    do let now = pointTime p
           addSample ref since =
             modifyIORef' (ref q) (addSamplingStats (now - since))
       addSample dequeueWaitTimeRef t'
       addSample queueTotalWaitTimeRef (itemInputTime i)
       addSample queueWaitTimeRef (itemStoringTime i)
-- | Suspend the process while the queue is full: each time the queue is
-- found full, await the next 'dequeueExtracted' signal and check again.
waitWhileFullQueue :: Queue si sm so a -> Process ()
waitWhileFullQueue q =
  liftEvent (queueFull q) >>= \full ->
  when full $
    processAwait (dequeueExtracted q) >> waitWhileFullQueue q
-- | Signal fired on any of the queue's events: enqueue initiated,
-- stored or lost, dequeue requested or extracted.
queueChanged_ :: Queue si sm so a -> Signal ()
queueChanged_ q =
  unit (enqueueInitiated q) <>
  unit (enqueueStored q) <>
  unit (enqueueLost q) <>
  dequeueRequested q <>
  unit (dequeueExtracted q)
  where
    -- drop the payload of a signal
    unit = mapSignal (\_ -> ())
-- | Build a multi-line textual summary of the queue: its strategies,
-- state, counters, rates and statistics.  Every entry is prefixed with
-- @indent@ spaces; the statistics summaries are given @2 + indent@.
queueSummary :: (Show si, Show sm, Show so) => Queue si sm so a -> Int -> Event ShowS
queueSummary q indent =
  do isEmpty        <- queueNull q
     isFull         <- queueFull q
     size           <- queueCount q
     sizeStats      <- queueCountStats q
     nInitiated     <- enqueueCount q
     nLost          <- enqueueLostCount q
     nStored        <- enqueueStoreCount q
     nRequested     <- dequeueCount q
     nExtracted     <- dequeueExtractCount q
     loadFactor     <- queueLoadFactor q
     initiatedRate  <- enqueueRate q
     storedRate     <- enqueueStoreRate q
     requestedRate  <- dequeueRate q
     extractedRate  <- dequeueExtractRate q
     waitStats      <- queueWaitTime q
     totalWaitStats <- queueTotalWaitTime q
     inputWaitStats <- enqueueWaitTime q
     outputWaitStats <- dequeueWaitTime q
     let tab    = replicate indent ' '
         nested = 2 + indent
         gap    = showString "\n\n"
         -- a single "label = value" line terminated by a newline
         field label value =
           showString tab . showString label . value . showString "\n"
         -- a labelled statistics block without a trailing separator
         stats label body =
           showString tab . showString label . body
     return $
       field "the enqueueing (input) strategy = " (shows (enqueueStrategy q)) .
       field "the storing (memory) strategy = " (shows (enqueueStoringStrategy q)) .
       field "the dequeueing (output) strategy = " (shows (dequeueStrategy q)) .
       field "empty? = " (shows isEmpty) .
       field "full? = " (shows isFull) .
       field "max. capacity = " (shows (queueMaxCount q)) .
       field "size = " (shows size) .
       stats "the size statistics = \n\n"
         (timingStatsSummary sizeStats nested) .
       gap .
       field "the enqueue count (number of the input items that were enqueued) = "
         (shows nInitiated) .
       field "the enqueue lost count (number of the lost items) = "
         (shows nLost) .
       field "the enqueue store count (number of the input items that were stored) = "
         (shows nStored) .
       field "the dequeue count (number of requests for dequeueing an item) = "
         (shows nRequested) .
       field "the dequeue extract count (number of the output items that were dequeued) = "
         (shows nExtracted) .
       field "the load factor (size / max. capacity) = "
         (shows loadFactor) .
       field "the enqueue rate (how many input items were enqueued per time) = "
         (shows initiatedRate) .
       field "the enqueue store rate (how many input items were stored per time) = "
         (shows storedRate) .
       field "the dequeue rate (how many requests for dequeueing per time) = "
         (shows requestedRate) .
       field "the dequeue extract rate (how many output items were dequeued per time) = "
         (shows extractedRate) .
       stats "the wait time (when was stored -> when was dequeued) = \n\n"
         (samplingStatsSummary waitStats nested) .
       gap .
       stats "the total wait time (when the enqueueing was initiated -> when was dequeued) = \n\n"
         (samplingStatsSummary totalWaitStats nested) .
       gap .
       stats "the enqueue wait time (when the enqueueing was initiated -> when was stored) = \n\n"
         (samplingStatsSummary inputWaitStats nested) .
       gap .
       stats "the dequeue wait time (when was requested for dequeueing -> when was dequeued) = \n\n"
         (samplingStatsSummary outputWaitStats nested)
-- | Reset the gathered statistics without touching the queue contents:
-- the size statistics are re-seeded with the current size at the current
-- time, all counters are set to zero and all wait-time statistics are
-- set to 'mempty'.
resetQueue :: Queue si sm so a -> Event ()
resetQueue q =
  Event $ \p ->
    do size <- readIORef (queueCountRef q)
       writeIORef (queueCountStatsRef q) (returnTimingStats (pointTime p) size)
       mapM_ (\ref -> writeIORef (ref q) 0)
         [ enqueueCountRef
         , enqueueLostCountRef
         , enqueueStoreCountRef
         , dequeueCountRef
         , dequeueExtractCountRef
         ]
       mapM_ (\ref -> writeIORef (ref q) mempty)
         [ queueWaitTimeRef
         , queueTotalWaitTimeRef
         , enqueueWaitTimeRef
         , dequeueWaitTimeRef
         ]