module Data.Conduit.Async.Composition ( CConduit
, CFConduit
, ($=&)
, (=$&)
, (=$=&)
, ($$&)
, buffer
, buffer'
, bufferToFile
, bufferToFile'
, runCConduit
) where
import Conduit
import Control.Concurrent.STM (orElse, check)
import Control.Monad hiding (forM_)
import Control.Monad.Loops
import Control.Monad.Trans.Resource
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Cereal as C
import qualified Data.Conduit.List as CL
import Data.Foldable (forM_)
import Data.Serialize
import GHC.Exts (Constraint)
import System.Directory (removeFile)
import System.IO (openBinaryTempFile)
import UnliftIO
-- | Join a producer and a consumer through a bounded queue and run the
-- result.
--
-- The two sides are combined with 'buffer'' and the combined value is then
-- executed with 'runCConduit'; the requirements on @m@ are therefore whatever
-- 'RunConstraints' asks for the combined type @c3@.
buffer
  :: (CCatable c1 c2 c3, CRunnable c3, RunConstraints c3 m)
  => Int            -- ^ Queue size, forwarded unchanged to 'buffer''.
  -> c1 () x m ()   -- ^ Producing side.
  -> c2 x Void m r  -- ^ Consuming side; its result becomes the overall result.
  -> m r
buffer queueSize producer consumer =
  runCConduit $ buffer' queueSize producer consumer
-- | Operator that joins its operands with '=$=&' and immediately executes the
-- combined value with 'runCConduit'.
($$&)
  :: (CCatable c1 c2 c3, CRunnable c3, RunConstraints c3 m)
  => c1 () x m ()
  -> c2 x Void m r
  -> m r
($$&) producer consumer = runCConduit (producer =$=& consumer)
infixr 0 $$&
-- | Operator form of 'buffer'' with the queue size fixed at 64.
(=$=&) :: (CCatable c1 c2 c3) => c1 i x m () -> c2 x o m r -> c3 i o m r
(=$=&) = buffer' 64
infixr 2 =$=&
-- | Alias for '=$=&'; it differs only in its fixity (@infixl 1@).
($=&) :: (CCatable c1 c2 c3) => c1 i x m () -> c2 x o m r -> c3 i o m r
upstream $=& downstream = upstream =$=& downstream
infixl 1 $=&
-- | Alias for '=$=&', declared with the same fixity (@infixr 2@).
(=$&) :: (CCatable c1 c2 c3) => c1 i x m () -> c2 x o m r -> c3 i o m r
upstream =$& downstream = upstream =$=& downstream
infixr 2 =$&
-- | Describes how a value of conduit-like type @c1@ is joined to one of type
-- @c2@. The functional dependency says the two operand types determine the
-- type @c3@ of the combined value.
class CCatable c1 c2 (c3 :: * -> * -> (* -> *) -> * -> *) | c1 c2 -> c3 where
-- | Join an upstream value (input @i@, output @x@) to a downstream value
-- (input @x@, output @o@, result @r@). The 'Int' is stored in the combined
-- value by the instances in this module and later used as the size of the
-- bounded queue created between the two sides when the value is run.
buffer' :: Int
-> c1 i x m ()
-> c2 x o m r
-> c3 i o m r
-- | Join a producer and a consumer with a file-backed buffer and run the
-- result.
--
-- This is 'bufferToFile'' followed by 'runCConduit'; every argument is passed
-- through unchanged.
bufferToFile
  :: (CFConduitLike c1, CFConduitLike c2, Serialize x, MonadUnliftIO m, MonadResource m, MonadThrow m)
  => Int            -- ^ Size of the in-memory queue.
  -> Maybe Int      -- ^ Initial value of the disk slot counter; with
                    --   'Nothing' no slot check is performed when spilling.
  -> FilePath       -- ^ Directory in which temporary files are created.
  -> c1 () x m ()   -- ^ Producing side.
  -> c2 x Void m r  -- ^ Consuming side; its result becomes the overall result.
  -> m r
bufferToFile bufsz dsksz tmpDir producer consumer =
  runCConduit $ bufferToFile' bufsz dsksz tmpDir producer consumer
-- | Join two conduit-like values with a file-backed buffer, without running
-- them.
--
-- Both operands are first converted with 'asCFConduit'. The new
-- 'FMultipleF' link is placed after the last stage of the upstream value:
-- existing 'FMultiple' / 'FMultipleF' links keep their own settings and the
-- join recurses into their tails.
bufferToFile'
  :: (CFConduitLike c1, CFConduitLike c2, Serialize x)
  => Int          -- ^ Size of the in-memory queue for the new link.
  -> Maybe Int    -- ^ Disk slot limit for the new link.
  -> FilePath     -- ^ Temporary directory for the new link.
  -> c1 i x m ()  -- ^ Upstream side.
  -> c2 x o m r   -- ^ Downstream side.
  -> CFConduit i o m r
bufferToFile' bufsz dsksz tmpDir c1 c2 =
  case asCFConduit c1 of
    FSingle only ->
      FMultipleF bufsz dsksz tmpDir only downstream
    FMultiple n first rest ->
      FMultiple n first (bufferToFile' bufsz dsksz tmpDir rest downstream)
    FMultipleF n slots dir first rest ->
      FMultipleF n slots dir first (bufferToFile' bufsz dsksz tmpDir rest downstream)
  where
    -- Converted once; the recursive calls receive it already as a CFConduit.
    downstream = asCFConduit c2
-- | Conduit-like types that can be executed once they are closed, i.e. take
-- @()@ as input and produce 'Void' as output.
class CRunnable c where
-- | The constraint that the underlying monad @m@ must satisfy in order to
-- run a value of type @c@; each instance chooses its own.
type RunConstraints c (m :: * -> *) :: Constraint
-- | Execute a closed value and return its result in @m@.
runCConduit :: (RunConstraints c m) => c () Void m r -> m r
-- | Two plain conduits: wrap each in 'Single' and join them as 'CConduit's.
instance CCatable ConduitT ConduitT CConduit where
  buffer' bufsz upstream downstream =
    buffer' bufsz (Single upstream) (Single downstream)
-- | Plain conduit upstream of a 'CConduit': wrap the upstream side in
-- 'Single' and join as 'CConduit's.
instance CCatable ConduitT CConduit CConduit where
  buffer' bufsz upstream downstream =
    buffer' bufsz (Single upstream) downstream
-- | Plain conduit upstream of a 'CFConduit': convert the upstream side with
-- 'asCFConduit' and join as 'CFConduit's.
instance CCatable ConduitT CFConduit CFConduit where
  buffer' bufsz upstream downstream =
    buffer' bufsz (asCFConduit upstream) downstream
-- | 'CConduit' upstream of a plain conduit: wrap the downstream side in
-- 'Single' and join as 'CConduit's.
instance CCatable CConduit ConduitT CConduit where
  buffer' bufsz upstream downstream =
    buffer' bufsz upstream (Single downstream)
-- | Joining two 'CConduit's. The new link, carrying the requested queue size,
-- is added after the last stage of the upstream value; links already present
-- upstream keep their own sizes.
instance CCatable CConduit CConduit CConduit where
  buffer' bufsz upstream downstream = case upstream of
    Single only ->
      Multiple bufsz only downstream
    Multiple n first rest ->
      Multiple n first (buffer' bufsz rest downstream)
-- | 'CConduit' upstream of a 'CFConduit': convert the upstream side with
-- 'asCFConduit' and join as 'CFConduit's.
instance CCatable CConduit CFConduit CFConduit where
  buffer' bufsz upstream downstream =
    buffer' bufsz (asCFConduit upstream) downstream
-- | 'CFConduit' upstream of a plain conduit: convert the downstream side with
-- 'asCFConduit' and join as 'CFConduit's.
instance CCatable CFConduit ConduitT CFConduit where
  buffer' bufsz upstream downstream =
    buffer' bufsz upstream (asCFConduit downstream)
-- | 'CFConduit' upstream of a 'CConduit': convert the downstream side with
-- 'asCFConduit' and join as 'CFConduit's.
instance CCatable CFConduit CConduit CFConduit where
  buffer' bufsz upstream downstream =
    buffer' bufsz upstream (asCFConduit downstream)
-- | Joining two 'CFConduit's with an in-memory link. The new 'FMultiple'
-- link is added after the last stage of the upstream value; links already
-- present upstream (memory-only or file-backed) are rebuilt with their own
-- settings.
instance CCatable CFConduit CFConduit CFConduit where
  buffer' bufsz upstream downstream = case upstream of
    FSingle only ->
      FMultiple bufsz only downstream
    FMultiple n first rest ->
      FMultiple n first (buffer' bufsz rest downstream)
    FMultipleF n slots dir first rest ->
      FMultipleF n slots dir first (buffer' bufsz rest downstream)
-- | A plain conduit is run directly with 'runConduit'.
instance CRunnable ConduitT where
  type RunConstraints ConduitT m = (MonadUnliftIO m, Monad m)
  runCConduit pipeline = runConduit pipeline
-- | Running a 'CConduit'.
--
-- A 'Single' is run directly. For a 'Multiple', a bounded queue of the stored
-- size is created, the first conduit is started with 'withAsync' feeding that
-- queue via 'sender', and the remaining stages are handed to 'stage'.
instance CRunnable CConduit where
  type RunConstraints CConduit m = (MonadUnliftIO m, MonadIO m)
  runCConduit pipeline = case pipeline of
    Single whole -> runConduit whole
    Multiple bufsz source rest -> do
      queue <- liftIO (newTBQueueIO bufsz)
      withAsync (sender queue source) (\producer -> stage queue producer rest)
-- | Running a 'CFConduit'.
--
-- A 'FSingle' is run directly. For 'FMultiple' the first conduit feeds a
-- bounded in-memory queue through 'sender'. For 'FMultipleF' a fresh
-- 'BufferContext' is created and the first conduit feeds it through
-- 'fsender'. In both cases the remaining stages are handed to 'fstage'
-- together with a source reading from the new link.
instance CRunnable CFConduit where
  type RunConstraints CFConduit m = (MonadThrow m, MonadUnliftIO m, MonadIO m, MonadResource m)
  runCConduit pipeline = case pipeline of
    FSingle whole -> runConduit whole
    FMultiple bufsz source rest -> do
      queue <- liftIO (newTBQueueIO bufsz)
      withAsync (sender queue source) $ \producer ->
        fstage (receiver queue) producer rest
    FMultipleF bufsz filemax dir source rest -> do
      -- Fields are created in declaration order: memory queue, restore
      -- queue, free-slot counter, done flag, temp directory.
      context <- liftIO $ do
        memQueue <- newTBQueueIO bufsz
        diskQueue <- newTQueueIO
        slots <- newTVarIO filemax
        finished <- newTVarIO False
        pure (BufferContext memQueue diskQueue slots finished dir)
      withAsync (fsender context source) $ \producer ->
        fstage (freceiver context) producer rest
-- | A chain of conduits whose stages are connected by bounded in-memory
-- queues when run (see the 'CRunnable' instance for this type).
data CConduit i o m r where
-- | A single ordinary conduit with no queue attached.
Single :: ConduitT i o m r -> CConduit i o m r
-- | A first-stage conduit followed by the rest of the chain. The 'Int' is the
-- size of the queue created between them; the intermediate element type @x@
-- is hidden.
Multiple :: Int -> ConduitT i x m () -> CConduit x o m r -> CConduit i o m r
-- | Run a source to completion, writing every element to the queue wrapped in
-- 'Just', then write a single 'Nothing' to mark the end of the stream.
sender :: (MonadIO m) => TBQueue (Maybe o) -> ConduitT () o m () -> m ()
sender queue source =
  runConduit (source .| mapM_C (send queue . Just)) >> send queue Nothing
-- | Run the remaining stages of a 'CConduit', reading input from the given
-- queue.
--
-- Each stage is started with 'withAsync' and linked to the previous stage's
-- 'Async' with 'link2'. The final ('Single') stage is waited on and its
-- result returned; an intermediate ('Multiple') stage writes to a freshly
-- created queue and recursion continues with the rest of the chain.
stage :: (MonadUnliftIO m) => TBQueue (Maybe i) -> Async x -> CConduit i Void m r -> m r
stage inbox upstream pipeline = case pipeline of
  Single sink ->
    withAsync (runConduit (receiver inbox .| sink)) $ \worker ->
      link2 upstream worker >> wait worker
  Multiple bufsz step rest -> do
    outbox <- liftIO (newTBQueueIO bufsz)
    withAsync (sender outbox (receiver inbox .| step)) $ \worker ->
      link2 upstream worker >> stage outbox worker rest
-- | Source that yields every 'Just' element read from the queue and finishes
-- when it reads a 'Nothing'.
receiver :: (MonadIO m) => TBQueue (Maybe o) -> ConduitT () o m ()
receiver queue = loop
  where
    loop = recv queue >>= maybe (return ()) (\item -> yield item >> loop)
-- | A chain of conduits whose links are either bounded in-memory queues or
-- file-backed buffers when run (see the 'CRunnable' instance for this type).
data CFConduit i o m r where
-- | A single ordinary conduit with no link attached.
FSingle :: ConduitT i o m r -> CFConduit i o m r
-- | A first-stage conduit joined to the rest of the chain by an in-memory
-- queue of the given size.
FMultiple :: Int -> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
-- | A first-stage conduit joined to the rest of the chain by a file-backed
-- buffer. The arguments are the in-memory queue size, the optional disk slot
-- limit and the temporary directory; the hidden element type @x@ must be
-- 'Serialize'.
FMultipleF :: (Serialize x) => Int -> Maybe Int -> FilePath -> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
-- | Conduit-like types that can be converted to a 'CFConduit'.
class CFConduitLike a where
-- | Convert to a 'CFConduit' with the same input, output, monad and result
-- types.
asCFConduit :: a i o m r -> CFConduit i o m r
-- | A plain conduit becomes a one-stage 'CFConduit'.
instance CFConduitLike ConduitT where
  asCFConduit plain = FSingle plain
-- | A 'CConduit' is converted stage by stage: 'Single' becomes 'FSingle' and
-- every 'Multiple' link becomes an 'FMultiple' link with the same size.
instance CFConduitLike CConduit where
  asCFConduit pipeline = case pipeline of
    Single whole -> FSingle whole
    Multiple bufsz step rest -> FMultiple bufsz step (asCFConduit rest)
-- | A 'CFConduit' is returned unchanged.
instance CFConduitLike CFConduit where
  asCFConduit pipeline = pipeline
-- | Shared state of one file-backed link between a sending stage ('fsender')
-- and a receiving stage ('freceiver').
data BufferContext m a = BufferContext { chan :: TBQueue a -- ^ Bounded in-memory queue of pending elements.
, restore :: TQueue (ConduitT () a m ()) -- ^ Sources queued by 'persistChan' that replay elements spilled to a file.
, slotsFree :: TVar (Maybe Int) -- ^ Remaining disk slots; 'persistChan' checks and adjusts it, and performs no check when it holds 'Nothing'.
, done :: TVar Bool -- ^ Set to 'True' by 'fsender' once its input is exhausted.
, tempDir :: FilePath -- ^ Directory passed to 'openBinaryTempFile' for spill files.
}
-- | Run a source to completion, pushing every element into the
-- 'BufferContext', then set the @done@ flag.
--
-- For each element a single STM transaction first tries to write to the
-- in-memory queue. If that would block, the alternative branch calls
-- 'persistChan' to move the queued elements out and then writes the element.
-- The transaction returns an action in @m@ (a no-op on the first branch, the
-- action from 'persistChan' on the second) which is executed right after the
-- transaction commits.
fsender :: (MonadThrow m, MonadResource m, Serialize x) => BufferContext m x -> ConduitT () x m () -> m ()
fsender bc@BufferContext{..} input = do
    runConduit (input .| mapM_C enqueue)
    liftIO (atomically (writeTVar done True))
  where
    -- Run the transaction in IO, then run the follow-up action it returned.
    enqueue item = join . liftIO . atomically $
      (writeTBQueue chan item >> return (return ())) `orElse` do
        followUp <- persistChan bc
        writeTBQueue chan item
        return followUp
-- | Run the remaining stages of a 'CFConduit', reading input from the given
-- source.
--
-- Each stage is started with 'withAsync' and linked to the previous stage's
-- 'Async' with 'link2'. The final ('FSingle') stage is waited on and its
-- result returned. An 'FMultiple' stage writes to a new in-memory queue via
-- 'sender'; an 'FMultipleF' stage writes to a new 'BufferContext' via
-- 'fsender'. Recursion then continues with a source reading from that link.
fstage :: (MonadThrow m, MonadUnliftIO m, MonadResource m) => ConduitT () i m () -> Async x -> CFConduit i Void m r -> m r
fstage upstream upstreamAsync pipeline = case pipeline of
  FSingle sink ->
    withAsync (runConduit (upstream .| sink)) $ \worker ->
      link2 upstreamAsync worker >> wait worker
  FMultiple bufsz step rest -> do
    outbox <- liftIO (newTBQueueIO bufsz)
    withAsync (sender outbox (upstream .| step)) $ \worker ->
      link2 upstreamAsync worker >> fstage (receiver outbox) worker rest
  FMultipleF bufsz dsksz dir step rest -> do
    -- Fields are created in declaration order: memory queue, restore queue,
    -- free-slot counter, done flag, temp directory.
    context <- liftIO $ do
      memQueue <- newTBQueueIO bufsz
      diskQueue <- newTQueueIO
      slots <- newTVarIO dsksz
      finished <- newTVarIO False
      pure (BufferContext memQueue diskQueue slots finished dir)
    withAsync (fsender context (upstream .| step)) $ \worker ->
      link2 upstreamAsync worker >> fstage (freceiver context) worker rest
-- | Source that replays everything written into a 'BufferContext'.
--
-- Each round atomically chooses what to emit next: a source taken from the
-- @restore@ queue if one is waiting, otherwise the current contents of the
-- in-memory queue. The loop ends after a round that took the in-memory branch
-- and saw @done@ set.
--
-- When the in-memory branch finds nothing to emit and @done@ is not yet set,
-- the transaction blocks until one of the variables it read changes. Without
-- that guard the loop would return an empty batch and immediately run again,
-- spinning for as long as the sending side is idle.
freceiver :: (MonadIO m) => BufferContext m o -> ConduitT () o m ()
freceiver BufferContext{..} = loop
  where
    loop = do
      (src, exit) <- liftIO $ atomically $
        -- Spilled batches were queued before anything now in memory, so they
        -- are always replayed first.
        (readTQueue restore >>= \action -> return (action, False)) `orElse` do
          xs <- exhaust chan
          isDone <- readTVar done
          -- Wait instead of busy-looping: proceed only when there is
          -- something to emit or the sender has finished.
          check (isDone || not (null xs))
          return (CL.sourceList xs, isDone)
      src
      unless exit loop
-- | Inside one STM transaction, take everything out of the in-memory queue
-- and arrange for it to be spilled to a temporary file.
--
-- The transaction itself only drains the queue, adjusts @slotsFree@ and
-- queues a replay source on @restore@. The file I/O is in the returned
-- action, which the caller must run in @m@ after the transaction commits.
persistChan :: (MonadThrow m, MonadResource m, Serialize o) => BufferContext m o -> STM (m ())
persistChan BufferContext{..} = do
-- Drain the in-memory queue; these are the elements to spill.
xs <- exhaust chan
mslots <- readTVar slotsFree
let len = length xs
-- With a slot limit in place, retry the transaction unless the batch fits.
-- NOTE(review): the comparison is strict, so a batch that would use exactly
-- all remaining slots is refused; confirm this is intended.
forM_ mslots $ \slots -> check (len < slots)
-- Hand-off cell: filled by the returned action once the file is written.
filePath <- newEmptyTMVar
-- Queue the replay source. It waits for the file's path and release key,
-- streams the file through the decoder, credits the batch size back to
-- slotsFree and releases the key.
-- NOTE(review): this is queued before the empty-batch case below is examined;
-- in that case nothing ever fills filePath. Presumably unreachable because the
-- caller only spills after a write to a full queue would block; confirm.
writeTQueue restore $ do
(path, key) <- liftIO $ atomically $ takeTMVar filePath
CB.sourceFile path .| do
C.conduitGet2 get
liftIO $ atomically $ modifyTVar slotsFree (fmap (+ len))
release key
case xs of
-- Nothing to spill: the returned action does nothing.
[] -> return (return ())
_ -> do
-- Reserve the disk slots within this transaction.
modifyTVar slotsFree (fmap (subtract len))
return $ do
-- Register the temp file as a resource; its release closes the handle and
-- removes the file.
(key, (path, h)) <- allocate (openBinaryTempFile tempDir "conduit.bin") (\(path, h) -> hClose h `finally` removeFile path)
liftIO $ do
-- Serialise the batch into the file, close it, then publish the path and
-- release key to the waiting replay source.
runConduit $ CL.sourceList xs .| C.conduitPut put .| CB.sinkHandle h
hClose h
atomically $ putTMVar filePath (path, key)
-- | Read elements from the queue until it is empty and return them in the
-- order they were read.
exhaust :: TBQueue a -> STM [a]
exhaust queue = drain
  where
    drain = do
      drained <- isEmptyTBQueue queue
      if drained
        then return []
        else do
          item <- readTBQueue queue
          rest <- drain
          return (item : rest)
-- | Atomically read one element from the queue, lifted into any 'MonadIO'.
recv :: (MonadIO m) => TBQueue a -> m a
recv = liftIO . atomically . readTBQueue
-- | Atomically write one element to the queue, lifted into any 'MonadIO'.
send :: (MonadIO m) => TBQueue a -> a -> m ()
send queue item = liftIO (atomically (writeTBQueue queue item))