module Data.Conduit.Algorithms.Async
( conduitPossiblyCompressedFile
, asyncMapC
, asyncMapEitherC
, asyncGzipTo
, asyncGzipToFile
, asyncGzipFrom
, asyncGzipFromFile
) where
import qualified Data.ByteString as B
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM.TBQueue as TQ
import Control.Concurrent.STM (atomically)
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.Async as CA
import qualified Data.Conduit.TQueue as CA
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Zlib as CZ
#ifndef WINDOWS
import qualified Data.Conduit.BZlib as CZ
#endif
import qualified Data.Conduit as C
import Data.Conduit ((.|))
import qualified Data.Sequence as Seq
import Data.Sequence ((|>), ViewL(..))
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Error.Class (MonadError(..))
import Control.Monad.Trans.Resource (MonadResource, MonadBaseControl)
import Control.Exception (evaluate)
import Control.DeepSeq
import System.IO
import Data.List (isSuffixOf)
import Data.Conduit.Algorithms.Utils (awaitJust)
-- | Map a pure function over the stream, evaluating results in background
-- threads while preserving the order of the inputs.
--
-- Every input is turned into an 'A.Async' that fully evaluates (@force@)
-- @f v@. The first @maxSize@ inputs are scheduled before anything is
-- yielded; after that, each new input is scheduled and the oldest pending
-- result is waited for and yielded. If downstream stops consuming, all
-- still-pending computations are cancelled.
--
-- A @maxSize@ of zero or less is accepted: no look-ahead queue is built and
-- each element is scheduled and waited for in turn.
asyncMapC :: forall a m b . (MonadIO m, NFData b) => Int -> (a -> b) -> C.Conduit a m b
asyncMapC maxSize f = initLoop (0 :: Int) (Seq.empty :: Seq.Seq (A.Async b))
    where
        -- Fill the queue with up to maxSize scheduled computations.
        -- '>=' (rather than '==') guarantees termination of the fill phase
        -- even when maxSize is negative.
        initLoop :: Int -> Seq.Seq (A.Async b) -> C.Conduit a m b
        initLoop size q
            | size >= maxSize = loop q
            | otherwise = C.await >>= \case
                Nothing -> yAll q
                Just v -> do
                    v' <- sched v
                    initLoop (size + 1) (q |> v')

        -- Start evaluating (to normal form) one element in its own Async.
        sched :: a -> C.ConduitM a b m (A.Async b)
        sched v = liftIO . A.async . evaluate . force $ f v

        -- Upstream is exhausted: yield the remaining results in order.
        yAll :: Seq.Seq (A.Async b) -> C.Conduit a m b
        yAll q = case Seq.viewl q of
            EmptyL -> return ()
            v :< rest -> (liftIO (A.wait v) >>= yieldOrCleanup rest) >> yAll rest

        -- Steady state: schedule the new element, then yield the oldest one.
        loop :: Seq.Seq (A.Async b) -> C.Conduit a m b
        loop q = C.await >>= \case
            Nothing -> yAll q
            Just v -> do
                v' <- sched v
                -- 'pending' includes the freshly scheduled v' so that it is
                -- cancelled too if downstream terminates. An empty queue
                -- (maxSize <= 0) degrades to waiting on v' directly.
                let (oldest, pending) = case Seq.viewl q of
                        r :< rest -> (r, rest |> v')
                        EmptyL -> (v', Seq.empty)
                yieldOrCleanup pending =<< liftIO (A.wait oldest)
                loop pending

        -- Cancel every computation that is still outstanding.
        cleanup :: Seq.Seq (A.Async b) -> m ()
        cleanup q = liftIO $ forM_ q A.cancel

        -- Yield a value; if downstream has terminated, cancel the queue.
        yieldOrCleanup q = flip C.yieldOr (cleanup q)
-- | Variant of 'asyncMapC' for functions that may fail.
--
-- Results are computed by 'asyncMapC' and then inspected in order: a
-- @Right@ value is passed downstream, while the first @Left@ encountered
-- is raised with 'throwError'.
asyncMapEitherC :: forall a m b e . (MonadIO m, NFData b, NFData e, MonadError e m) => Int -> (a -> Either e b) -> C.Conduit a m b
asyncMapEitherC maxSize f =
    asyncMapC maxSize f
        .| C.awaitForever (either throwError C.yield)
-- | Group incoming 'B.ByteString's into lists whose combined length does
-- not exceed @chunkSize@ (a single input that is already at least
-- @chunkSize@ long is emitted as a singleton list).
--
-- Note that each emitted list holds its pieces in /reverse/ arrival order;
-- the consumer is expected to reverse before concatenating.
bsConcatTo :: MonadIO m => Int
                -> C.Conduit B.ByteString m [B.ByteString]
bsConcatTo chunkSize = awaitJust begin
    where
        -- Start a new group with the given first piece.
        begin bs
            | len < chunkSize = grow [bs] len
            | otherwise = do
                C.yield [bs]
                awaitJust begin
            where
                len = B.length bs

        -- Extend the current group (acc, newest first) of total size 'total'.
        grow acc total = do
            next <- C.await
            case next of
                Nothing -> C.yield acc
                Just bs ->
                    let total' = total + B.length bs
                    in if total' > chunkSize
                        then C.yield acc >> begin bs
                        else grow (bs : acc) total'
-- | Unwrap a stream of 'Maybe' values, forwarding the payload of each
-- @Just@ and stopping at the first @Nothing@ (or when upstream ends).
untilNothing :: forall m i. (Monad m) => C.Conduit (Maybe i) m i
untilNothing = C.await >>= maybe done (maybe done forward)
    where
        done = return ()
        forward val = C.yield val >> untilNothing
-- | Sink that gzip-compresses its input and writes it to the given handle.
--
-- Input is first grouped by 'bsConcatTo' into blocks of up to 2^15 bytes
-- and handed to 'CA.drainTo' (queue size 8). The consumer side reads the
-- queue until its @Nothing@ marker, restores the order of each block,
-- compresses it and writes the result to the handle.
-- NOTE(review): 'CA.drainTo' presumably runs the consumer concurrently
-- with the producer - confirm against stm-conduit.
asyncGzipTo :: forall m. (MonadIO m, MonadBaseControl IO m) => Handle -> C.Sink B.ByteString m ()
asyncGzipTo h = bsConcatTo blockSize .| CA.drainTo 8 compressAndWrite
    where
        blockSize = (2 :: Int) ^ (15 :: Int)

        -- Blocks arrive newest-piece-first, hence the 'reverse'.
        compressAndWrite queue = liftIO $ C.runConduit
            (  CA.sourceTBQueue queue
            .| untilNothing
            .| CL.map (B.concat . reverse)
            .| CZ.gzip
            .| C.sinkHandle h
            )
-- | Like 'asyncGzipTo', but opens the named file for writing and closes
-- the handle again through 'C.bracketP'.
asyncGzipToFile :: forall m. (MonadResource m, MonadBaseControl IO m) => FilePath -> C.Sink B.ByteString m ()
asyncGzipToFile fname = C.bracketP acquire release asyncGzipTo
    where
        acquire = openFile fname WriteMode
        release = hClose
-- | Source that reads gzip-compressed data from the handle and produces
-- the decompressed bytes.
--
-- The producer passed to 'CA.gatherFrom' (queue size 8) reads the handle,
-- decompresses it (handling multiple concatenated gzip members via
-- 'CZ.multiple') and pushes each chunk wrapped in @Just@; a final
-- @Nothing@ marks the end of the data so that 'untilNothing' can stop.
asyncGzipFrom :: forall m. (MonadIO m, MonadResource m, MonadBaseControl IO m) => Handle -> C.Source m B.ByteString
asyncGzipFrom h = CA.gatherFrom 8 decompressInto .| untilNothing
    where
        decompressInto queue = liftIO $ do
            C.runConduit
                (  C.sourceHandle h
                .| CZ.multiple CZ.ungzip
                .| CL.map Just
                .| CA.sinkTBQueue queue
                )
            -- End-of-stream marker for the reading side.
            atomically $ TQ.writeTBQueue queue Nothing
-- | Like 'asyncGzipFrom', but opens the named file for reading and closes
-- the handle again through 'C.bracketP'.
asyncGzipFromFile :: forall m. (MonadResource m, MonadBaseControl IO m) => FilePath -> C.Source m B.ByteString
asyncGzipFromFile fname = C.bracketP acquire release asyncGzipFrom
    where
        acquire = openFile fname ReadMode
        release = hClose
-- | Stream the contents of a file, choosing a decompressor from the file
-- name: @.gz@ files go through 'asyncGzipFromFile', @.bz2@ files through
-- bzip2 decompression (an 'error' when built with @WINDOWS@ defined), and
-- anything else is read verbatim.
conduitPossiblyCompressedFile :: (MonadBaseControl IO m, MonadResource m) => FilePath -> C.Source m B.ByteString
conduitPossiblyCompressedFile fname
    | hasExt ".gz" = asyncGzipFromFile fname
    | hasExt ".bz2" = bz2Source
    | otherwise = C.sourceFile fname
    where
        hasExt ext = ext `isSuffixOf` fname
#ifndef WINDOWS
        bz2Source = C.sourceFile fname .| CZ.bunzip2
#else
        bz2Source = error "bzip2 decompression is not available on Windows"
#endif