{-# OPTIONS_GHC -Wall -fwarn-tabs #-} {-# LANGUAGE DoAndIfThenElse #-} {-# LANGUAGE DeriveDataTypeable #-} ---------------------------------------------------------------- -- | -- Module : Control.Concurrent.STM.TMChunkedQueue -- Copyright : Copyright (c) 2014 Alexander Kondratskiy -- License : BSD3 -- Maintainer : Alexander Kondratskiy -- Portability : non-portable (GHC STM, DeriveDataTypeable) -- -- A version of @Control.Concurrent.STM.TQueue@ where the queue is closeable -- and allows complete draining. This makes it possible to chunk items based on -- a timeout or a "settle period". This is useful when items/requests arriving -- through the queue are too granular and have to be combined, while retaining -- responsiveness. -- -- Some capabilities of @TQueue@ are missing (such as unget) due to design -- tradeoffs. -- -- /Since: 0.1.0/ ------------------------------------------------------------------ module Control.Concurrent.STM.TMChunkedQueue ( -- * The TMChunkedQueue type TMChunkedQueue, ChunkedQueue, consumeQueue, -- ** Creating TMChunkedQueues newTMChunkedQueue, newTMChunkedQueueIO, -- ** Draining TMChunkedQueues drainTMChunkedQueue, tryDrainTMChunkedQueue, -- ** Writing to TMChunkedQueues writeTMChunkedQueue, writeManyTMChunkedQueue, -- ** Closing TMChunkedQueues closeTMChunkedQueue, -- ** Predicates isEmptyTMChunkedQueue, isClosedTMChunkedQueue, -- * Chunked operations drainAndSettleTMChunkedQueue, drainWithTimeoutTMChunkedQueue, ) where import Data.Typeable (Typeable) import Prelude hiding (reads) import Control.Applicative ((<$>), (<*>)) import Control.Monad import Control.Monad.STM (STM, atomically) import Control.Concurrent.STM.TVar import Control.Concurrent.Async (race) import Control.Concurrent (threadDelay) import Control.Concurrent.STM.ChunkedQueue import Control.Concurrent.STM.TChunkedQueue ---------------------------------------------------------------- -- | @TMChunkedQueue@ is an abstract type representing a closeable, drainable -- FIFO queue. data TMChunkedQueue a = TMChunkedQueue { _isClosed :: {-# UNPACK #-} !(TVar Bool), _queue :: {-# UNPACK #-} !(TChunkedQueue a) } deriving Typeable -- | Build and returns a new instance of @TMChunkedQueue@ newTMChunkedQueue :: STM (TMChunkedQueue a) newTMChunkedQueue = TMChunkedQueue <$> newTVar False <*> newTChunkedQueue -- | @IO@ version of 'newTMChunkedQueue' newTMChunkedQueueIO :: IO (TMChunkedQueue a) newTMChunkedQueueIO = TMChunkedQueue <$> newTVarIO False <*> newTChunkedQueueIO nonEmptyList :: [a] -> Maybe [a] nonEmptyList [] = Nothing nonEmptyList xs = Just xs nonEmptyChunkedQueue :: ChunkedQueue a -> Maybe (ChunkedQueue a) nonEmptyChunkedQueue (ChunkedQueue xs) = ChunkedQueue <$> nonEmptyList xs -- | Drain everything contained in the @TMChunkedQueue@, but block if it is -- empty. Corollary: never returns empty queue. -- -- * Closed, Empty - @Nothing@ -- * Closed, Non-Empty - @Just [...]@ -- * Open, Empty - @Blocks@ -- * Open, Non-Empty - @Just [...]@ -- drainTMChunkedQueue :: TMChunkedQueue a -> STM (Maybe (ChunkedQueue a)) drainTMChunkedQueue (TMChunkedQueue closed queue) = do isClosed <- readTVar closed if isClosed then nonEmptyChunkedQueue <$> tryDrainTChunkedQueue queue else Just <$> drainTChunkedQueue queue -- | Drain everything contained in the @TMChunkedQueue@. Doesn't block. -- -- * Closed, Empty - @Nothing@ -- * Closed, Non-Empty - @Just [...]@ -- * Open, Empty - @Just []@ -- * Open, Non-Empty - @Just [...]@ -- tryDrainTMChunkedQueue :: TMChunkedQueue a -> STM (Maybe (ChunkedQueue a)) tryDrainTMChunkedQueue (TMChunkedQueue closed queue) = do isClosed <- readTVar closed if isClosed then nonEmptyChunkedQueue <$> tryDrainTChunkedQueue queue else Just <$> tryDrainTChunkedQueue queue doIfOpen :: TMChunkedQueue a -> (TChunkedQueue a -> STM ()) -> STM () doIfOpen (TMChunkedQueue closed queue) action = do isClosed <- readTVar closed unless isClosed $ action queue -- | Write many values to a @TMChunkedQueue@ writeManyTMChunkedQueue :: TMChunkedQueue a -> [a] -> STM () writeManyTMChunkedQueue queue xs = void $ doIfOpen queue $ flip writeManyTChunkedQueue xs -- | Write a value to a @TMChunkedQueue@ writeTMChunkedQueue :: TMChunkedQueue a -> a -> STM () writeTMChunkedQueue queue x = void $ doIfOpen queue $ flip writeTChunkedQueue x -- | Returns @True@ if the supplied @TMChunkedQueue@ is empty. isEmptyTMChunkedQueue :: TMChunkedQueue a -> STM Bool isEmptyTMChunkedQueue (TMChunkedQueue _ queue) = isEmptyTChunkedQueue queue -- | Closes the @TMQueue@, preventing any further writes. closeTMChunkedQueue :: TMChunkedQueue a -> STM () closeTMChunkedQueue (TMChunkedQueue closed _queue) = writeTVar closed True -- | Returns @True@ if the supplied @TMChunkedQueue@ has been closed. isClosedTMChunkedQueue :: TMChunkedQueue a -> STM Bool isClosedTMChunkedQueue (TMChunkedQueue closed _queue) = readTVar closed ---------------------------------------------------------------- -- | Keep draining the queue until no more items are seen for at least -- the given timeout period. Blocks if the queue is empty to begin with, -- and starts timing after the first value appears in the queue. drainAndSettleTMChunkedQueue :: Int -- ^ settle period in microseconds -> TMChunkedQueue a -> IO (Maybe (ChunkedQueue a)) drainAndSettleTMChunkedQueue delay queue = do maybeChQueue <- atomically $ drainTMChunkedQueue queue case maybeChQueue of Nothing -> return Nothing Just (ChunkedQueue chunks) -> Just <$> go chunks where go acc = do threadDelay delay let terminate = return $ ChunkedQueue acc maybeChQueue <- atomically $ tryDrainTMChunkedQueue queue case maybeChQueue of Nothing -> terminate Just (ChunkedQueue []) -> terminate Just (ChunkedQueue chunks) -> go (chunks ++ acc) -- | Keep draining the queue for at least the specified time period. Blocks if -- the queue is empty to begin with, and starts timing as soon as the first -- value appears in the queue. drainWithTimeoutTMChunkedQueue :: Int -- ^ timeout in microseconds -> TMChunkedQueue a -> IO (Maybe (ChunkedQueue a)) drainWithTimeoutTMChunkedQueue delay queue = do stashedQueue <- newTChunkedQueueIO let transferItems = atomically $ do maybeItems <- drainTMChunkedQueue queue case maybeItems of Nothing -> return Nothing Just items -> Just <$> stashedQueue `writeManyTChunkedQueue` consumeQueue items let transferTask = do result <- transferItems case result of Nothing -> return () Just _ -> transferTask result <- transferItems -- run transfer once before timing, so we block on empty queue. case result of Nothing -> return Nothing Just _ -> do withTimeout delay transferTask Just <$> atomically (drainTChunkedQueue stashedQueue) where withTimeout t action = void $ action `race` threadDelay t ----------------------------------------------------------------