{- | A concurrent FIFO queue which behaves like Base's Chan with the new added 'forward' primative for merging two channels. Unlike duplicate style merging, when two channels are forwarded to each other, writes to one channel will not be duplicated to both channels, rather it will be as if both channels contents had been merged and all their outstanding and future references now point to this new channel. See <http://github.com/mmirman/forward-chan> for futher specification. -} module Control.Concurrent.Chan.Forwardable ( Chan() , newChan , writeChan , readChan , forwardChan , getChanContents , writeList2Chan ) where import Control.Concurrent.Chan.Forwardable.Internals import qualified Control.Concurrent.Chan.Unagi as U import Data.IORef import Data.Typeable import Control.Concurrent (forkIO) import GHC.Base (join) import System.IO.Unsafe ( unsafeInterleaveIO ) -- | Create a new channel newChan :: IO (Chan a) newChan = do (ci,co) <- U.newChan mi <- newIORef ci to <- newIORef co return $ Chan mi to {- | Write a value to the channel. If the channel becomes forwarded at any time in the future, any thread waiting on a value from the forwarded channel may receive this value. -} writeChan (Chan mi _) v = do ci <- readIORef mi U.writeChan ci $ return v {- | Read a value from a channel. If a thread is waiting at a readChan and another thread forwards this channel to another (or visa versa) and the other channel contains elements, this read might now consume that element. -} readChan (Chan mi to) = join $ readIORef to >>= U.readChan {- | Forward takes two channels and makes them act as one. It obeys a few properties: * /Commutivity/: @'forwardChan' a b === 'forwardChan' b a@ * /Behavioral Transitivity/: @('forwardChan' a b >> 'forwardChan' b c) === ('forwardChan' a b >> 'forwardChan' a c)@ * /Equal Opportunity/: A write to either channel before or after the forward will be able to be consumed by a read on either of the channels, and there will be no unexpected starvation or race conditions. * /Early Bird Gets The Worm/: The first thread to read from either channel will, after a 'forward', always recieve the next available item. Similarly, items written to either channel are read in the same order they were written in. * /Note/: if @a '==' b@ is @False@, then after @'forwardChan' a b@ will not cause @a '==' b@ to become @True@. -} forwardChan :: Chan a -> Chan a -> IO () forwardChan c@(Chan mi to) (Chan mi' to') = do ci' <- readIORef mi atomicWriteIORef mi' =<< readIORef mi co' <- readIORef to' atomicWriteIORef to' =<< readIORef to let readIfAvailable av non = do (v,_) <- U.tryReadChan co' v <- U.tryRead v case v of Just _ -> av =<< join (U.readChan co') _ -> non getOldOrNew = readIfAvailable return $ readChan c useAll :: IO () useAll = readIfAvailable (\v -> writeChan c v >> useAll) $ return () useAll U.writeChan ci' $ getOldOrNew -- executes if we're stuck on a read. -- | Return a lazy list representing the contents of the supplied 'Chan' getChanContents :: Chan a -> IO [a] getChanContents ch = unsafeInterleaveIO $ do x <- readChan ch xs <- getChanContents ch return $ x:xs -- | Write an entire list of items to a 'Chan'. writeList2Chan :: Chan a -> [a] -> IO () writeList2Chan ch ls = sequence_ (map (writeChan ch) ls)