module Simulation.Aivika.Queue.Infinite
(
FCFSQueue,
LCFSQueue,
SIROQueue,
PriorityQueue,
Queue,
newFCFSQueue,
newLCFSQueue,
newSIROQueue,
newPriorityQueue,
newQueue,
enqueueStoringStrategy,
dequeueStrategy,
queueNull,
queueCount,
queueCountStats,
enqueueStoreCount,
dequeueCount,
dequeueExtractCount,
enqueueStoreRate,
dequeueRate,
dequeueExtractRate,
queueWaitTime,
dequeueWaitTime,
queueRate,
dequeue,
dequeueWithOutputPriority,
tryDequeue,
enqueue,
enqueueWithStoringPriority,
queueDelete,
queueDelete_,
queueDeleteBy,
queueDeleteBy_,
queueContains,
queueContainsBy,
clearQueue,
resetQueue,
queueSummary,
queueNullChanged,
queueNullChanged_,
queueCountChanged,
queueCountChanged_,
enqueueStoreCountChanged,
enqueueStoreCountChanged_,
dequeueCountChanged,
dequeueCountChanged_,
dequeueExtractCountChanged,
dequeueExtractCountChanged_,
queueWaitTimeChanged,
queueWaitTimeChanged_,
dequeueWaitTimeChanged,
dequeueWaitTimeChanged_,
queueRateChanged,
queueRateChanged_,
enqueueStored,
dequeueRequested,
dequeueExtracted,
queueChanged_) where
import Data.IORef
import Data.Monoid
import Data.Maybe
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Internal.Specs
import Simulation.Aivika.Internal.Simulation
import Simulation.Aivika.Internal.Dynamics
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Internal.Process
import Simulation.Aivika.Signal
import Simulation.Aivika.Resource.Base
import Simulation.Aivika.QueueStrategy
import Simulation.Aivika.Statistics
import qualified Simulation.Aivika.DoubleLinkedList as DLL
import qualified Simulation.Aivika.Vector as V
import qualified Simulation.Aivika.PriorityQueue as PQ
-- | A 'Queue' whose storing strategy is 'FCFS' and whose dequeueing
-- (output) strategy is 'FCFS'.
type FCFSQueue a = Queue FCFS FCFS a
-- | A 'Queue' whose storing strategy is 'LCFS' and whose dequeueing
-- (output) strategy is 'FCFS'.
type LCFSQueue a = Queue LCFS FCFS a
-- | A 'Queue' whose storing strategy is 'SIRO' and whose dequeueing
-- (output) strategy is 'FCFS'.
type SIROQueue a = Queue SIRO FCFS a
-- | A 'Queue' whose storing strategy is 'StaticPriorities' and whose
-- dequeueing (output) strategy is 'FCFS'.
type PriorityQueue a = Queue StaticPriorities FCFS a
-- | An infinite queue parameterised by the storing strategy @sm@, the
-- dequeueing (output) strategy @so@ and the item type @a@.
--
-- Besides the stored items the record keeps mutable counters, statistics
-- and signal sources that are updated by the enqueue and dequeue operations.
data Queue sm so a =
  Queue { enqueueStoringStrategy :: sm,
          -- ^ The strategy applied when storing items in the queue.
          dequeueStrategy :: so,
          -- ^ The strategy applied to the dequeueing (output) requests.
          queueStore :: StrategyQueue sm (QueueItem a),
          -- ^ The underlying strategy queue that holds the stored items.
          dequeueRes :: Resource so,
          -- ^ The resource requested by the dequeue operations and
          -- released once per stored item.
          queueCountRef :: IORef Int,
          -- ^ The current number of items in the queue.
          queueCountStatsRef :: IORef (TimingStats Int),
          -- ^ The timing statistics of the queue size.
          enqueueStoreCountRef :: IORef Int,
          -- ^ How many items have been stored.
          dequeueCountRef :: IORef Int,
          -- ^ How many dequeue requests have been made.
          dequeueExtractCountRef :: IORef Int,
          -- ^ How many items have been extracted.
          queueWaitTimeRef :: IORef (SamplingStats Double),
          -- ^ Statistics of the time between storing and extracting an item.
          dequeueWaitTimeRef :: IORef (SamplingStats Double),
          -- ^ Statistics of the time between a dequeue request and the
          -- extraction of the item.
          enqueueStoredSource :: SignalSource a,
          -- ^ Triggered with the item after it has been stored.
          dequeueRequestedSource :: SignalSource (),
          -- ^ Triggered when a dequeue request is registered.
          dequeueExtractedSource :: SignalSource a }
          -- ^ Triggered with the item after it has been extracted.
-- | An item as it is kept inside the queue store: the user value together
-- with the time at which it was stored.
data QueueItem a =
  QueueItem { itemValue :: a,
              -- ^ The value that was enqueued.
              itemStoringTime :: Double
              -- ^ The modeling time at which the value was stored.
            }
-- | Create a new queue that uses 'FCFS' for storing the items and 'FCFS'
-- for the dequeueing requests.
newFCFSQueue :: Event (FCFSQueue a)
newFCFSQueue = newQueue storingStrategy outputStrategy
  where
    storingStrategy = FCFS
    outputStrategy = FCFS
-- | Create a new queue that uses 'LCFS' for storing the items and 'FCFS'
-- for the dequeueing requests.
newLCFSQueue :: Event (LCFSQueue a)
newLCFSQueue = newQueue storingStrategy outputStrategy
  where
    storingStrategy = LCFS
    outputStrategy = FCFS
-- | Create a new queue that uses 'SIRO' for storing the items and 'FCFS'
-- for the dequeueing requests.
newSIROQueue :: Event (SIROQueue a)
newSIROQueue = newQueue storingStrategy outputStrategy
  where
    storingStrategy = SIRO
    outputStrategy = FCFS
-- | Create a new queue that uses 'StaticPriorities' for storing the items
-- and 'FCFS' for the dequeueing requests.
newPriorityQueue :: Event (PriorityQueue a)
newPriorityQueue = newQueue storingStrategy outputStrategy
  where
    storingStrategy = StaticPriorities
    outputStrategy = FCFS
-- | Create a new infinite queue with the given storing and dequeueing
-- strategies.
--
-- All counters start at zero, both wait time statistics start empty and the
-- size statistics start with the value 0 observed at the current time.
-- The dequeue resource is created with 'newResourceWithMaxCount' using the
-- arguments @0@ and 'Nothing' (presumably: no available units initially and
-- no upper bound -- confirm against the resource module).
newQueue :: (QueueStrategy sm,
             QueueStrategy so) =>
            sm
            -> so
            -> Event (Queue sm so a)
newQueue sm so =
  do now <- liftDynamics time
     sizeRef <- newCounter
     sizeStatsRef <- liftIO $ newIORef $ returnTimingStats now 0
     storedRef <- newCounter
     requestedRef <- newCounter
     extractedRef <- newCounter
     store <- liftSimulation $ newStrategyQueue sm
     res <- liftSimulation $ newResourceWithMaxCount so 0 Nothing
     waitRef <- newStatsRef
     requestWaitRef <- newStatsRef
     storedSource <- liftSimulation newSignalSource
     requestedSource <- liftSimulation newSignalSource
     extractedSource <- liftSimulation newSignalSource
     return Queue { enqueueStoringStrategy = sm,
                    dequeueStrategy = so,
                    queueStore = store,
                    dequeueRes = res,
                    queueCountRef = sizeRef,
                    queueCountStatsRef = sizeStatsRef,
                    enqueueStoreCountRef = storedRef,
                    dequeueCountRef = requestedRef,
                    dequeueExtractCountRef = extractedRef,
                    queueWaitTimeRef = waitRef,
                    dequeueWaitTimeRef = requestWaitRef,
                    enqueueStoredSource = storedSource,
                    dequeueRequestedSource = requestedSource,
                    dequeueExtractedSource = extractedSource }
  where
    -- a fresh integer counter initialised with zero
    newCounter :: Event (IORef Int)
    newCounter = liftIO $ newIORef 0
    -- a fresh reference holding empty sampling statistics
    newStatsRef :: Event (IORef (SamplingStats Double))
    newStatsRef = liftIO $ newIORef mempty
-- | Test whether the queue is empty, i.e. whether its current size is zero.
queueNull :: Queue sm so a -> Event Bool
queueNull q =
  liftIO $ fmap (== 0) $ readIORef (queueCountRef q)
-- | Signal carrying the result of 'queueNull', re-read every time
-- 'queueNullChanged_' fires.
queueNullChanged :: Queue sm so a -> Signal Bool
queueNullChanged q = mapSignalM refresh (queueNullChanged_ q)
  where
    refresh _ = queueNull q
-- | Signal notifying that the emptiness of the queue may have changed;
-- it is the same signal as 'queueCountChanged_'.
queueNullChanged_ :: Queue sm so a -> Signal ()
queueNullChanged_ q = queueCountChanged_ q
-- | Return the current number of items in the queue.
queueCount :: Queue sm so a -> Event Int
queueCount q = liftIO $ readIORef (queueCountRef q)
-- | Return the timing statistics collected for the queue size.
queueCountStats :: Queue sm so a -> Event (TimingStats Int)
queueCountStats q = liftIO $ readIORef (queueCountStatsRef q)
-- | Signal carrying the result of 'queueCount', re-read every time
-- 'queueCountChanged_' fires.
queueCountChanged :: Queue sm so a -> Signal Int
queueCountChanged q =
  mapSignalM (\_ -> queueCount q) $ queueCountChanged_ q
-- | Signal notifying that the queue size has changed: it fires whenever an
-- item is stored or extracted.
queueCountChanged_ :: Queue sm so a -> Signal ()
queueCountChanged_ q = stored <> extracted
  where
    stored = mapSignal (\_ -> ()) (enqueueStored q)
    extracted = mapSignal (\_ -> ()) (dequeueExtracted q)
-- | Return the number of items that have been stored in the queue
-- (the counter is zeroed by 'resetQueue').
enqueueStoreCount :: Queue sm so a -> Event Int
enqueueStoreCount q = liftIO $ readIORef (enqueueStoreCountRef q)
-- | Signal carrying the result of 'enqueueStoreCount', re-read every time
-- 'enqueueStoreCountChanged_' fires.
enqueueStoreCountChanged :: Queue sm so a -> Signal Int
enqueueStoreCountChanged q =
  let trigger = enqueueStoreCountChanged_ q
      current _ = enqueueStoreCount q
  in mapSignalM current trigger
-- | Signal notifying that the store counter has changed: it fires whenever
-- an item is stored.
enqueueStoreCountChanged_ :: Queue sm so a -> Signal ()
enqueueStoreCountChanged_ q =
  mapSignal (\_ -> ()) $ enqueueStored q
-- | Return the number of dequeue requests that have been registered
-- (the counter is zeroed by 'resetQueue').
dequeueCount :: Queue sm so a -> Event Int
dequeueCount q = liftIO $ readIORef (dequeueCountRef q)
-- | Signal carrying the result of 'dequeueCount', re-read every time
-- 'dequeueCountChanged_' fires.
dequeueCountChanged :: Queue sm so a -> Signal Int
dequeueCountChanged q = mapSignalM current (dequeueCountChanged_ q)
  where
    current _ = dequeueCount q
-- | Signal notifying that the request counter has changed: it fires
-- whenever a dequeue request is registered.
dequeueCountChanged_ :: Queue sm so a -> Signal ()
dequeueCountChanged_ q =
  mapSignal (\_ -> ()) $ dequeueRequested q
-- | Return the number of items that have been extracted from the queue
-- (the counter is zeroed by 'resetQueue').
dequeueExtractCount :: Queue sm so a -> Event Int
dequeueExtractCount q = liftIO $ readIORef (dequeueExtractCountRef q)
-- | Signal carrying the result of 'dequeueExtractCount', re-read every time
-- 'dequeueExtractCountChanged_' fires.
dequeueExtractCountChanged :: Queue sm so a -> Signal Int
dequeueExtractCountChanged q =
  let trigger = dequeueExtractCountChanged_ q
  in mapSignalM (\_ -> dequeueExtractCount q) trigger
-- | Signal notifying that the extraction counter has changed: it fires
-- whenever an item is extracted.
dequeueExtractCountChanged_ :: Queue sm so a -> Signal ()
dequeueExtractCountChanged_ q =
  mapSignal (\_ -> ()) $ dequeueExtracted q
-- | Return the store rate: the store counter divided by the time elapsed
-- between the start time of the simulation specs and the current time.
enqueueStoreRate :: Queue sm so a -> Event Double
enqueueStoreRate q =
  Event $ \p ->
  do stored <- readIORef (enqueueStoreCountRef q)
     return (fromIntegral stored / elapsed p)
  where
    -- the time passed since the start time taken from the specs
    elapsed p = pointTime p - spcStartTime (pointSpecs p)
-- | Return the request rate: the dequeue request counter divided by the
-- time elapsed between the start time of the simulation specs and the
-- current time.
dequeueRate :: Queue sm so a -> Event Double
dequeueRate q =
  Event $ \p ->
  let elapsed = pointTime p - spcStartTime (pointSpecs p)
      perTime n = fromIntegral n / elapsed
  in fmap perTime (readIORef (dequeueCountRef q))
-- | Return the extraction rate: the extraction counter divided by the time
-- elapsed between the start time of the simulation specs and the current
-- time.
dequeueExtractRate :: Queue sm so a -> Event Double
dequeueExtractRate q =
  Event $ \p ->
  do extracted <- readIORef (dequeueExtractCountRef q)
     let startTime = spcStartTime (pointSpecs p)
         duration = pointTime p - startTime
     return $ fromIntegral extracted / duration
-- | Return the statistics of the wait time, measured from the moment an
-- item was stored to the moment it was extracted.
queueWaitTime :: Queue sm so a -> Event (SamplingStats Double)
queueWaitTime q = liftIO $ readIORef (queueWaitTimeRef q)
-- | Signal carrying the result of 'queueWaitTime', re-read every time
-- 'queueWaitTimeChanged_' fires.
queueWaitTimeChanged :: Queue sm so a -> Signal (SamplingStats Double)
queueWaitTimeChanged q = mapSignalM current (queueWaitTimeChanged_ q)
  where
    current _ = queueWaitTime q
-- | Signal notifying that the wait time statistics have changed: it fires
-- whenever an item is extracted.
queueWaitTimeChanged_ :: Queue sm so a -> Signal ()
queueWaitTimeChanged_ q =
  mapSignal (\_ -> ()) $ dequeueExtracted q
-- | Return the statistics of the dequeue wait time, measured from the
-- moment of the dequeue request to the moment the item was extracted.
dequeueWaitTime :: Queue sm so a -> Event (SamplingStats Double)
dequeueWaitTime q = liftIO $ readIORef (dequeueWaitTimeRef q)
-- | Signal carrying the result of 'dequeueWaitTime', re-read every time
-- 'dequeueWaitTimeChanged_' fires.
dequeueWaitTimeChanged :: Queue sm so a -> Signal (SamplingStats Double)
dequeueWaitTimeChanged q =
  mapSignalM (\_ -> dequeueWaitTime q) $ dequeueWaitTimeChanged_ q
-- | Signal notifying that the dequeue wait time statistics have changed:
-- it fires whenever an item is extracted.
dequeueWaitTimeChanged_ :: Queue sm so a -> Signal ()
dequeueWaitTimeChanged_ q =
  mapSignal (\_ -> ()) $ dequeueExtracted q
-- | Return the queue rate, computed as the mean queue size divided by the
-- mean wait time.
queueRate :: Queue sm so a -> Event Double
queueRate q =
  liftIO $
  do sizeStats <- readIORef (queueCountStatsRef q)
     waitStats <- readIORef (queueWaitTimeRef q)
     return $ timingStatsMean sizeStats / samplingStatsMean waitStats
-- | Signal carrying the result of 'queueRate', re-read every time
-- 'queueRateChanged_' fires.
queueRateChanged :: Queue sm so a -> Signal Double
queueRateChanged q = mapSignalM current (queueRateChanged_ q)
  where
    current _ = queueRate q
-- | Signal notifying that the queue rate may have changed: it fires
-- whenever an item is stored or extracted.
queueRateChanged_ :: Queue sm so a -> Signal ()
queueRateChanged_ q =
  let stored = mapSignal (\_ -> ()) (enqueueStored q)
      extracted = mapSignal (\_ -> ()) (dequeueExtracted q)
  in stored <> extracted
-- | Dequeue an item within a process.
--
-- The request is registered first, then a unit of the dequeue resource is
-- requested (presumably suspending the process until an item has been
-- stored -- confirm against 'requestResource'), and finally the item is
-- extracted from the store.
dequeue :: (DequeueStrategy sm,
            EnqueueStrategy so)
           => Queue sm so a
           -> Process a
dequeue q =
  liftEvent (dequeueRequest q) >>= \requestTime ->
  requestResource (dequeueRes q) >>
  liftEvent (dequeueExtract q requestTime)
-- | Dequeue an item within a process, requesting the dequeue resource with
-- the specified output priority.
--
-- The steps are the same as in 'dequeue' except that the resource is
-- requested through 'requestResourceWithPriority'.
dequeueWithOutputPriority :: (DequeueStrategy sm,
                              PriorityQueueStrategy so po)
                             => Queue sm so a
                             -> po
                             -> Process a
dequeueWithOutputPriority q po =
  do requestTime <- liftEvent (dequeueRequest q)
     requestResourceWithPriority (dequeueRes q) po
     liftEvent (dequeueExtract q requestTime)
-- | Try to dequeue an item immediately.
--
-- Returns 'Nothing' when the dequeue resource cannot be acquired within
-- the current event; otherwise registers the request, extracts an item and
-- returns it wrapped in 'Just'.
tryDequeue :: DequeueStrategy sm
              => Queue sm so a
              -> Event (Maybe a)
tryDequeue q =
  do acquired <- tryRequestResourceWithinEvent (dequeueRes q)
     case acquired of
       False -> return Nothing
       True ->
         do requestTime <- dequeueRequest q
            item <- dequeueExtract q requestTime
            return (Just item)
-- | Remove an item equal to the given one and report whether an item was
-- actually removed.
queueDelete :: (Eq a,
                DeletingQueueStrategy sm,
                DequeueStrategy so)
               => Queue sm so a
               -> a
               -> Event Bool
queueDelete q a =
  do removed <- queueDeleteBy q (\x -> x == a)
     return (isJust removed)
-- | Remove an item equal to the given one, ignoring whether anything was
-- removed.
queueDelete_ :: (Eq a,
                 DeletingQueueStrategy sm,
                 DequeueStrategy so)
                => Queue sm so a
                -> a
                -> Event ()
queueDelete_ q a =
  queueDeleteBy q (\x -> x == a) >> return ()
-- | Remove an item satisfying the predicate and return it.
--
-- A unit of the dequeue resource is acquired first; 'Nothing' is returned
-- when it cannot be acquired. If no stored item satisfies the predicate,
-- the acquired unit is released again and 'Nothing' is returned. Otherwise
-- the removal is accounted for like a regular dequeue: a request is
-- registered and the post-extraction bookkeeping is performed.
queueDeleteBy :: (DeletingQueueStrategy sm,
                  DequeueStrategy so)
                 => Queue sm so a
                 -> (a -> Bool)
                 -> Event (Maybe a)
queueDeleteBy q pred =
  do acquired <- tryRequestResourceWithinEvent (dequeueRes q)
     if not acquired
       then return Nothing
       else strategyQueueDeleteBy (queueStore q) (pred . itemValue) >>= finish
  where
    -- nothing matched: give the acquired resource unit back
    finish Nothing =
      do releaseResourceWithinEvent (dequeueRes q)
         return Nothing
    -- an item was removed from the store: account for it as a dequeue
    finish (Just item) =
      do requestTime <- dequeueRequest q
         fmap Just $ dequeuePostExtract q requestTime item
-- | Remove an item satisfying the predicate, ignoring the result.
queueDeleteBy_ :: (DeletingQueueStrategy sm,
                   DequeueStrategy so)
                  => Queue sm so a
                  -> (a -> Bool)
                  -> Event ()
queueDeleteBy_ q pred =
  queueDeleteBy q pred >> return ()
-- | Test whether the queue contains an item equal to the given one.
queueContains :: (Eq a,
                  DeletingQueueStrategy sm)
                 => Queue sm so a
                 -> a
                 -> Event Bool
queueContains q a =
  do found <- queueContainsBy q (\x -> x == a)
     return (isJust found)
-- | Look for a stored item satisfying the predicate and return its value,
-- or 'Nothing' when there is no such item. The queue is not modified by
-- this function itself.
queueContainsBy :: DeletingQueueStrategy sm
                   => Queue sm so a
                   -> (a -> Bool)
                   -> Event (Maybe a)
queueContainsBy q pred =
  do found <- strategyQueueContainsBy (queueStore q) (pred . itemValue)
     return (fmap itemValue found)
-- | Remove the items from the queue by calling 'tryDequeue' repeatedly
-- until it returns 'Nothing'. Every removal is therefore accounted for in
-- the counters and statistics like a regular dequeue.
clearQueue :: DequeueStrategy sm
              => Queue sm so a
              -> Event ()
clearQueue q = tryDequeue q >>= continue
  where
    continue Nothing = return ()
    continue (Just _) = clearQueue q
-- | Enqueue the item; this is the public name of 'enqueueStore'.
enqueue :: (EnqueueStrategy sm,
            DequeueStrategy so)
           => Queue sm so a
           -> a
           -> Event ()
enqueue q a = enqueueStore q a
-- | Enqueue the item with the specified storing priority; this is the
-- public name of 'enqueueStoreWithPriority'.
enqueueWithStoringPriority :: (PriorityQueueStrategy sm pm,
                               DequeueStrategy so)
                              => Queue sm so a
                              -> pm
                              -> a
                              -> Event ()
enqueueWithStoringPriority q pm a = enqueueStoreWithPriority q pm a
-- | Signal triggered with the item after it has been stored in the queue.
enqueueStored :: Queue sm so a -> Signal a
enqueueStored = publishSignal . enqueueStoredSource
-- | Signal triggered when a dequeue request is registered.
dequeueRequested :: Queue sm so a -> Signal ()
dequeueRequested = publishSignal . dequeueRequestedSource
-- | Signal triggered with the item after it has been extracted from the
-- queue.
dequeueExtracted :: Queue sm so a -> Signal a
dequeueExtracted = publishSignal . dequeueExtractedSource
-- | Store the item in the queue.
--
-- The item is stamped with the current time and handed to the storing
-- strategy; the bookkeeping shared with 'enqueueStoreWithPriority' is then
-- performed by 'enqueuePostStore'.
enqueueStore :: (EnqueueStrategy sm,
                 DequeueStrategy so)
                => Queue sm so a
                -> a
                -> Event ()
enqueueStore q a =
  Event $ \p ->
  do let i = QueueItem { itemValue = a,
                         itemStoringTime = pointTime p }
     invokeEvent p $
       strategyEnqueue (queueStore q) i
     invokeEvent p $
       enqueuePostStore q a

-- | Store the item in the queue with the specified storing priority.
--
-- The item is stamped with the current time and handed to the storing
-- strategy together with the priority; the bookkeeping shared with
-- 'enqueueStore' is then performed by 'enqueuePostStore'.
enqueueStoreWithPriority :: (PriorityQueueStrategy sm pm,
                             DequeueStrategy so)
                            => Queue sm so a
                            -> pm
                            -> a
                            -> Event ()
enqueueStoreWithPriority q pm a =
  Event $ \p ->
  do let i = QueueItem { itemValue = a,
                         itemStoringTime = pointTime p }
     invokeEvent p $
       strategyEnqueueWithPriority (queueStore q) pm i
     invokeEvent p $
       enqueuePostStore q a

-- | Bookkeeping performed after an item has been put into the store:
-- increment the queue size, record the new size in the timing statistics,
-- increment the store counter, release one unit of the dequeue resource
-- and finally trigger the 'enqueueStored' signal with the stored value.
enqueuePostStore :: DequeueStrategy so
                    => Queue sm so a
                    -> a
                    -> Event ()
enqueuePostStore q a =
  Event $ \p ->
  do c <- readIORef (queueCountRef q)
     let c' = c + 1
         t = pointTime p
     -- force the new size so that no thunk is stored in the reference
     c' `seq` writeIORef (queueCountRef q) c'
     modifyIORef' (queueCountStatsRef q) (addTimingStats t c')
     modifyIORef' (enqueueStoreCountRef q) (+ 1)
     invokeEvent p $
       releaseResourceWithinEvent (dequeueRes q)
     invokeEvent p $
       triggerSignal (enqueueStoredSource q) a
-- | Register a dequeue request: increment the request counter, trigger the
-- 'dequeueRequested' signal and return the current time, which the callers
-- later pass on as the time of the request.
dequeueRequest :: Queue sm so a
                  -> Event Double
dequeueRequest q =
  Event $ \p ->
  modifyIORef' (dequeueCountRef q) (+ 1) >>
  invokeEvent p (triggerSignal (dequeueRequestedSource q) ()) >>
  return (pointTime p)
-- | Take an item out of the store according to the storing strategy and
-- perform the post-extraction bookkeeping. The second argument is the time
-- at which the dequeue was requested.
dequeueExtract :: DequeueStrategy sm
                  => Queue sm so a
                  -> Double
                  -> Event a
dequeueExtract q t' =
  do item <- strategyDequeue (queueStore q)
     dequeuePostExtract q t' item
-- | Bookkeeping performed after an item has left the store: decrement the
-- queue size, record the new size in the timing statistics, increment the
-- extraction counter, update the wait time statistics and trigger the
-- 'dequeueExtracted' signal. Returns the value of the item.
--
-- The second argument is the time at which the dequeue was requested.
dequeuePostExtract :: DequeueStrategy sm
                      => Queue sm so a
                      -> Double
                      -> QueueItem a
                      -> Event a
dequeuePostExtract q t' i =
  Event $ \p ->
  do let now = pointTime p
         value = itemValue i
     size <- fmap (subtract 1) $ readIORef (queueCountRef q)
     -- force the new size so that no thunk is stored in the reference
     size `seq` writeIORef (queueCountRef q) size
     modifyIORef' (queueCountStatsRef q) (addTimingStats now size)
     modifyIORef' (dequeueExtractCountRef q) (+ 1)
     invokeEvent p $ dequeueStat q t' i
     invokeEvent p $ triggerSignal (dequeueExtractedSource q) value
     return value
-- | Update the wait time statistics for an extracted item.
--
-- The dequeue wait time sample is the current time minus the request time
-- (second argument); the queue wait time sample is the current time minus
-- the time at which the item was stored.
dequeueStat :: Queue sm so a
               -> Double
               -> QueueItem a
               -> Event ()
dequeueStat q t' i =
  Event $ \p ->
  do let now = pointTime p
         sinceRequest = now - t'
         sinceStoring = now - itemStoringTime i
     modifyIORef' (dequeueWaitTimeRef q) (addSamplingStats sinceRequest)
     modifyIORef' (queueWaitTimeRef q) (addSamplingStats sinceStoring)
-- | Signal notifying about any change of the queue: it fires whenever an
-- item is stored, a dequeue is requested or an item is extracted.
queueChanged_ :: Queue sm so a -> Signal ()
queueChanged_ q = stored <> requested <> extracted
  where
    stored = mapSignal (\_ -> ()) (enqueueStored q)
    requested = dequeueRequested q
    extracted = mapSignal (\_ -> ()) (dequeueExtracted q)
-- | Build a textual summary of the queue: the strategies, the current
-- state, the counters, the rates and the statistics. The second argument
-- is the number of spaces every line is indented with; the nested
-- statistics summaries are indented by two more.
queueSummary :: (Show sm, Show so) => Queue sm so a -> Int -> Event ShowS
queueSummary q indent =
  do isEmpty <- queueNull q
     size <- queueCount q
     sizeStats <- queueCountStats q
     storedCount <- enqueueStoreCount q
     requestedCount <- dequeueCount q
     extractedCount <- dequeueExtractCount q
     storedRate <- enqueueStoreRate q
     requestedRate <- dequeueRate q
     extractedRate <- dequeueExtractRate q
     waitStats <- queueWaitTime q
     requestWaitStats <- dequeueWaitTime q
     return $
       entry "the storing (memory) strategy = " (enqueueStoringStrategy q) .
       entry "the dequeueing (output) strategy = " (dequeueStrategy q) .
       entry "empty? = " isEmpty .
       entry "the current size = " size .
       heading "the size statistics = \n\n" .
       timingStatsSummary sizeStats (2 + indent) .
       showString "\n\n" .
       entry "the enqueue store count (number of the input items that were stored) = " storedCount .
       entry "the dequeue count (number of requests for dequeueing an item) = " requestedCount .
       entry "the dequeue extract count (number of the output items that were dequeued) = " extractedCount .
       entry "the enqueue store rate (how many input items were stored per time) = " storedRate .
       entry "the dequeue rate (how many requests for dequeueing per time) = " requestedRate .
       entry "the dequeue extract rate (how many output items were dequeued per time) = " extractedRate .
       heading "the wait time (when was stored -> when was dequeued) = \n\n" .
       samplingStatsSummary waitStats (2 + indent) .
       showString "\n\n" .
       heading "the dequeue wait time (when was requested for dequeueing -> when was dequeued) = \n\n" .
       samplingStatsSummary requestWaitStats (2 + indent)
  where
    -- the indentation prefix of every line
    tab :: String
    tab = replicate indent ' '
    -- an indented label without a value
    heading :: String -> ShowS
    heading label = showString tab . showString label
    -- an indented label followed by the shown value and a line break
    entry :: Show v => String -> v -> ShowS
    entry label value = heading label . shows value . showString "\n"
-- | Reset the statistics of the queue without touching its contents.
--
-- The size statistics restart from the current size observed at the
-- current time, the three counters are set to zero and both wait time
-- statistics are emptied. The current size itself is left unchanged.
resetQueue :: Queue sm so a -> Event ()
resetQueue q =
  Event $ \p ->
  do size <- readIORef (queueCountRef q)
     writeIORef (queueCountStatsRef q) (returnTimingStats (pointTime p) size)
     mapM_ (\ref -> writeIORef (ref q) 0)
       [enqueueStoreCountRef, dequeueCountRef, dequeueExtractCountRef]
     mapM_ (\ref -> writeIORef (ref q) mempty)
       [queueWaitTimeRef, dequeueWaitTimeRef]