-- | A closeable variant of 'TChunkedQueue'.
--
-- A 'TMChunkedQueue' pairs a 'TChunkedQueue' with a closed flag. Writes to a
-- closed queue are discarded, and the drain operations return 'Nothing' once
-- the queue is both closed and empty.
module Control.Concurrent.STM.TMChunkedQueue (
-- The closeable queue type defined in this module.
TMChunkedQueue,
-- Drained-result type and its consumer; not defined in this module
-- (presumably re-exported from "Control.Concurrent.STM.ChunkedQueue" — confirm).
ChunkedQueue,
consumeQueue,
-- Construction.
newTMChunkedQueue,
newTMChunkedQueueIO,
-- Draining inside STM.
drainTMChunkedQueue,
tryDrainTMChunkedQueue,
-- Writing.
writeTMChunkedQueue,
writeManyTMChunkedQueue,
-- Closing and status queries.
closeTMChunkedQueue,
isEmptyTMChunkedQueue,
isClosedTMChunkedQueue,
-- IO-level draining helpers that use a delay.
drainAndSettleTMChunkedQueue,
drainWithTimeoutTMChunkedQueue,
) where
import Data.Typeable (Typeable)
import Prelude hiding (reads)
import Control.Applicative ((<$>), (<*>))
import Control.Monad
import Control.Monad.STM (STM, atomically)
import Control.Concurrent.STM.TVar
import Control.Concurrent.Async (race)
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM.ChunkedQueue
import Control.Concurrent.STM.TChunkedQueue
-- | A 'TChunkedQueue' bundled with a flag recording whether the queue has
-- been closed.
data TMChunkedQueue a = TMChunkedQueue
  { _isClosed :: !(TVar Bool)
    -- ^ Closed flag: 'False' on creation, set to 'True' by
    -- 'closeTMChunkedQueue'.
  , _queue :: !(TChunkedQueue a)
    -- ^ The underlying queue that stores the items.
  }
  deriving (Typeable)
-- | Build a fresh 'TMChunkedQueue' inside an 'STM' transaction.
--
-- The closed flag starts as 'False'; the items live in a new queue obtained
-- from 'newTChunkedQueue'.
newTMChunkedQueue :: STM (TMChunkedQueue a)
newTMChunkedQueue = do
  closedVar <- newTVar False
  inner <- newTChunkedQueue
  return (TMChunkedQueue closedVar inner)
-- | 'IO' counterpart of 'newTMChunkedQueue': builds the queue directly in
-- 'IO' using 'newTVarIO' and 'newTChunkedQueueIO'.
--
-- The closed flag starts as 'False'.
newTMChunkedQueueIO :: IO (TMChunkedQueue a)
newTMChunkedQueueIO = do
  closedVar <- newTVarIO False
  inner <- newTChunkedQueueIO
  return (TMChunkedQueue closedVar inner)
-- | Wrap a list in 'Just' when it has at least one element; an empty list
-- becomes 'Nothing'.
nonEmptyList :: [a] -> Maybe [a]
nonEmptyList xs
  | null xs = Nothing
  | otherwise = Just xs
-- | Turn a 'ChunkedQueue' whose chunk list is empty into 'Nothing'; any other
-- queue is returned unchanged inside 'Just'.
--
-- Only the chunk list itself is inspected (via 'nonEmptyList'), not the
-- contents of the individual chunks.
nonEmptyChunkedQueue :: ChunkedQueue a -> Maybe (ChunkedQueue a)
nonEmptyChunkedQueue (ChunkedQueue chunks) =
  case nonEmptyList chunks of
    Nothing -> Nothing
    Just present -> Just (ChunkedQueue present)
-- | Take everything currently held by the queue.
--
-- * Queue open: delegates to 'drainTChunkedQueue' and wraps the result in
--   'Just' (presumably that call retries while the queue is empty — confirm
--   against "Control.Concurrent.STM.TChunkedQueue").
-- * Queue closed: uses 'tryDrainTChunkedQueue' and returns 'Nothing' when the
--   drained chunk list is empty, so 'Nothing' signals "closed and exhausted".
drainTMChunkedQueue :: TMChunkedQueue a -> STM (Maybe (ChunkedQueue a))
drainTMChunkedQueue (TMChunkedQueue closedVar inner) =
  readTVar closedVar >>= drainFor
  where
    -- Dispatch on the closed flag read above.
    drainFor True = nonEmptyChunkedQueue <$> tryDrainTChunkedQueue inner
    drainFor False = Just <$> drainTChunkedQueue inner
-- | Take everything currently held by the queue using
-- 'tryDrainTChunkedQueue' (presumably the non-blocking drain — confirm
-- against "Control.Concurrent.STM.TChunkedQueue").
--
-- * Queue open: the drained queue is always returned in 'Just', even when it
--   holds no chunks.
-- * Queue closed: returns 'Nothing' when the drained chunk list is empty.
tryDrainTMChunkedQueue :: TMChunkedQueue a -> STM (Maybe (ChunkedQueue a))
tryDrainTMChunkedQueue (TMChunkedQueue closedVar inner) = do
  closedNow <- readTVar closedVar
  drained <- tryDrainTChunkedQueue inner
  return $
    if closedNow
      then nonEmptyChunkedQueue drained
      else Just drained
-- | Run an action on the underlying 'TChunkedQueue' only while the queue is
-- still open; once the closed flag is 'True' the action is skipped and the
-- transaction does nothing further.
doIfOpen :: TMChunkedQueue a -> (TChunkedQueue a -> STM ()) -> STM ()
doIfOpen (TMChunkedQueue closedVar inner) action = do
  closedNow <- readTVar closedVar
  if closedNow
    then return ()
    else action inner
-- | Write a list of items to the queue via 'writeManyTChunkedQueue'.
--
-- If the queue has already been closed the items are silently discarded
-- (see 'doIfOpen').
writeManyTMChunkedQueue :: TMChunkedQueue a -> [a] -> STM ()
writeManyTMChunkedQueue queue xs =
  doIfOpen queue (\inner -> writeManyTChunkedQueue inner xs)
-- | Write a single item to the queue via 'writeTChunkedQueue'.
--
-- If the queue has already been closed the item is silently discarded
-- (see 'doIfOpen').
writeTMChunkedQueue :: TMChunkedQueue a -> a -> STM ()
writeTMChunkedQueue queue x =
  doIfOpen queue (\inner -> writeTChunkedQueue inner x)
-- | Report whether the underlying 'TChunkedQueue' is empty, as decided by
-- 'isEmptyTChunkedQueue'. The closed flag is not consulted.
isEmptyTMChunkedQueue :: TMChunkedQueue a -> STM Bool
isEmptyTMChunkedQueue tmQueue =
  case tmQueue of
    TMChunkedQueue _ inner -> isEmptyTChunkedQueue inner
-- | Mark the queue as closed by setting its closed flag to 'True'.
--
-- Only the flag is written; items already in the underlying queue are left
-- in place. Closing an already-closed queue writes 'True' again.
closeTMChunkedQueue :: TMChunkedQueue a -> STM ()
closeTMChunkedQueue tmQueue =
  case tmQueue of
    TMChunkedQueue closedVar _ -> writeTVar closedVar True
-- | Read the closed flag: 'True' once the queue has been closed.
isClosedTMChunkedQueue :: TMChunkedQueue a -> STM Bool
isClosedTMChunkedQueue tmQueue =
  case tmQueue of
    TMChunkedQueue closedVar _ -> readTVar closedVar
-- | Drain the queue, then keep draining until it "settles".
--
-- The first drain uses 'drainTMChunkedQueue'; if that yields 'Nothing' the
-- result is 'Nothing'. Otherwise the function repeatedly sleeps for @delay@
-- (passed to 'threadDelay', i.e. microseconds) and drains again with
-- 'tryDrainTMChunkedQueue'. It stops at the first round that yields either
-- 'Nothing' or a queue with no chunks, and returns everything gathered.
--
-- Note that at least one @delay@ always elapses after the initial drain.
drainAndSettleTMChunkedQueue :: Int
                             -> TMChunkedQueue a
                             -> IO (Maybe (ChunkedQueue a))
drainAndSettleTMChunkedQueue delay queue =
  atomically (drainTMChunkedQueue queue)
    >>= maybe
      (return Nothing)
      (\(ChunkedQueue firstChunks) -> Just <$> settle firstChunks)
  where
    -- One settle round: wait, drain again, and either recurse or finish.
    settle gathered = do
      threadDelay delay
      later <- atomically (tryDrainTMChunkedQueue queue)
      case later of
        -- Newly drained chunks are placed in front of the gathered ones
        -- (presumably the chunk list is kept newest-first — confirm against
        -- the ChunkedQueue representation).
        Just (ChunkedQueue moreChunks@(_ : _)) -> settle (moreChunks ++ gathered)
        -- Nothing new arrived (or the queue is closed and empty): done.
        _ -> return (ChunkedQueue gathered)
-- | Drain the queue, collecting further items for at most @delay@
-- (passed to 'threadDelay', i.e. microseconds) after the first batch.
--
-- The first batch is fetched with 'drainTMChunkedQueue'; if that yields
-- 'Nothing' the result is 'Nothing'. Otherwise batches keep being moved into
-- a private stash queue until either the source yields 'Nothing' or the
-- delay elapses, whichever comes first. The stash is then drained and
-- returned in 'Just'.
drainWithTimeoutTMChunkedQueue :: Int
                               -> TMChunkedQueue a
                               -> IO (Maybe (ChunkedQueue a))
drainWithTimeoutTMChunkedQueue delay queue = do
  stash <- newTChunkedQueueIO
  let -- Move one batch from the source into the stash. Draining and stashing
      -- happen in a single transaction, so interrupting the mover cannot
      -- leave a batch drained but not stashed.
      moveOnce = atomically $
        drainTMChunkedQueue queue >>= \drained ->
          case drained of
            Nothing -> return Nothing
            Just items ->
              Just <$> writeManyTChunkedQueue stash (consumeQueue items)
      -- Keep moving batches until the source reports 'Nothing'.
      moveUntilExhausted =
        moveOnce >>= maybe (return ()) (const moveUntilExhausted)
  firstMove <- moveOnce
  case firstMove of
    Nothing -> return Nothing
    Just _ -> do
      -- Whichever finishes first wins: the mover loop or the timer.
      void (race moveUntilExhausted (threadDelay delay))
      Just <$> atomically (drainTChunkedQueue stash)