module Data.Conduit.Combinators
(
yieldMany
, unfold
, enumFromTo
, iterate
, repeat
, replicate
, sourceLazy
, repeatM
, repeatWhileM
, replicateM
, sourceFile
, sourceFileBS
, sourceHandle
, sourceHandleUnsafe
, sourceIOHandle
, stdin
, withSourceFile
, sourceDirectory
, sourceDirectoryDeep
, drop
, dropE
, dropWhile
, dropWhileE
, fold
, foldE
, foldl
, foldl1
, foldlE
, foldMap
, foldMapE
, all
, allE
, any
, anyE
, and
, andE
, or
, orE
, asum
, elem
, elemE
, notElem
, notElemE
, sinkLazy
, sinkList
, sinkVector
, sinkVectorN
, sinkLazyBuilder
, sinkNull
, awaitNonNull
, head
, headDef
, headE
, peek
, peekE
, last
, lastDef
, lastE
, length
, lengthE
, lengthIf
, lengthIfE
, maximum
, maximumE
, minimum
, minimumE
, null
, nullE
, sum
, sumE
, product
, productE
, find
, mapM_
, mapM_E
, foldM
, foldME
, foldMapM
, foldMapME
, sinkFile
, sinkFileCautious
, sinkTempFile
, sinkSystemTempFile
, sinkFileBS
, sinkHandle
, sinkIOHandle
, print
, stdout
, stderr
, withSinkFile
, withSinkFileBuilder
, withSinkFileCautious
, sinkHandleBuilder
, sinkHandleFlush
, map
, mapE
, omapE
, concatMap
, concatMapE
, take
, takeE
, takeWhile
, takeWhileE
, takeExactly
, takeExactlyE
, concat
, filter
, filterE
, mapWhile
, conduitVector
, scanl
, mapAccumWhile
, concatMapAccum
, intersperse
, slidingWindow
, chunksOfE
, chunksOfExactlyE
, mapM
, mapME
, omapME
, concatMapM
, filterM
, filterME
, iterM
, scanlM
, mapAccumWhileM
, concatMapAccumM
, encodeUtf8
, decodeUtf8
, decodeUtf8Lenient
, line
, lineAscii
, unlines
, unlinesAscii
, takeExactlyUntilE
, linesUnbounded
, linesUnboundedAscii
, splitOnUnboundedE
, builderToByteString
, unsafeBuilderToByteString
, builderToByteStringWith
, builderToByteStringFlush
, builderToByteStringWithFlush
, BufferAllocStrategy
, allNewBuffersStrategy
, reuseBufferStrategy
, vectorBuilder
, mapAccumS
, peekForever
, peekForeverE
) where
import Data.ByteString.Builder (Builder, toLazyByteString, hPutBuilder)
import qualified Data.ByteString.Builder.Internal as BB (flush)
import qualified Data.ByteString.Builder.Extra as BB (runBuilder, Next(Done, More, Chunk))
import qualified Data.NonNull as NonNull
import qualified Data.Traversable
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as BL
import Data.ByteString.Lazy.Internal (defaultChunkSize)
import Control.Applicative (Alternative(..), (<$>))
import Control.Exception (catch, throwIO, finally, bracket, try, evaluate)
import Control.Category (Category (..))
import Control.Monad (unless, when, (>=>), liftM, forever)
import Control.Monad.IO.Unlift (MonadIO (..), MonadUnliftIO, withRunInIO)
import Control.Monad.Primitive (PrimMonad, PrimState, unsafePrimToPrim)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (MonadResource, MonadThrow, allocate, throwM)
import Data.Conduit
import Data.Conduit.Internal (ConduitT (..), Pipe (..))
import qualified Data.Conduit.List as CL
import Data.IORef
import Data.Maybe (fromMaybe, isNothing, isJust)
import Data.Monoid (Monoid (..))
import Data.MonoTraversable
import qualified Data.Sequences as Seq
import qualified Data.Vector.Generic as V
import qualified Data.Vector.Generic.Mutable as VM
import Data.Void (absurd)
import Prelude (Bool (..), Eq (..), Int,
Maybe (..), Either (..), Monad (..), Num (..),
Ord (..), fromIntegral, maybe, either,
($), Functor (..), Enum, seq, Show, Char,
otherwise, Either (..), not,
($!), succ, FilePath, IO, String)
import Data.Word (Word8)
import qualified Prelude
import qualified System.IO as IO
import System.IO.Error (isDoesNotExistError)
import System.IO.Unsafe (unsafePerformIO)
import Data.ByteString (ByteString)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Encoding.Error as TEE
import Data.Conduit.Combinators.Stream
import Data.Conduit.Internal.Fusion
import Data.Primitive.MutVar (MutVar, newMutVar, readMutVar,
writeMutVar)
import qualified Data.Streaming.FileRead as FR
import qualified Data.Streaming.Filesystem as F
import GHC.ForeignPtr (mallocPlainForeignPtrBytes, unsafeForeignPtrToPtr)
import Foreign.ForeignPtr (touchForeignPtr, ForeignPtr)
import Foreign.Ptr (Ptr, plusPtr, minusPtr)
import Data.ByteString.Internal (ByteString (PS), mallocByteString)
import System.FilePath ((</>), (<.>), takeDirectory, takeFileName)
import System.Directory (renameFile, getTemporaryDirectory, removeFile)
import qualified Data.Sequences as DTE
import Data.Sequences (LazySequence (..))
#include "fusion-macros.h"
-- | Yield every element of a 'MonoFoldable' container downstream.
--
-- The conduit implementation folds 'yield' over the container with
-- 'ofoldMap'; the STREAMING macro pairs it with the stream version
-- @yieldManyS@.
yieldMany, yieldManyC :: (Monad m, MonoFoldable mono)
=> mono
-> ConduitT i (Element mono) m ()
yieldManyC = ofoldMap yield
STREAMING(yieldMany, yieldManyC, yieldManyS, x)
-- | Produce a stream from a seed and a step function returning
-- @Maybe (value, nextSeed)@.
--
-- Delegates to 'CL.unfold'.
unfold :: Monad m
=> (b -> Maybe (a, b))
-> b
-> ConduitT i a m ()
INLINE_RULE(unfold, f x, CL.unfold f x)
-- | Source of enumerated values between the two given bounds.
--
-- Delegates to 'CL.enumFromTo'.
enumFromTo :: (Monad m, Enum a, Ord a) => a -> a -> ConduitT i a m ()
INLINE_RULE(enumFromTo, f t, CL.enumFromTo f t)
-- | Source built from a function and a starting value.
--
-- Delegates to 'CL.iterate'.
iterate :: Monad m => (a -> a) -> a -> ConduitT i a m ()
INLINE_RULE(iterate, f t, CL.iterate f t)
-- | Source of a single repeated value, defined as @iterate id x@.
repeat :: Monad m => a -> ConduitT i a m ()
INLINE_RULE(repeat, x, iterate id x)
-- | Source built from a count and a value.
--
-- Delegates to 'CL.replicate'.
replicate :: Monad m
=> Int
-> a
-> ConduitT i a m ()
INLINE_RULE(replicate, n x, CL.replicate n x)
-- | Yield each strict chunk of a lazy sequence, obtained with 'toChunks'.
sourceLazy :: (Monad m, LazySequence lazy strict)
=> lazy
-> ConduitT i strict m ()
INLINE_RULE(sourceLazy, x, yieldMany (toChunks x))
-- | Run the given monadic action forever, yielding each result downstream.
repeatM, repeatMC :: Monad m
=> m a
-> ConduitT i a m ()
repeatMC m = forever $ lift m >>= yield
STREAMING(repeatM, repeatMC, repeatMS, m)
-- | Repeatedly run the given action, yielding each result for as long as the
-- predicate holds.
--
-- The first result that fails the predicate ends the stream and is not
-- yielded.
repeatWhileM, repeatWhileMC :: Monad m
                            => m a
                            -> (a -> Bool)
                            -> ConduitT i a m ()
repeatWhileMC action keepGoing = go
  where
    go = lift action >>= \val ->
        if keepGoing val
            then yield val >> go
            else return ()
STREAMING(repeatWhileM, repeatWhileMC, repeatWhileMS, m f)
-- | Source built from a count and a monadic action.
--
-- Delegates to 'CL.replicateM'.
replicateM :: Monad m
=> Int
-> m a
-> ConduitT i a m ()
INLINE_RULE(replicateM, n m, CL.replicateM n m)
-- | Stream the contents of a file as strict 'S.ByteString' chunks.
--
-- The file is opened with 'FR.openFile' and closed with 'FR.closeFile' via
-- 'bracketP'. Reading stops at the first empty chunk.
sourceFile :: MonadResource m
           => FilePath
           -> ConduitT i S.ByteString m ()
sourceFile fp = bracketP (FR.openFile fp) FR.closeFile pump
  where
    pump handle = do
        chunk <- liftIO (FR.readChunk handle)
        if S.null chunk
            then return ()
            else yield chunk >> pump handle
-- | Stream data from a 'IO.Handle' as strict 'S.ByteString' chunks.
--
-- Each read requests at most 'defaultChunkSize' bytes with 'S.hGetSome'; an
-- empty read ends the stream. The handle is not closed here.
sourceHandle :: MonadIO m
             => IO.Handle
             -> ConduitT i S.ByteString m ()
sourceHandle h = go
  where
    go = do
        chunk <- liftIO $ S.hGetSome h defaultChunkSize
        unless (S.null chunk) $ do
            yield chunk
            go
-- | Stream data from a 'IO.Handle', reusing one buffer for every chunk.
--
-- A single buffer of 'defaultChunkSize' bytes is allocated up front and every
-- yielded 'ByteString' is built over that same foreign pointer. The next read
-- therefore overwrites the bytes of the previously yielded chunk, so
-- downstream must not retain a chunk after asking for the next one.
sourceHandleUnsafe :: MonadIO m => IO.Handle -> ConduitT i ByteString m ()
sourceHandleUnsafe handle = do
fptr <- liftIO $ mallocPlainForeignPtrBytes defaultChunkSize
let ptr = unsafeForeignPtrToPtr fptr
loop = do
-- Read straight into the shared buffer; a non-positive count ends the loop.
count <- liftIO $ IO.hGetBuf handle ptr defaultChunkSize
when (count > 0) $ do
yield (PS fptr 0 count)
loop
loop
-- Touch the foreign pointer after the raw pointer's last use
-- (presumably to keep the buffer alive until here).
liftIO $ touchForeignPtr fptr
-- | Acquire a 'IO.Handle' with the given action, stream it with
-- 'sourceHandle', and close it with 'IO.hClose' via 'bracketP'.
sourceIOHandle :: MonadResource m
=> IO IO.Handle
-> ConduitT i S.ByteString m ()
sourceIOHandle alloc = bracketP alloc IO.hClose sourceHandle
-- | Synonym for 'sourceFile' with the output type fixed to 'ByteString'.
sourceFileBS :: MonadResource m => FilePath -> ConduitT i ByteString m ()
sourceFileBS = sourceFile
-- | 'sourceHandle' applied to 'IO.stdin'.
stdin :: MonadIO m => ConduitT i ByteString m ()
INLINE_RULE0(stdin, sourceHandle IO.stdin)
-- | Write all incoming chunks to the given file.
--
-- The file is opened in binary 'IO.WriteMode' and handed to 'sinkIOHandle',
-- which closes the handle.
sinkFile :: MonadResource m
=> FilePath
-> ConduitT S.ByteString o m ()
sinkFile fp = sinkIOHandle (IO.openBinaryFile fp IO.WriteMode)
-- | Write all incoming chunks to a temporary file, then rename it onto the
-- destination once the input is exhausted.
--
-- The temporary file comes from 'cautiousAcquire' and is cleaned up by
-- 'cautiousCleanup' via 'bracketP'. The rename only happens if the sink
-- finishes normally.
sinkFileCautious
  :: MonadResource m
  => FilePath
  -> ConduitM S.ByteString o m ()
sinkFileCautious fp =
    bracketP (cautiousAcquire fp) cautiousCleanup $ \(tmpPath, tmpHandle) -> do
        sinkHandle tmpHandle
        -- Close before renaming so the data is written out to the temp file.
        liftIO $ IO.hClose tmpHandle >> renameFile tmpPath fp
-- | Callback-style variant of 'sinkFileCautious' built on 'bracket'.
--
-- The callback receives a sink writing to a temporary file. After the
-- callback returns, the handle is closed and the temporary file is renamed
-- onto the destination. 'cautiousCleanup' runs in all cases.
withSinkFileCautious
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM S.ByteString o n () -> m a)
  -> m a
withSinkFileCautious fp inner =
    withRunInIO $ \run ->
        bracket (cautiousAcquire fp) cautiousCleanup $ \(tmpPath, tmpHandle) -> do
            result <- run (inner (sinkHandle tmpHandle))
            IO.hClose tmpHandle
            renameFile tmpPath fp
            return result
-- | Open a binary temporary file in the destination's directory.
--
-- The name template is the destination file name with a @tmp@ extension
-- added. Returns the temporary path together with its open handle.
cautiousAcquire :: FilePath -> IO (FilePath, IO.Handle)
cautiousAcquire fp = IO.openBinaryTempFile (takeDirectory fp) (takeFileName fp <.> "tmp")
-- | Release the resources acquired by 'cautiousAcquire'.
--
-- Closes the handle and removes the temporary file. The removal runs under
-- 'finally', so it is attempted even when 'IO.hClose' throws; this matches
-- the cleanup used by 'sinkTempFile'. A missing temporary file (for example
-- one already renamed onto the destination) is not an error; any other
-- removal failure is rethrown.
cautiousCleanup :: (FilePath, IO.Handle) -> IO ()
cautiousCleanup (tmpFP, h) =
    IO.hClose h `finally` (removeFile tmpFP `Control.Exception.catch` ignoreMissing)
  where
    ignoreMissing e
        | isDoesNotExistError e = return ()
        | otherwise = throwIO e
-- | Write all incoming chunks to a new temporary file and return its path.
--
-- The file is created with 'IO.openBinaryTempFile' in the given directory
-- using the given name pattern. It is registered with 'allocate'; the
-- registered release action closes the handle and removes the file, ignoring
-- a file that no longer exists.
sinkTempFile :: MonadResource m
             => FilePath -- ^ temp directory
             -> String -- ^ filename pattern
             -> ConduitM ByteString o m FilePath
sinkTempFile tmpdir pattern = do
    (_releaseKey, (path, handle)) <-
        allocate (IO.openBinaryTempFile tmpdir pattern) release
    sinkHandle handle
    liftIO (IO.hClose handle)
    return path
  where
    release (path, handle) =
        IO.hClose handle `finally`
            (removeFile path `Control.Exception.catch` onRemoveError)
    onRemoveError e
        | isDoesNotExistError e = return ()
        | otherwise = throwIO e
-- | Like 'sinkTempFile', using the directory reported by
-- 'getTemporaryDirectory'.
sinkSystemTempFile
  :: MonadResource m
  => String -- ^ filename pattern
  -> ConduitM ByteString o m FilePath
sinkSystemTempFile pattern =
    liftIO getTemporaryDirectory >>= \tmpdir -> sinkTempFile tmpdir pattern
-- | Write every incoming chunk to the handle with 'S.hPut'.
--
-- The handle is neither flushed nor closed here.
sinkHandle :: MonadIO m
=> IO.Handle
-> ConduitT S.ByteString o m ()
sinkHandle h = awaitForever (liftIO . S.hPut h)
-- | Write every incoming 'Builder' to the handle with 'hPutBuilder'.
sinkHandleBuilder :: MonadIO m => IO.Handle -> ConduitM Builder o m ()
sinkHandleBuilder h = awaitForever (liftIO . hPutBuilder h)
-- | Write 'Flush'-wrapped chunks to a handle.
--
-- A 'Chunk' is written with 'S.hPut'; a 'Flush' calls 'IO.hFlush' on the
-- handle.
sinkHandleFlush :: MonadIO m
                => IO.Handle
                -> ConduitM (Flush S.ByteString) o m ()
sinkHandleFlush h = awaitForever (liftIO . write)
  where
    write (Chunk bs) = S.hPut h bs
    write Flush = IO.hFlush h
-- | Acquire a 'IO.Handle' with the given action, write all input to it with
-- 'sinkHandle', and close it with 'IO.hClose' via 'bracketP'.
sinkIOHandle :: MonadResource m
=> IO IO.Handle
-> ConduitT S.ByteString o m ()
sinkIOHandle alloc = bracketP alloc IO.hClose sinkHandle
-- | Open a file for binary reading with 'IO.withBinaryFile' and pass a
-- 'sourceHandle' over it to the callback.
--
-- The handle is only valid while the callback runs.
withSourceFile
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM i ByteString n () -> m a)
  -> m a
withSourceFile fp inner =
    withRunInIO $ \run ->
        IO.withBinaryFile fp IO.ReadMode $ \h ->
            run (inner (sourceHandle h))
-- | Open a file for binary writing with 'IO.withBinaryFile' and pass a
-- 'sinkHandle' over it to the callback.
--
-- The file must be opened in 'IO.WriteMode': a handle opened in
-- 'IO.ReadMode' rejects the writes performed by 'sinkHandle'. The handle is
-- only valid while the callback runs.
withSinkFile
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM ByteString o n () -> m a)
  -> m a
withSinkFile fp inner =
    withRunInIO $ \run ->
        IO.withBinaryFile fp IO.WriteMode $
            run . inner . sinkHandle
-- | Like 'withSinkFile', but the sink consumes 'Builder' values, each written
-- to the handle with 'hPutBuilder'.
withSinkFileBuilder
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM Builder o n () -> m a)
  -> m a
withSinkFileBuilder fp inner =
    withRunInIO $ \run ->
        IO.withBinaryFile fp IO.WriteMode $ \h ->
            run . inner $ CL.mapM_ (\builder -> liftIO (hPutBuilder h builder))
-- | Stream the entries of a directory, each prefixed with the directory path.
--
-- The directory stream is opened with 'F.openDirStream' and closed with
-- 'F.closeDirStream' via 'bracketP'. Entries are not descended into.
sourceDirectory :: MonadResource m => FilePath -> ConduitT i FilePath m ()
sourceDirectory dir =
    bracketP (F.openDirStream dir) F.closeDirStream entries
  where
    entries ds = do
        next <- liftIO (F.readDirStream ds)
        case next of
            Just name -> yield (dir </> name) >> entries ds
            Nothing -> return ()
-- | Recursively stream the files below a directory.
--
-- Regular files and symlinks to files are yielded. Directories are descended
-- into; symlinked directories only when the flag is 'True'. Entries of any
-- other type are skipped.
sourceDirectoryDeep :: MonadResource m
                    => Bool -- ^ follow directory symlinks
                    -> FilePath -- ^ root directory
                    -> ConduitT i FilePath m ()
sourceDirectoryDeep followSymlinks = walk
  where
    walk :: MonadResource m => FilePath -> ConduitT i FilePath m ()
    walk dir = sourceDirectory dir .| awaitForever visit

    visit :: MonadResource m => FilePath -> ConduitT i FilePath m ()
    visit path = do
        kind <- liftIO $ F.getFileType path
        case kind of
            F.FTDirectory -> walk path
            F.FTDirectorySym -> when followSymlinks (walk path)
            F.FTFile -> yield path
            F.FTFileSym -> yield path
            F.FTOther -> return ()
-- | Consume input values according to the given count, producing no output.
--
-- Delegates to 'CL.drop'.
drop :: Monad m
=> Int
-> ConduitT a o m ()
INLINE_RULE(drop, n, CL.drop n)
-- | Drop the given number of elements from a chunked stream.
--
-- Each incoming chunk is split at the number of elements still to be
-- dropped. The prefix is discarded and a non-empty suffix is pushed back as
-- leftover. The loop ends once the count reaches zero or upstream is
-- exhausted.
dropE :: (Monad m, Seq.IsSequence seq)
      => Seq.Index seq
      -> ConduitT seq o m ()
dropE =
    loop
  where
    loop i
        | i <= 0 = return ()
        | otherwise = await >>= maybe (return ()) (go i)
    go i sq = do
        unless (onull y) $ leftover y
        loop i'
      where
        (x, y) = Seq.splitAt i sq
        -- Elements still to drop after discarding this chunk's prefix.
        i' = i - fromIntegral (olength x)
-- | Discard input values while the predicate holds.
--
-- The first value failing the predicate is pushed back as leftover.
dropWhile :: Monad m
          => (a -> Bool)
          -> ConduitT a o m ()
dropWhile f = go
  where
    go = do
        next <- await
        case next of
            Nothing -> return ()
            Just x
                | f x -> go
                | otherwise -> leftover x
-- | Discard leading elements of a chunked stream while the predicate holds.
--
-- Chunks consisting entirely of matching elements are consumed. The remainder
-- of the first chunk with a non-matching element is pushed back as leftover.
dropWhileE :: (Monad m, Seq.IsSequence seq)
           => (Element seq -> Bool)
           -> ConduitT seq o m ()
dropWhileE f = go
  where
    go = await >>= maybe (return ()) step
    step chunk
        | onull rest = go
        | otherwise = leftover rest
      where
        rest = Seq.dropWhile f chunk
-- | Monoidally combine all input values; defined as @CL.foldMap id@.
fold :: (Monad m, Monoid a)
=> ConduitT a o m a
INLINE_RULE0(fold, CL.foldMap id)
-- | Monoidally combine all elements of all input chunks.
--
-- Each chunk is collapsed with @ofoldMap id@ and appended to the accumulator,
-- which starts at 'mempty'.
foldE :: (Monad m, MonoFoldable mono, Monoid (Element mono))
=> ConduitT mono o m (Element mono)
INLINE_RULE0(foldE, CL.fold (\accum mono -> accum `mappend` ofoldMap id mono) mempty)
-- | Left fold over all input values. Delegates to 'CL.fold'.
foldl :: Monad m => (a -> b -> a) -> a -> ConduitT b o m a
INLINE_RULE(foldl, f x, CL.fold f x)
-- | Left fold over all elements of all input chunks.
--
-- Each chunk is folded into the accumulator with 'ofoldlPrime'.
foldlE :: (Monad m, MonoFoldable mono)
=> (a -> Element mono -> a)
-> a
-> ConduitT mono o m a
INLINE_RULE(foldlE, f x, CL.fold (ofoldlPrime f) x)
-- | Alias for @ofoldl'@ under a name without a prime character
-- (presumably so it can be used inside CPP macro arguments).
ofoldlPrime :: MonoFoldable mono => (a -> Element mono -> a) -> a -> mono -> a
ofoldlPrime = ofoldl'
-- | Map each input value to a monoid and combine the results.
--
-- Delegates to 'CL.foldMap'.
foldMap :: (Monad m, Monoid b)
=> (a -> b)
-> ConduitT a o m b
INLINE_RULE(foldMap, f, CL.foldMap f)
-- | Map each element of each input chunk to a monoid and combine the
-- results; defined as @CL.foldMap (ofoldMap f)@.
foldMapE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> w)
-> ConduitT mono o m w
INLINE_RULE(foldMapE, f, CL.foldMap (ofoldMap f))
-- | Left fold with no initial value: the first input seeds the accumulator.
--
-- Returns 'Nothing' when the stream is empty. The accumulator is forced at
-- each step.
foldl1, foldl1C :: Monad m => (a -> a -> a) -> ConduitT a o m (Maybe a)
foldl1C f = do
    first <- await
    case first of
        Nothing -> return Nothing
        Just x -> go x
  where
    go !acc = do
        next <- await
        case next of
            Nothing -> return (Just acc)
            Just y -> go (f acc y)
STREAMING(foldl1, foldl1C, foldl1S, f)
-- | Left fold with no initial value over the elements of a chunked stream.
--
-- Folds the chunks with 'foldMaybeNull', starting from 'Nothing'.
foldl1E :: (Monad m, MonoFoldable mono, a ~ Element mono)
=> (a -> a -> a)
-> ConduitT mono o m (Maybe a)
INLINE_RULE(foldl1E, f, foldl (foldMaybeNull f) Nothing)
-- | Fold one chunk into an optional accumulator.
--
-- An empty chunk leaves the accumulator unchanged. A non-empty chunk is
-- folded into an existing accumulator with @ofoldl'@, or seeds one with
-- @NonNull.ofoldl1'@ when there is none yet.
foldMaybeNull :: (MonoFoldable mono, e ~ Element mono)
              => (e -> e -> e)
              -> Maybe e
              -> mono
              -> Maybe e
foldMaybeNull f macc mono =
    case macc of
        Just acc | Just nn <- nonNull -> Just (ofoldl' f acc nn)
        Nothing | Just nn <- nonNull -> Just (NonNull.ofoldl1' f nn)
        _ -> macc
  where
    nonNull = NonNull.fromNullable mono
-- | Whether every input value satisfies the predicate.
--
-- Implemented by searching with 'find' for a value that fails the predicate
-- and checking that none is found, so consumption stops at the first failure.
all, allC :: Monad m
=> (a -> Bool)
-> ConduitT a o m Bool
allC f = fmap isNothing $ find (Prelude.not . f)
STREAMING(all, allC, allS, f)
-- | Whether every element of every input chunk satisfies the predicate;
-- defined as @all (oall f)@.
allE :: (Monad m, MonoFoldable mono)
=> (Element mono -> Bool)
-> ConduitT mono o m Bool
INLINE_RULE(allE, f, all (oall f))
-- | Whether some input value satisfies the predicate.
--
-- Implemented by checking whether 'find' locates a match, so consumption
-- stops at the first match.
any, anyC :: Monad m
=> (a -> Bool)
-> ConduitT a o m Bool
anyC = fmap isJust . find
STREAMING(any, anyC, anyS, f)
-- | Whether some element of some input chunk satisfies the predicate;
-- defined as @any (oany f)@.
anyE :: (Monad m, MonoFoldable mono)
=> (Element mono -> Bool)
-> ConduitT mono o m Bool
INLINE_RULE(anyE, f, any (oany f))
-- | Whether every input 'Bool' is 'True'; defined as @all id@.
and :: Monad m => ConduitT Bool o m Bool
INLINE_RULE0(and, all id)
-- | Whether every element of every input chunk is 'True'; defined as
-- @allE id@.
andE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
=> ConduitT mono o m Bool
INLINE_RULE0(andE, allE id)
-- | Whether some input 'Bool' is 'True'; defined as @any id@.
or :: Monad m => ConduitT Bool o m Bool
INLINE_RULE0(or, any id)
-- | Whether some element of some input chunk is 'True'; defined as
-- @anyE id@.
orE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
=> ConduitT mono o m Bool
INLINE_RULE0(orE, anyE id)
-- | Combine all input values with @(\<|\>)@, starting from 'empty'.
asum :: (Monad m, Alternative f)
=> ConduitT (f a) o m (f a)
INLINE_RULE0(asum, foldl (<|>) empty)
-- | Whether some input value equals the given value; defined as
-- @any (== x)@.
elem :: (Monad m, Eq a) => a -> ConduitT a o m Bool
INLINE_RULE(elem, x, any (== x))
-- | Whether some input chunk contains the given element; defined as
-- @any (oelem f)@, where @f@ is the element searched for.
elemE :: (Monad m, Seq.IsSequence seq, Eq (Element seq))
=> Element seq
-> ConduitT seq o m Bool
INLINE_RULE(elemE, f, any (oelem f))
-- | Whether no input value equals the given value; defined as
-- @all (/= x)@.
notElem :: (Monad m, Eq a) => a -> ConduitT a o m Bool
INLINE_RULE(notElem, x, all (/= x))
-- | Whether no input chunk contains the given element; defined as
-- @all (onotElem x)@.
notElemE :: (Monad m, Seq.IsSequence seq, Eq (Element seq))
=> Element seq
-> ConduitT seq o m Bool
INLINE_RULE(notElemE, x, all (onotElem x))
-- | Collect all strict input chunks into a single lazy sequence.
--
-- The chunks are accumulated in a difference list (a function that prepends
-- the chunks seen so far), which is applied to @[]@ at the end and passed to
-- 'fromChunks'.
sinkLazy, sinkLazyC :: (Monad m, LazySequence lazy strict)
=> ConduitT strict o m lazy
sinkLazyC = (fromChunks . ($ [])) <$> CL.fold (\front next -> front . (next:)) id
STREAMING0(sinkLazy, sinkLazyC, sinkLazyS)
-- | Collect all input values into a list. Delegates to 'CL.consume'.
sinkList :: Monad m => ConduitT a o m [a]
INLINE_RULE0(sinkList, CL.consume)
-- | Collect all input values into a vector.
--
-- Values are written into a mutable vector that starts with room for 10
-- elements and doubles its capacity whenever it fills up. At end of input the
-- vector is frozen in place and sliced to the number of values written.
sinkVector, sinkVectorC :: (V.Vector v a, PrimMonad m)
                        => ConduitT a o m (v a)
sinkVectorC = do
    let initialCapacity = 10
    buffer0 <- VM.new initialCapacity
    let fill capacity used buffer
            | used >= capacity = do
                -- Full: grow by the current capacity, doubling it.
                grown <- VM.grow buffer capacity
                fill (capacity * 2) used grown
            | otherwise = do
                next <- await
                case next of
                    Just x -> do
                        VM.write buffer used x
                        fill capacity (used + 1) buffer
                    Nothing -> V.slice 0 used <$> V.unsafeFreeze buffer
    fill initialCapacity 0 buffer0
STREAMING0(sinkVector, sinkVectorC, sinkVectorS)
-- | Collect at most the given number of input values into a vector.
--
-- A mutable vector of exactly @maxSize@ elements is allocated up front. The
-- result is that vector frozen in place, sliced to the number of values
-- written if input ends early.
sinkVectorN, sinkVectorNC :: (V.Vector v a, PrimMonad m)
                          => Int -- ^ maximum allowed size
                          -> ConduitT a o m (v a)
sinkVectorNC maxSize = do
    buffer <- VM.new maxSize
    let fill used
            | used >= maxSize = V.unsafeFreeze buffer
            | otherwise = await >>= \next ->
                case next of
                    Nothing -> V.slice 0 used <$> V.unsafeFreeze buffer
                    Just x -> VM.write buffer used x >> fill (used + 1)
    fill 0
STREAMING(sinkVectorN, sinkVectorNC, sinkVectorNS, maxSize)
-- | Combine all input 'Builder's with 'fold' and run the result with
-- 'toLazyByteString'.
sinkLazyBuilder, sinkLazyBuilderC :: Monad m => ConduitT Builder o m BL.ByteString
sinkLazyBuilderC = fmap toLazyByteString fold
STREAMING0(sinkLazyBuilder, sinkLazyBuilderC, sinkLazyBuilderS)
-- | Sink that produces no result value. Delegates to 'CL.sinkNull'.
sinkNull :: Monad m => ConduitT a o m ()
INLINE_RULE0(sinkNull, CL.sinkNull)
-- | Await the next non-empty chunk, skipping over empty ones.
--
-- Returns 'Nothing' once upstream is exhausted.
awaitNonNull :: (Monad m, MonoFoldable a) => ConduitT a o m (Maybe (NonNull.NonNull a))
awaitNonNull = do
    next <- await
    case next of
        Nothing -> return Nothing
        Just chunk ->
            case NonNull.fromNullable chunk of
                Nothing -> awaitNonNull
                Just nn -> return (Just nn)
-- | Optional first input value. Synonym for 'CL.head'.
head :: Monad m => ConduitT a o m (Maybe a)
head = CL.head
-- | Like 'head', returning the given default when 'head' yields 'Nothing'.
headDef :: Monad m => a -> ConduitT a o m a
headDef a = fromMaybe a <$> head
-- | Take the first element of a chunked stream, if any.
--
-- Empty chunks are skipped. The non-empty remainder of the chunk the element
-- came from is pushed back as leftover.
headE :: (Monad m, Seq.IsSequence seq) => ConduitT seq o m (Maybe (Element seq))
headE = go
  where
    go = do
        next <- await
        case next of
            Nothing -> return Nothing
            Just chunk -> maybe go split (Seq.uncons chunk)
    split (first, rest) = do
        unless (onull rest) (leftover rest)
        return (Just first)
-- | Optional next input value. Synonym for 'CL.peek'.
peek :: Monad m => ConduitT a o m (Maybe a)
peek = CL.peek
-- | Look at the first element of a chunked stream without consuming it.
--
-- Empty chunks are consumed and skipped. The first non-empty chunk is pushed
-- back whole as leftover.
peekE :: (Monad m, MonoFoldable mono) => ConduitT mono o m (Maybe (Element mono))
peekE = go
  where
    go = await >>= maybe (return Nothing) inspect
    inspect chunk =
        maybe go (\first -> leftover chunk >> return (Just first)) (headMay chunk)
-- | Consume the whole stream and return its final value, or 'Nothing' when
-- the stream is empty.
last, lastC :: Monad m => ConduitT a o m (Maybe a)
lastC = go Nothing
  where
    go latest = await >>= maybe (return latest) (go . Just)
STREAMING0(last, lastC, lastS)
-- | Like 'last', returning the given default when 'last' yields 'Nothing'.
lastDef :: Monad m => a -> ConduitT a o m a
lastDef a = fromMaybe a <$> last
-- | Consume the whole chunked stream and return the final element of its
-- last non-empty chunk, or 'Nothing' when there is no such chunk.
lastE, lastEC :: (Monad m, Seq.IsSequence seq) => ConduitT seq o m (Maybe (Element seq))
lastEC = go Nothing
  where
    go latest =
        awaitNonNull >>= maybe (return latest) (go . Just . NonNull.last)
STREAMING0(lastE, lastEC, lastES)
-- | Count the input values by adding 1 to an accumulator for each one.
length :: (Monad m, Num len) => ConduitT a o m len
INLINE_RULE0(length, foldl (\x _ -> x + 1) 0)
-- | Total number of elements across all input chunks, summing 'olength' of
-- each chunk.
lengthE :: (Monad m, Num len, MonoFoldable mono) => ConduitT mono o m len
INLINE_RULE0(lengthE, foldl (\x y -> x + fromIntegral (olength y)) 0)
-- | Count the input values that satisfy the predicate.
lengthIf :: (Monad m, Num len) => (a -> Bool) -> ConduitT a o m len
INLINE_RULE(lengthIf, f, foldl (\cnt a -> if f a then (cnt + 1) else cnt) 0)
-- | Count the elements across all input chunks that satisfy the predicate.
lengthIfE :: (Monad m, Num len, MonoFoldable mono)
=> (Element mono -> Bool) -> ConduitT mono o m len
INLINE_RULE(lengthIfE, f, foldlE (\cnt a -> if f a then (cnt + 1) else cnt) 0)
-- | Largest input value, or 'Nothing' for an empty stream; defined as
-- @foldl1 max@.
maximum :: (Monad m, Ord a) => ConduitT a o m (Maybe a)
INLINE_RULE0(maximum, foldl1 max)
-- | Largest element across all input chunks; defined as @foldl1E max@.
maximumE :: (Monad m, Seq.IsSequence seq, Ord (Element seq)) => ConduitT seq o m (Maybe (Element seq))
INLINE_RULE0(maximumE, foldl1E max)
-- | Smallest input value, or 'Nothing' for an empty stream; defined as
-- @foldl1 min@.
minimum :: (Monad m, Ord a) => ConduitT a o m (Maybe a)
INLINE_RULE0(minimum, foldl1 min)
-- | Smallest element across all input chunks; defined as @foldl1E min@.
minimumE :: (Monad m, Seq.IsSequence seq, Ord (Element seq)) => ConduitT seq o m (Maybe (Element seq))
INLINE_RULE0(minimumE, foldl1E min)
-- | 'True' when 'peek' finds no further input value.
null :: Monad m => ConduitT a o m Bool
null = isNothing <$> peek
-- | 'True' when the chunked stream has no more elements.
--
-- Empty chunks are consumed and skipped. The first non-empty chunk is pushed
-- back as leftover before returning 'False'.
nullE :: (Monad m, MonoFoldable mono)
      => ConduitT mono o m Bool
nullE = go
  where
    go = do
        next <- await
        case next of
            Nothing -> return True
            Just chunk
                | onull chunk -> go
                | otherwise -> leftover chunk >> return False
-- | Sum of all input values; defined as @foldl (+) 0@.
sum :: (Monad m, Num a) => ConduitT a o m a
INLINE_RULE0(sum, foldl (+) 0)
-- | Sum of all elements across all input chunks; defined as
-- @foldlE (+) 0@.
sumE :: (Monad m, MonoFoldable mono, Num (Element mono)) => ConduitT mono o m (Element mono)
INLINE_RULE0(sumE, foldlE (+) 0)
-- | Product of all input values; defined as @foldl (*) 1@.
product :: (Monad m, Num a) => ConduitT a o m a
INLINE_RULE0(product, foldl (*) 1)
-- | Product of all elements across all input chunks; defined as
-- @foldlE (*) 1@.
productE :: (Monad m, MonoFoldable mono, Num (Element mono)) => ConduitT mono o m (Element mono)
INLINE_RULE0(productE, foldlE (*) 1)
-- | Return the first input value satisfying the predicate, or 'Nothing' if
-- the stream ends first.
--
-- Values examined before the match are consumed; the match itself is not
-- pushed back.
find, findC :: Monad m => (a -> Bool) -> ConduitT a o m (Maybe a)
findC f = go
  where
    go = do
        next <- await
        case next of
            Nothing -> return Nothing
            Just x
                | f x -> return (Just x)
                | otherwise -> go
STREAMING(find, findC, findS, f)
-- | Run a monadic action on each input value. Delegates to 'CL.mapM_'.
mapM_ :: Monad m => (a -> m ()) -> ConduitT a o m ()
INLINE_RULE(mapM_, f, CL.mapM_ f)
-- | Run a monadic action on each element of each input chunk; defined as
-- @CL.mapM_ (omapM_ f)@.
mapM_E :: (Monad m, MonoFoldable mono) => (Element mono -> m ()) -> ConduitT mono o m ()
INLINE_RULE(mapM_E, f, CL.mapM_ (omapM_ f))
-- | Monadic left fold over all input values. Delegates to 'CL.foldM'.
foldM :: Monad m => (a -> b -> m a) -> a -> ConduitT b o m a
INLINE_RULE(foldM, f x, CL.foldM f x)
-- | Monadic left fold over all elements of all input chunks; each chunk is
-- folded into the accumulator with 'ofoldlM'.
foldME :: (Monad m, MonoFoldable mono)
=> (a -> Element mono -> m a)
-> a
-> ConduitT mono o m a
INLINE_RULE(foldME, f x, foldM (ofoldlM f) x)
-- | Map each input value to a monoid with a monadic action and combine the
-- results. Delegates to 'CL.foldMapM'.
foldMapM :: (Monad m, Monoid w) => (a -> m w) -> ConduitT a o m w
INLINE_RULE(foldMapM, f, CL.foldMapM f)
-- | Map each element of each input chunk to a monoid with a monadic action
-- and combine the results.
--
-- The accumulator starts at 'mempty'; each element's result is appended on
-- the right with 'mappend'.
foldMapME :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> m w)
-> ConduitT mono o m w
INLINE_RULE(foldMapME, f, CL.foldM (ofoldlM (\accum e -> mappend accum `liftM` f e)) mempty)
-- | Synonym for 'sinkFile' with the input type fixed to 'ByteString'.
sinkFileBS :: MonadResource m => FilePath -> ConduitT ByteString o m ()
sinkFileBS = sinkFile
-- | Print each input value with 'Prelude.print'.
print :: (Show a, MonadIO m) => ConduitT a o m ()
INLINE_RULE0(print, mapM_ (liftIO . Prelude.print))
-- | 'sinkHandle' applied to 'IO.stdout'.
stdout :: MonadIO m => ConduitT ByteString o m ()
INLINE_RULE0(stdout, sinkHandle IO.stdout)
-- | 'sinkHandle' applied to 'IO.stderr'.
stderr :: MonadIO m => ConduitT ByteString o m ()
INLINE_RULE0(stderr, sinkHandle IO.stderr)
-- | Apply a function to each value in the stream. Delegates to 'CL.map'.
map :: Monad m => (a -> b) -> ConduitT a b m ()
INLINE_RULE(map, f, CL.map f)
-- | Apply a function to each element of each input container; defined as
-- @CL.map (fmap f)@.
mapE :: (Monad m, Functor f) => (a -> b) -> ConduitT (f a) (f b) m ()
INLINE_RULE(mapE, f, CL.map (fmap f))
-- | Apply a function to each element of each input chunk, for monomorphic
-- containers; defined as @CL.map (omap f)@.
omapE :: (Monad m, MonoFunctor mono) => (Element mono -> Element mono) -> ConduitT mono mono m ()
INLINE_RULE(omapE, f, CL.map (omap f))
-- | Apply a function to each input value and yield every element of the
-- resulting container with 'yieldMany'.
concatMap, concatMapC :: (Monad m, MonoFoldable mono)
=> (a -> mono)
-> ConduitT a (Element mono) m ()
concatMapC f = awaitForever (yieldMany . f)
STREAMING(concatMap, concatMapC, concatMapS, f)
-- | Map every element of each input chunk to a monoid and yield the combined
-- result for that chunk; defined as @CL.map (ofoldMap f)@.
concatMapE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> w)
-> ConduitT mono w m ()
INLINE_RULE(concatMapE, f, CL.map (ofoldMap f))
-- | Pass through a bounded number of input values. Delegates to
-- 'CL.isolate'.
take :: Monad m => Int -> ConduitT a a m ()
INLINE_RULE(take, n, CL.isolate n)
-- | Pass through at most the given number of elements of a chunked stream.
--
-- Each incoming chunk is split at the number of elements still wanted. The
-- non-empty prefix is yielded and a non-empty suffix is pushed back as
-- leftover. The loop ends once the count reaches zero or upstream is
-- exhausted.
takeE :: (Monad m, Seq.IsSequence seq)
      => Seq.Index seq
      -> ConduitT seq seq m ()
takeE =
    loop
  where
    loop i
        | i <= 0 = return ()
        | otherwise = await >>= maybe (return ()) (go i)
    go i sq = do
        unless (onull x) $ yield x
        unless (onull y) $ leftover y
        loop i'
      where
        (x, y) = Seq.splitAt i sq
        -- Elements still wanted after yielding this chunk's prefix.
        i' = i - fromIntegral (olength x)
-- | Pass through input values while the predicate holds.
--
-- The first value failing the predicate is pushed back as leftover.
takeWhile :: Monad m
          => (a -> Bool)
          -> ConduitT a a m ()
takeWhile f = go
  where
    go = do
        next <- await
        case next of
            Nothing -> return ()
            Just x
                | f x -> yield x >> go
                | otherwise -> leftover x
-- | Pass through leading elements of a chunked stream while the predicate
-- holds.
--
-- Each chunk is split with 'Seq.span'. The non-empty matching prefix is
-- yielded. A non-empty remainder is pushed back as leftover and ends the
-- conduit.
takeWhileE :: (Monad m, Seq.IsSequence seq)
           => (Element seq -> Bool)
           -> ConduitT seq seq m ()
takeWhileE f = go
  where
    go = await >>= maybe (return ()) step
    step chunk = do
        let (matching, rest) = Seq.span f chunk
        unless (onull matching) (yield matching)
        if onull rest then go else leftover rest
-- | Run the inner conduit on the input limited by @take count@.
--
-- After the inner conduit finishes, whatever remains of that limited input is
-- drained with 'CL.sinkNull', and the inner result is returned.
takeExactly :: Monad m
            => Int
            -> ConduitT a b m r
            -> ConduitT a b m r
takeExactly count inner =
    take count .| (inner >>= \r -> CL.sinkNull >> return r)
-- | Chunked counterpart of 'takeExactly': run the inner conduit on the input
-- limited by @takeE count@, then drain the rest of that limited input with
-- 'CL.sinkNull'.
takeExactlyE :: (Monad m, Seq.IsSequence a)
             => Seq.Index a
             -> ConduitT a b m r
             -> ConduitT a b m r
takeExactlyE count inner =
    takeE count .| (inner >>= \r -> CL.sinkNull >> return r)
-- | Flatten a stream of containers by yielding every element of each one
-- with 'yieldMany'.
concat, concatC :: (Monad m, MonoFoldable mono)
=> ConduitT mono (Element mono) m ()
concatC = awaitForever yieldMany
STREAMING0(concat, concatC, concatS)
-- | Filter input values with a predicate. Delegates to 'CL.filter'.
filter :: Monad m => (a -> Bool) -> ConduitT a a m ()
INLINE_RULE(filter, f, CL.filter f)
-- | Filter the elements inside each input chunk with 'Seq.filter'.
--
-- Every chunk is passed on, including ones left empty by the filter.
filterE :: (Seq.IsSequence seq, Monad m) => (Element seq -> Bool) -> ConduitT seq seq m ()
INLINE_RULE(filterE, f, CL.map (Seq.filter f))
-- | Map input values for as long as the function returns 'Just'.
--
-- The first input mapped to 'Nothing' is pushed back as leftover and ends the
-- conduit.
mapWhile :: Monad m => (a -> Maybe b) -> ConduitT a b m ()
mapWhile f = go
  where
    go = do
        next <- await
        case next of
            Nothing -> return ()
            Just x -> maybe (leftover x) (\y -> yield y >> go) (f x)
-- | Group the input into vectors of at most the given size.
--
-- Vectors are collected repeatedly with 'sinkVectorN'; the first empty vector
-- ends the stream and is not yielded.
conduitVector :: (V.Vector v a, PrimMonad m)
              => Int -- ^ maximum allowed size
              -> ConduitT a (v a) m ()
conduitVector size = go
  where
    go = sinkVectorN size >>= \v ->
        if V.null v
            then return ()
            else yield v >> go
-- | Left scan: yield each accumulator state, starting with the seed and
-- ending with the final accumulator.
--
-- For every input the next accumulator is forced before the current one is
-- yielded.
scanl, scanlC :: Monad m => (a -> b -> a) -> a -> ConduitT b a m ()
scanlC f = go
  where
    go acc = do
        next <- await
        case next of
            Nothing -> yield acc
            Just b -> do
                let acc' = f acc b
                acc' `seq` yield acc
                go acc'
STREAMING(scanl, scanlC, scanlS, f x)
-- | Map with an accumulator until the step function asks to stop.
--
-- A 'Right' result yields the output and continues with the new state. A
-- 'Left' result stops and returns that state, forced. If input ends first,
-- the current state is returned.
mapAccumWhile, mapAccumWhileC :: Monad m => (a -> s -> Either s (s, b)) -> s -> ConduitT a b m s
mapAccumWhileC f = go
  where
    go !state = do
        next <- await
        case next of
            Nothing -> return state
            Just a ->
                case f a state of
                    Left final -> return $! final
                    Right (state', b) -> yield b >> go state'
STREAMING(mapAccumWhile, mapAccumWhileC, mapAccumWhileS, f s)
-- | Map with an accumulator, producing a list of outputs per input.
-- Synonym for 'CL.concatMapAccum'.
concatMapAccum :: Monad m => (a -> accum -> (accum, [b])) -> accum -> ConduitT a b m ()
INLINE_RULE0(concatMapAccum, CL.concatMapAccum)
-- | Insert the given value between every pair of consecutive input values.
--
-- The first input is yielded as is; every later input is preceded by the
-- separator. An empty stream produces no output.
intersperse, intersperseC :: Monad m => a -> ConduitT a a m ()
intersperseC x = await >>= maybe (return ()) start
  where
    start first = yield first >> concatMap (\next -> [x, next])
STREAMING(intersperse, intersperseC, intersperseS, x)
-- | Yield overlapping windows of the requested size over the input.
--
-- A size below 1 is treated as 1. The first phase collects @sz@ values into
-- the initial window; if input ends before the window is full, the partial
-- window is yielded and the conduit stops. Afterwards, each new input is
-- appended to the previous window minus its first element, and the result is
-- yielded.
slidingWindow, slidingWindowC :: (Monad m, Seq.IsSequence seq, Element seq ~ a) => Int -> ConduitT a seq m ()
slidingWindowC sz = go (max 1 sz) mempty
  where
    goContinue st =
        await >>=
            maybe (return ())
                  (\x -> do
                      let st' = Seq.snoc st x
                      yield st' >> goContinue (Seq.unsafeTail st'))
    -- Window is full: emit it, then slide one element at a time.
    go 0 st = yield st >> goContinue (Seq.unsafeTail st)
    -- Still filling the initial window; count down the elements needed.
    go !n st = CL.head >>= \m ->
        case m of
            Nothing -> yield st
            Just x -> go (n - 1) (Seq.snoc st x)
STREAMING(slidingWindow, slidingWindowC, slidingWindowS, sz)
-- | Regroup a chunked stream into chunks of the given size.
--
-- Runs 'chunksOfExactlyE' and then yields one more input chunk if one is
-- available (the undersized remainder that 'chunksOfExactlyE' pushes back).
chunksOfE :: (Monad m, Seq.IsSequence seq) => Seq.Index seq -> ConduitT seq seq m ()
chunksOfE chunkSize = chunksOfExactlyE chunkSize >> (await >>= maybe (return ()) yield)
-- | Regroup a chunked stream into chunks of exactly the given size.
--
-- Data that does not fill a final chunk is not yielded; it is pushed back as
-- leftover instead.
chunksOfExactlyE :: (Monad m, Seq.IsSequence seq) => Seq.Index seq -> ConduitT seq seq m ()
chunksOfExactlyE chunkSize = await >>= maybe (return ()) start
where
-- Process one piece of input: hand off to chunksOfE when it is empty,
-- accumulate when it is too short, otherwise emit one full chunk and
-- continue with the rest.
start b
| onull b = chunksOfE chunkSize
| Seq.lengthIndex b < chunkSize = continue (Seq.lengthIndex b) [b]
| otherwise = let (first,rest) = Seq.splitAt chunkSize b in
yield first >> start rest
-- Accumulate short pieces (kept in reverse order) until at least
-- chunkSize elements have been collected; sofar is the running length.
continue !sofar bs = do
next <- await
case next of
-- Upstream is done: push the incomplete data back as one leftover.
Nothing -> leftover (mconcat $ Prelude.reverse bs)
Just next' ->
let !sofar' = Seq.lengthIndex next' + sofar
bs' = next':bs
in if sofar' < chunkSize
then continue sofar' bs'
else start (mconcat (Prelude.reverse bs'))
-- | Apply a monadic function to each value in the stream. Delegates to
-- 'CL.mapM'.
mapM :: Monad m => (a -> m b) -> ConduitT a b m ()
INLINE_RULE(mapM, f, CL.mapM f)
-- | Apply a monadic function to each element of each input container;
-- defined as @CL.mapM (Data.Traversable.mapM f)@.
mapME :: (Monad m, Data.Traversable.Traversable f) => (a -> m b) -> ConduitT (f a) (f b) m ()
INLINE_RULE(mapME, f, CL.mapM (Data.Traversable.mapM f))
-- | Apply a monadic function to each element of each input chunk, for
-- monomorphic containers; defined as @CL.mapM (omapM f)@.
omapME :: (Monad m, MonoTraversable mono)
=> (Element mono -> m (Element mono))
-> ConduitT mono mono m ()
INLINE_RULE(omapME, f, CL.mapM (omapM f))
-- | Apply a monadic function to each input value and yield every element of
-- the resulting container with 'yieldMany'.
concatMapM, concatMapMC :: (Monad m, MonoFoldable mono)
=> (a -> m mono)
-> ConduitT a (Element mono) m ()
concatMapMC f = awaitForever (lift . f >=> yieldMany)
STREAMING(concatMapM, concatMapMC, concatMapMS, f)
-- | Keep only the input values for which the monadic predicate returns
-- 'True'.
filterM, filterMC :: Monad m
                  => (a -> m Bool)
                  -> ConduitT a a m ()
filterMC f =
    awaitForever $ \x ->
        lift (f x) >>= \keep -> when keep (yield x)
STREAMING(filterM, filterMC, filterMS, f)
-- | Filter the elements inside each input chunk with a monadic predicate;
-- defined as @CL.mapM (Seq.filterM f)@.
filterME :: (Monad m, Seq.IsSequence seq) => (Element seq -> m Bool) -> ConduitT seq seq m ()
INLINE_RULE(filterME, f, CL.mapM (Seq.filterM f))
-- | Run a monadic action on each value while passing the values through.
-- Delegates to 'CL.iterM'.
iterM :: Monad m => (a -> m ()) -> ConduitT a a m ()
INLINE_RULE(iterM, f, CL.iterM f)
-- | Monadic left scan: yield each accumulator state, starting with the seed
-- and ending with the final accumulator.
--
-- For every input the step action is run and its result forced before the
-- current accumulator is yielded.
scanlM, scanlMC :: Monad m => (a -> b -> m a) -> a -> ConduitT b a m ()
scanlMC f = go
  where
    go acc = do
        next <- await
        case next of
            Nothing -> yield acc
            Just b -> do
                acc' <- lift (f acc b)
                acc' `seq` yield acc
                go acc'
STREAMING(scanlM, scanlMC, scanlMS, f x)
-- | Monadic variant of 'mapAccumWhile'.
--
-- A 'Right' result yields the output and continues with the new state. A
-- 'Left' result stops and returns that state, forced. If input ends first,
-- the current state is returned.
mapAccumWhileM, mapAccumWhileMC :: Monad m => (a -> s -> m (Either s (s, b))) -> s -> ConduitT a b m s
mapAccumWhileMC f = go
  where
    go !state = do
        next <- await
        case next of
            Nothing -> return state
            Just a -> do
                step <- lift (f a state)
                case step of
                    Left final -> return $! final
                    Right (state', b) -> yield b >> go state'
STREAMING(mapAccumWhileM, mapAccumWhileMC, mapAccumWhileMS, f s)
-- | Monadic map with an accumulator, producing a list of outputs per input.
-- Delegates to 'CL.concatMapAccumM'.
concatMapAccumM :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> ConduitT a b m ()
INLINE_RULE(concatMapAccumM, f x, CL.concatMapAccumM f x)
-- | Encode each textual input chunk as UTF-8 binary data; defined as
-- @map DTE.encodeUtf8@.
encodeUtf8 :: (Monad m, DTE.Utf8 text binary) => ConduitT text binary m ()
INLINE_RULE0(encodeUtf8, map DTE.encodeUtf8)
-- | Decode a stream of UTF-8 bytes into 'Text' with the incremental decoder
-- 'TE.streamDecodeUtf8'.
--
-- A 'TEE.UnicodeException' raised while decoding a chunk is rethrown with
-- 'throwM' after pushing the offending chunk back as leftover.
decodeUtf8 :: MonadThrow m => ConduitT ByteString Text m ()
decodeUtf8 =
loop TE.streamDecodeUtf8
where
loop parse =
await >>= maybe done go
where
-- The decoder reports failure by throwing from pure code; force the result
-- inside try so the exception is captured as a Left value.
parse' = unsafePerformIO . try . evaluate . parse
-- End of input: feed an empty chunk to flush the decoder state.
done =
case parse' mempty of
Left e -> throwM (e :: TEE.UnicodeException)
Right (TE.Some t bs _) -> do
unless (T.null t) (yield t)
-- NOTE(review): undecoded trailing bytes are replaced by one U+FFFD each
-- instead of raising an error; confirm this is intended for the strict
-- decoder.
unless (S.null bs) (yield $ T.replicate (S.length bs) (T.singleton '\xFFFD'))
go bs = do
case parse' bs of
Left e -> do
leftover bs
throwM (e :: TEE.UnicodeException)
Right (TE.Some t _ next) -> do
unless (T.null t) (yield t)
loop next
-- | Decode a stream of UTF-8 bytes into 'Text' with the incremental decoder
-- 'TE.streamDecodeUtf8With' under 'TEE.lenientDecode'.
--
-- At end of input, each byte the decoder still holds undecoded is emitted as
-- one U+FFFD character.
decodeUtf8Lenient :: Monad m => ConduitT ByteString Text m ()
decodeUtf8Lenient = go (TE.streamDecodeUtf8With TEE.lenientDecode)
  where
    go decode = await >>= maybe flush feed
      where
        feed bs =
            case decode bs of
                TE.Some t _ continue -> do
                    unless (T.null t) (yield t)
                    go continue
        -- End of input: feed an empty chunk to flush the decoder state.
        flush =
            case decode mempty of
                TE.Some t pending _ -> do
                    unless (T.null t) (yield t)
                    unless (S.null pending) $
                        yield $ T.replicate (S.length pending) (T.singleton '\xFFFD')
-- | Run the inner conduit on the input up to the next newline character;
-- defined as @takeExactlyUntilE (== \'\\n\')@.
line :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
=> ConduitT seq o m r
-> ConduitT seq o m r
line = takeExactlyUntilE (== '\n')
-- | Byte-oriented counterpart of 'line': the delimiter is the byte 10.
lineAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
=> ConduitT seq o m r
-> ConduitT seq o m r
lineAscii = takeExactlyUntilE (== 10)
-- | Run the inner conduit on the input up to the first element matching the
-- predicate.
--
-- The matching element is consumed but not passed on. Data after it in the
-- same chunk is pushed back as leftover. Whatever the inner conduit leaves
-- unconsumed before the delimiter is drained with 'sinkNull'.
takeExactlyUntilE :: (Monad m, Seq.IsSequence seq)
                  => (Element seq -> Bool)
                  -> ConduitT seq o m r
                  -> ConduitT seq o m r
takeExactlyUntilE f inner =
    upToDelimiter .| (inner >>= \r -> sinkNull >> return r)
  where
    upToDelimiter = await >>= maybe (return ()) step
    step chunk
        -- No delimiter in this chunk: pass it on whole and keep reading.
        | onull rest = yield before >> upToDelimiter
        | otherwise = do
            unless (onull before) (yield before)
            let afterDelim = Seq.drop 1 rest
            unless (onull afterDelim) (leftover afterDelim)
      where
        (before, rest) = Seq.break f chunk
-- | Follow each input chunk with a chunk holding a single newline character.
unlines :: (Monad m, Seq.IsSequence seq, Element seq ~ Char) => ConduitT seq seq m ()
INLINE_RULE0(unlines, concatMap (:[Seq.singleton '\n']))
-- | Follow each input chunk with a chunk holding the single byte 10.
unlinesAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8) => ConduitT seq seq m ()
INLINE_RULE0(unlinesAscii, concatMap (:[Seq.singleton 10]))
-- | Split a chunked stream on elements matching the predicate, yielding the
-- pieces between delimiters.
--
-- Delimiters are dropped. Pieces spanning several input chunks are buffered
-- without any size limit. A non-empty final piece without a trailing
-- delimiter is yielded at end of input.
splitOnUnboundedE, splitOnUnboundedEC :: (Monad m, Seq.IsSequence seq) => (Element seq -> Bool) -> ConduitT seq seq m ()
splitOnUnboundedEC f = await >>= maybe (return ()) (go id)
  where
    -- front is a difference list of earlier chunks of the current piece.
    go front chunk
        | onull rest = await >>= maybe finish (go (front . (chunk:)))
        | otherwise =
            yield (mconcat (front [before])) >> go id (Seq.drop 1 rest)
      where
        (before, rest) = Seq.break f chunk
        finish =
            let final = mconcat (front [chunk])
             in unless (onull final) (yield final)
STREAMING(splitOnUnboundedE, splitOnUnboundedEC, splitOnUnboundedES, f)
-- | Split textual input into lines on the newline character, with no limit
-- on line length; defined as @splitOnUnboundedE (== \'\\n\')@.
linesUnbounded :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
=> ConduitT seq seq m ()
INLINE_RULE0(linesUnbounded, splitOnUnboundedE (== '\n'))
-- | Byte-oriented counterpart of 'linesUnbounded': splits on the byte 10.
linesUnboundedAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
=> ConduitT seq seq m ()
INLINE_RULE0(linesUnboundedAscii, splitOnUnboundedE (== 10))
-- | Convert a stream of 'Builder's into strict 'S.ByteString' chunks using
-- 'defaultStrategy'.
builderToByteString :: PrimMonad m => ConduitT Builder S.ByteString m ()
builderToByteString = builderToByteStringWith defaultStrategy
-- | Like 'builderToByteString' for 'Flush'-wrapped input, using
-- 'defaultStrategy'.
builderToByteStringFlush :: PrimMonad m
=> ConduitT (Flush Builder) (Flush S.ByteString) m ()
builderToByteStringFlush = builderToByteStringWithFlush defaultStrategy
-- | Convert a stream of 'Builder's into strict 'S.ByteString' chunks using
-- 'reuseBufferStrategy' with an initial buffer of 'defaultChunkSize' bytes.
--
-- NOTE(review): because the strategy may reuse a buffer, a yielded chunk is
-- presumably only valid until the next chunk is requested; confirm against
-- 'reuseBuffer'.
unsafeBuilderToByteString :: PrimMonad m
=> ConduitT Builder S.ByteString m ()
unsafeBuilderToByteString =
builderToByteStringWith (reuseBufferStrategy (allocBuffer defaultChunkSize))
-- | Convert a stream of 'Builder's into strict 'S.ByteString' chunks using
-- the given buffer allocation strategy.
--
-- Implemented on top of 'bbhelper': every input is wrapped in 'Chunk', and
-- on the output side a 'Flush' is ignored while a 'Chunk' is unwrapped and
-- yielded.
builderToByteStringWith :: PrimMonad m
                        => BufferAllocStrategy
                        -> ConduitT Builder S.ByteString m ()
builderToByteStringWith = bbhelper awaitChunk emit
  where
    awaitChunk = liftM (fmap Chunk) await
    emit flush =
        case flush of
            Chunk bs -> yield bs
            Flush -> return ()
-- | Convert a stream of 'Flush'-wrapped 'Builder's into 'Flush'-wrapped
-- strict 'S.ByteString' chunks using the given buffer allocation strategy.
--
-- 'bbhelper' applied directly to 'await' and 'yield'.
builderToByteStringWithFlush
:: PrimMonad m
=> BufferAllocStrategy
-> ConduitT (Flush Builder) (Flush S.ByteString) m ()
builderToByteStringWithFlush = bbhelper await yield
-- | Shared driver for the builder-to-bytestring conduits, abstracted over how
-- input is awaited and how output is yielded.
--
-- It creates a receiver and finisher with 'newByteStringBuilderRecv', feeds
-- every awaited builder to the receiver, and drains the resulting popper
-- until it returns an empty chunk. At end of input the finisher is run and
-- its chunk, if any, is yielded.
bbhelper
:: PrimMonad m
=> m (Maybe (Flush Builder))
-> (Flush S.ByteString -> m ())
-> BufferAllocStrategy
-> m ()
bbhelper await' yield' strat = do
(recv, finish) <- unsafePrimToPrim $ newByteStringBuilderRecv strat
let loop = await' >>= maybe finish' cont
finish' = do
mbs <- unsafePrimToPrim finish
maybe (return ()) (yield' . Chunk) mbs
cont fbuilder = do
-- An upstream Flush is turned into the builder-level flush.
let builder =
case fbuilder of
Flush -> BB.flush
Chunk b -> b
popper <- unsafePrimToPrim $ recv builder
-- Pull chunks from the popper until it signals completion with an
-- empty chunk.
let cont' = do
bs <- unsafePrimToPrim popper
unless (S.null bs) $ do
yield' (Chunk bs)
cont'
cont'
-- Forward the Flush marker downstream after its data has been emitted.
case fbuilder of
Flush -> yield' Flush
Chunk _ -> return ()
loop
loop
-- | Action returning the next output chunk for a builder that was fed in.
-- In this file the popper returns an empty chunk once the builder is done.
type BuilderPopper = IO S.ByteString
-- | Feed a 'Builder' and obtain the popper that drains its output.
type BuilderRecv = Builder -> IO BuilderPopper
-- | Final action returning the data still in the buffer, if any.
type BuilderFinish = IO (Maybe S.ByteString)
-- | Create a receiver/finisher pair that runs builders into buffers supplied
-- by the given allocation strategy.
--
-- The current buffer action is kept in an 'IORef' shared by all poppers.
-- Each fed builder gets its own 'IORef' holding either the remaining build
-- step ('Left') or a deferred action to run on the next pop ('Right').
newByteStringBuilderRecv :: BufferAllocStrategy -> IO (BuilderRecv, BuilderFinish)
newByteStringBuilderRecv (ioBufInit, nextBuf) = do
refBuf <- newIORef ioBufInit
return (push refBuf, finish refBuf)
where
-- Return whatever is left in the current buffer, if it is non-empty.
finish refBuf = do
ioBuf <- readIORef refBuf
buf <- ioBuf
return $ unsafeFreezeNonEmptyBuffer buf
push refBuf builder = do
refWri <- newIORef $ Left $ BB.runBuilder builder
return $ popper refBuf refWri
popper refBuf refWri = do
ioBuf <- readIORef refBuf
ebWri <- readIORef refWri
case ebWri of
Left bWri -> do
!buf@(Buffer _ _ op ope) <- ioBuf
-- Run the build step into the free space between op and ope.
(bytes, next) <- bWri op (ope `minusPtr` op)
let op' = op `plusPtr` bytes
case next of
-- Builder finished: keep the partially filled buffer for the next
-- builder and signal completion with an empty chunk.
BB.Done -> do
writeIORef refBuf $ return $ updateEndOfSlice buf op'
return S.empty
-- Builder needs at least minSize more bytes: switch to the next buffer
-- and return the filled part of this one if it is non-empty.
BB.More minSize bWri' -> do
let buf' = updateEndOfSlice buf op'
cont mbs = do
ioBuf' <- nextBuf minSize buf'
writeIORef refBuf ioBuf'
writeIORef refWri $ Left bWri'
case mbs of
Just bs | not $ S.null bs -> return bs
_ -> popper refBuf refWri
cont $ unsafeFreezeNonEmptyBuffer buf'
-- Builder wants to insert a ready-made chunk: return the buffered data
-- first (if any) and defer the inserted chunk to the next pop.
BB.Chunk bs bWri' -> do
let buf' = updateEndOfSlice buf op'
let yieldBS = do
nextBuf 1 buf' >>= writeIORef refBuf
writeIORef refWri $ Left bWri'
if S.null bs
then popper refBuf refWri
else return bs
case unsafeFreezeNonEmptyBuffer buf' of
Nothing -> yieldBS
Just bs' -> do
writeIORef refWri $ Right yieldBS
return bs'
Right action -> action
-- | A buffer being filled with bytes: an allocation plus three pointers
-- into it. The bytes between the second and third field form the
-- "slice" that has been written so far.
data Buffer = Buffer
    -- Underlying allocation.
    !(ForeignPtr Word8)
    -- Start of the filled slice.
    !(Ptr Word8)
    -- End of the filled slice, i.e. where the next write begins.
    !(Ptr Word8)
    -- End of the buffer (first address past the writable space).
    !(Ptr Word8)
-- | View the filled slice of a buffer as a strict 'S.ByteString'.
--
-- The result is built directly on the buffer's 'ForeignPtr' (no copy is
-- made here), so it shares memory with the buffer.
unsafeFreezeBuffer :: Buffer -> S.ByteString
unsafeFreezeBuffer (Buffer fp start end _) = PS fp offset len
  where
    base = unsafeForeignPtrToPtr fp
    offset = start `minusPtr` base
    len = end `minusPtr` start
-- | Like 'unsafeFreezeBuffer', but yields 'Nothing' when the filled
-- slice contains no bytes.
unsafeFreezeNonEmptyBuffer :: Buffer -> Maybe S.ByteString
unsafeFreezeNonEmptyBuffer buf =
    if sliceSize buf > 0
        then Just (unsafeFreezeBuffer buf)
        else Nothing
-- | Replace the end-of-slice pointer of a buffer, leaving the
-- allocation, slice start and buffer end untouched.
updateEndOfSlice
    :: Buffer     -- ^ buffer to update
    -> Ptr Word8  -- ^ new end of the filled slice
    -> Buffer
updateEndOfSlice (Buffer fp start _ limit) newEnd =
    Buffer fp start newEnd limit
-- | Number of bytes in the filled slice of a buffer (distance from the
-- slice start to the slice end).
sliceSize :: Buffer -> Int
sliceSize (Buffer _ start end _) = end `minusPtr` start
-- | How output buffers are obtained while running builders.
--
-- The first component is the action producing the initial buffer. The
-- second is called with a requested size and the buffer that was just
-- used, and returns an action that produces the buffer to use next.
type BufferAllocStrategy = (IO Buffer, Int -> Buffer -> IO (IO Buffer))
-- | The 'allNewBuffersStrategy' instantiated with 'defaultChunkSize' as
-- the buffer size.
defaultStrategy :: BufferAllocStrategy
defaultStrategy = allNewBuffersStrategy defaultChunkSize
-- | A strategy that allocates a fresh buffer every time one is needed
-- and never looks at the previous buffer. Each new buffer has at least
-- the given size, or the requested size when that is larger.
allNewBuffersStrategy
    :: Int  -- ^ minimum size of every allocated buffer
    -> BufferAllocStrategy
allNewBuffersStrategy bufSize = (allocBuffer bufSize, freshBuffer)
  where
    -- The old buffer is ignored; only the requested size matters.
    freshBuffer reqSize _ = return (allocBuffer (max reqSize bufSize))
-- | A strategy that keeps using the same allocation for as long as it
-- is big enough for the requested size, and allocates a new buffer of
-- exactly the requested size otherwise.
--
-- Reusing resets the buffer to the start of its allocation, so chunks
-- previously frozen from it share memory that will be overwritten.
reuseBufferStrategy
    :: IO Buffer  -- ^ action producing the initial buffer
    -> BufferAllocStrategy
reuseBufferStrategy buf0 = (buf0, nextBuffer)
  where
    -- The size check happens before the result is returned, so the old
    -- buffer is only retained when it is actually going to be reused.
    nextBuffer reqSize old =
        if reqSize <= bufferSize old
            then return (return (reuseBuffer old))
            else return (allocBuffer reqSize)
-- | Total capacity of a buffer: the distance from the start of its
-- allocation to the end of the buffer.
bufferSize :: Buffer -> Int
bufferSize (Buffer fp _ _ limit) = limit `minusPtr` base
  where
    base = unsafeForeignPtrToPtr fp
-- | Allocate a new, empty buffer with room for the given number of
-- bytes. The filled slice starts out empty at the beginning of the
-- allocation.
allocBuffer :: Int -> IO Buffer
allocBuffer size = do
    fp <- mallocByteString size
    let !start = unsafeForeignPtrToPtr fp
        limit = start `plusPtr` size
    return $! Buffer fp start start limit
-- | Reset a buffer so that its (empty) slice starts again at the
-- beginning of the allocation, keeping the same allocation and end.
reuseBuffer :: Buffer -> Buffer
reuseBuffer (Buffer fp _ _ limit) =
    let base = unsafeForeignPtrToPtr fp
     in Buffer fp base base limit
-- | Build vectors of a fixed maximum size from elements handed to a
-- callback.
--
-- The inner conduit receives a function for adding one element. Elements
-- are written into a mutable vector of the given size; each time it
-- fills up it is frozen and queued. Queued vectors are yielded whenever
-- the inner conduit awaits, and once more when it finishes, followed by
-- a final shorter vector holding any remaining elements.
--
-- NOTE(review): with a size of 0 the first added element would be
-- written at index 0 of an empty vector — confirm callers pass size > 0.
vectorBuilder
    :: (PrimMonad m, PrimMonad n, V.Vector v e, PrimState m ~ PrimState n)
    => Int  -- ^ size of each produced vector
    -> ((e -> n ()) -> ConduitT i Void m r)
    -> ConduitT i (v e) m r
vectorBuilder size inner = do
    ref <- VM.new size >>= \mv -> newMutVar $! S 0 mv id
    res <- onAwait (yieldS ref) (inner (addE ref))
    -- Collect what is still pending: queued full vectors plus, if any
    -- elements were written since the last full one, a trimmed vector.
    S filled mv pending <- readMutVar ref
    partial <-
        if filled == 0
            then return []
            else do
                frozen <- V.unsafeFreeze mv
                return [V.unsafeTake filled frozen]
    Prelude.mapM_ yield (pending partial)
    return res
-- | Mutable state used by 'vectorBuilder'.
data S s v e = S
    -- Index at which the next element will be written.
    !Int
    -- Mutable vector currently being filled.
    !(V.Mutable v s e)
    -- Completed vectors not yet yielded, kept as a difference list.
    ([v e] -> [v e])
-- | Run a sink, executing the given callback conduit every time the
-- sink asks for input, just before the request is passed upstream.
-- The sink cannot produce output itself ('Void'), so all output of the
-- result comes from the callback.
onAwait
    :: Monad m
    => ConduitT i o m ()     -- ^ callback run before each await
    -> ConduitT i Void m r   -- ^ sink to run
    -> ConduitT i o m r
onAwait (ConduitT callback) (ConduitT sink0) = ConduitT $ \rest ->
    let go pipe =
            case pipe of
                Done r -> rest r
                -- Output of type Void can never actually be produced.
                HaveOutput _ o -> absurd o
                NeedInput onInput onDone ->
                    callback $ \() -> NeedInput (go . onInput) (go . onDone)
                PipeM act -> PipeM (liftM go act)
                Leftover next i -> Leftover (go next) i
     in go (sink0 Done)
-- | Yield all completed vectors queued in the state, then clear the
-- queue. The write index and the vector being filled are left as is.
yieldS
    :: PrimMonad m
    => MutVar (PrimState m) (S (PrimState m) v e)
    -> ConduitT i (v e) m ()
yieldS ref = do
    S pos current queued <- readMutVar ref
    Prelude.mapM_ yield (queued [])
    writeMutVar ref $! S pos current id
-- | Append one element to the vector currently being filled.
--
-- When the write fills the vector, it is frozen and added to the queue
-- of completed vectors, and a new mutable vector of the same length
-- takes its place.
addE
    :: (PrimMonad m, V.Vector v e)
    => MutVar (PrimState m) (S (PrimState m) v e)
    -> e
    -> m ()
addE ref e = do
    S idx mv front <- readMutVar ref
    VM.write mv idx e
    let capacity = VM.length mv
        next = succ idx
    if next < capacity
        then writeMutVar ref $! S next mv front
        else do
            -- Vector is full: queue it and start over with a new one.
            full <- V.unsafeFreeze mv
            fresh <- VM.new capacity
            writeMutVar ref $! S 0 fresh (front . (full :))
-- | Consume the input stream while threading a state through a second,
-- independent source.
--
-- For every input element @a@ and current state @t@, the (sealed) source
-- is resumed and fed into the sink @f a t@; the sink's result becomes
-- the new state. Once the input is exhausted the final state is returned.
mapAccumS
    :: Monad m
    => (a -> s -> ConduitT b Void m s)
    -> s
    -> ConduitT () b m ()
    -> ConduitT a Void m s
mapAccumS f s xs = do
    (_, final) <- loop (sealConduitT xs) s
    return final
  where
    -- The accumulator is forced on every iteration.
    loop sealed !acc = do
        next <- await
        case next of
            Nothing -> return (sealed, acc)
            Just a -> do
                (sealed', acc') <- lift (sealed $$++ f a acc)
                loop sealed' acc'
-- | Run the given conduit over and over for as long as 'peek' reports
-- that another input value is available; stop once it returns 'Nothing'.
peekForever :: Monad m => ConduitT i o m () -> ConduitT i o m ()
peekForever inner = go
  where
    go = peek >>= maybe (return ()) (const (inner >> go))
-- | Chunked counterpart of 'peekForever': run the given conduit
-- repeatedly for as long as 'peekE' finds another element, and stop
-- once it returns 'Nothing'.
peekForeverE
    :: (Monad m, MonoFoldable i)
    => ConduitT i o m ()
    -> ConduitT i o m ()
peekForeverE inner = go
  where
    go = peekE >>= maybe (return ()) (const (inner >> go))