-- | -- Module: Data.STM.RollingQueue -- Copyright: (c) Joseph Adams 2012 -- License: BSD3 -- Maintainer: joeyadams3.14159@gmail.com -- Portability: Requires STM -- -- This module is usually imported qualified: -- -- >import Data.STM.RollingQueue (RollingQueue) -- >import qualified Data.STM.RollingQueue as RQ {-# LANGUAGE BangPatterns, DeriveDataTypeable #-} module Data.STM.RollingQueue ( RollingQueue, new, newIO, write, read, tryRead, isEmpty, length, setLimit, getLimit, -- * Debugging checkInvariants, CheckException(..), dump, ) where import Prelude hiding (length, read) import qualified Prelude import Control.Concurrent.STM hiding (check) import Control.Exception (Exception) import Control.Monad (join) import Data.Typeable (Typeable) -- | A 'RollingQueue' is a bounded FIFO channel. When the size limit is -- exceeded, older entries are discarded to make way for newer entries. -- -- Note: if the size limit is less than @1@, 'write' will have no effect, and -- 'read' will always 'retry'. data RollingQueue a = RQ (TVar (ReadEnd a)) (TVar (WriteEnd a)) deriving Typeable -- Invariants: -- -- * writeCounter - readCounter = number of items in the queue -- -- * writeCounter >= readCounter, since the queue count cannot be negative instance Eq (RollingQueue a) where (==) (RQ r1 _) (RQ r2 _) = r1 == r2 -- | Invariants: -- -- * readCounter >= 0, readDiscarded >= 0 data ReadEnd a = ReadEnd { readPtr :: !(TCell a) -- ^ Pointer to next item in the stream , readCounter :: !Int -- ^ Number of reads since we last synced with the writer , readDiscarded :: !Int -- ^ Number of log entries discarded since the last read } -- | Invariants: -- -- * readTVar writePtr ==> TNil -- -- * writeCounter <= sizeLimit. However, this will normally be false for the -- 'WriteEnd' passed to 'syncEnds'. -- -- * writeCounter >= 0, sizeLimit >= 0 data WriteEnd a = WriteEnd { writePtr :: !(TCell a) -- ^ Pointer to the hole (which is a TNil) , writeCounter :: !Int -- ^ Write counter. Number of items in the queue, not taking into -- account reads performed since the last call to 'syncEnds'. , sizeLimit :: !Int -- ^ The size limit of the RollingQueue is stored here, in the -- 'WriteEnd'. This makes it convenient for 'write' to access. } type TCell a = TVar (TList a) data TList a = TNil | TCons a (TCell a) -- | Create a new, empty 'RollingQueue', with the given size limit. -- -- To change the size limit later, use 'setLimit'. new :: Int -> STM (RollingQueue a) new limit = do hole <- newTVar TNil rv <- newTVar $ ReadEnd hole 0 0 wv <- newTVar $ WriteEnd hole 0 (max 0 limit) return (RQ rv wv) {- | @IO@ variant of 'new'. This is useful for creating top-level 'RollingQueue's using 'System.IO.Unsafe.unsafePerformIO', because performing 'atomically' inside a pure computation is extremely dangerous (can lead to 'Control.Exception.NestedAtomically' errors and even segfaults, see GHC ticket #5866). Example: @ logQueue :: 'RollingQueue' LogEntry logQueue = 'System.IO.Unsafe.unsafePerformIO' (RQ.'newIO' 1000) \{\-\# NOINLINE logQueue \#\-\} @ -} newIO :: Int -> IO (RollingQueue a) newIO limit = do hole <- newTVarIO TNil rv <- newTVarIO $ ReadEnd hole 0 0 wv <- newTVarIO $ WriteEnd hole 0 (max 0 limit) return (RQ rv wv) -- | Write an entry to the rolling queue. If the queue is full, discard the -- oldest entry. -- -- There is no @tryWrite@, because 'write' never retries. write :: RollingQueue a -> a -> STM () write rq@(RQ _ wv) x = do w <- readTVar wv new_hole <- newTVar TNil writeTVar (writePtr w) (TCons x new_hole) updateWriteEnd rq $ WriteEnd new_hole (writeCounter w + 1) (sizeLimit w) -- | Read the next entry from the 'RollingQueue'. 'retry' if the queue is -- empty. -- -- The 'Int' is the number of entries discarded since the last read operation -- (usually 0). read :: RollingQueue a -> STM (a, Int) read rq = tryRead rq >>= maybe retry return -- | Like 'read', but do not 'retry'. Instead, return 'Nothing' if the queue -- is empty. tryRead :: RollingQueue a -> STM (Maybe (a, Int)) tryRead (RQ rv _) = do r <- readTVar rv xs <- readTVar (readPtr r) case xs of TNil -> return Nothing TCons x cell' -> do writeTVar rv $ ReadEnd cell' (readCounter r + 1) 0 return $ Just (x, readDiscarded r) -- | Test if the queue is empty. isEmpty :: RollingQueue a -> STM Bool isEmpty (RQ rv _) = do r <- readTVar rv xs <- readTVar (readPtr r) case xs of TNil -> return True TCons _ _ -> return False -- | /O(1)/ Get the number of items in the queue. length :: RollingQueue a -> STM Int length (RQ rv wv) = do r <- readTVar rv w <- readTVar wv return (writeCounter w - readCounter r) -- | Adjust the size limit. Queue entries will be discarded if necessary to -- satisfy the new limit. setLimit :: RollingQueue a -> Int -> STM () setLimit rq@(RQ _ wv) new_limit = do w <- readTVar wv updateWriteEnd rq w{sizeLimit = max 0 new_limit} -- | Get the current size limit. This will return 0 if a negative value was -- passed to 'new', 'newIO', or 'setLimit'. getLimit :: RollingQueue a -> STM Int getLimit (RQ _ wv) = do w <- readTVar wv return (sizeLimit w) ------------------------------------------------------------------------ -- Internal -- | Update the 'WriteEnd'. If the size limit is exceeded, use 'syncEnds'. updateWriteEnd :: RollingQueue a -> WriteEnd a -> STM () updateWriteEnd (RQ rv wv) w | writeCounter w <= sizeLimit w = writeTVar wv w | otherwise = do r <- readTVar rv (r', w') <- syncEnds r w writeTVar rv r' writeTVar wv w' -- | Sync the reader and writer. This sets the ReadEnd's counter to 0, and -- discards old log entries to satisfy the size limit (if necessary). syncEnds :: ReadEnd a -> WriteEnd a -> STM (ReadEnd a, WriteEnd a) syncEnds r w = do let count = writeCounter w - readCounter r limit = sizeLimit w if count > limit then do let drop_count = count - limit rp' <- dropItems drop_count (readPtr r) return ( ReadEnd rp' 0 (readDiscarded r + drop_count) , WriteEnd (writePtr w) limit limit ) else return ( ReadEnd (readPtr r) 0 (readDiscarded r) , WriteEnd (writePtr w) count limit ) -- | TCell variant of 'drop'. This does not modify the cells themselves, it -- just returns the new pointer. dropItems :: Int -> TCell a -> STM (TCell a) dropItems n cell | n <= 0 = return cell | otherwise = do xs <- readTVar cell case xs of TNil -> return cell TCons _ cell' -> dropItems (n-1) cell' ------------------------------------------------------------------------ -- Debugging data CheckException = CheckException String deriving Typeable instance Show CheckException where show (CheckException msg) = "Data.STM.RollingQueue checkInvariants: " ++ msg instance Exception CheckException -- | Verify internal structure. Throw a 'CheckException' if the check fails, -- signifying a bug in the implementation. checkInvariants :: RollingQueue a -> STM () checkInvariants (RQ rv wv) = do r <- readTVar rv w <- readTVar wv check (readCounter r >= 0) "readCounter >= 0" check (readDiscarded r >= 0) "readDiscarded >= 0" check (writeCounter w >= 0) "writeCounter >= 0" check (sizeLimit w >= 0) "sizeLimit >= 0" check (writeCounter w <= sizeLimit w) "writeCounter <= sizeLimit" hole <- readTVar (writePtr w) case hole of TNil -> return () TCons _ _ -> throwSTM $ CheckException "writePtr does not point to a TNil" check (writeCounter w >= readCounter r) "writeCounter >= readCounter" len <- traverseLength (readPtr r) check (writeCounter w - readCounter r == len) "writeCounter - readCounter == length" where check b expr | b = return () | otherwise = throwSTM $ CheckException $ expr ++ " does not hold" traverseLength :: TCell a -> STM Int traverseLength = loop 0 where loop !n cell = do xs <- readTVar cell case xs of TNil -> return n TCons _ cell' -> loop (n+1) cell' -- | Return a list of all items currently in the queue. This does not modify -- the 'RollingQueue'. getItems :: RollingQueue a -> STM [a] getItems (RQ rv _) = do r <- readTVar rv loop id (readPtr r) where loop dl cell = do xs <- readTVar cell case xs of TNil -> return $ dl [] TCons x cell' -> loop (dl . (x :)) cell' -- | Return a list of internal values as key-value pairs. getAttributes :: RollingQueue a -> STM [(String, String)] getAttributes (RQ rv wv) = do r <- readTVar rv w <- readTVar wv return [ ("readCounter", show $ readCounter r) , ("readDiscarded", show $ readDiscarded r) , ("writeCounter", show $ writeCounter w) , ("sizeLimit", show $ sizeLimit w) ] -- | Dump the RollingQueue (output and internal counters) to standard output. -- This calls 'checkInvariants' first. dump :: Show a => RollingQueue a -> IO () dump rq = join $ atomically $ do checkInvariants rq xs <- getItems rq attrs <- getAttributes rq return $ do print xs let c1width = maximum $ map (Prelude.length . fst) attrs mapM_ putStrLn [k ++ replicate (c1width - Prelude.length k) ' ' ++ " = " ++ v | (k, v) <- attrs]