module Data.Conduit.Binary
, sourceHandle
, sourceHandleUnsafe
, sourceIOHandle
, sourceFileRange
, sourceHandleRange
, sinkFile
, sinkHandle
, sinkIOHandle
, conduitFile
, conduitHandle
, sourceLbs
, head
, dropWhile
, take
, drop
, sinkCacheLength
, sinkLbs
, mapM_
, isolate
, takeWhile
, Data.Conduit.Binary.lines
) where
import qualified Data.Streaming.FileRead as FR
import Prelude hiding (head, take, drop, takeWhile, dropWhile, mapM_)
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
import Data.Conduit
import Data.Conduit.List (sourceList, consume)
import Control.Exception (assert, finally)
import Control.Monad (unless, when)
import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Monad.Trans.Resource (allocate, release)
import Control.Monad.Trans.Class (lift)
import qualified System.IO as IO
import Data.Word (Word8, Word64)
import Control.Applicative ((<$>))
import System.Directory (getTemporaryDirectory, removeFile)
import Data.ByteString.Lazy.Internal (defaultChunkSize)
import Data.ByteString.Internal (ByteString (PS), inlinePerformIO)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.ForeignPtr (touchForeignPtr)
import Foreign.Ptr (plusPtr)
import Foreign.Storable (peek)
import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
import Control.Monad.Trans.Resource (MonadResource)
sourceFile :: MonadResource m
=> FilePath
-> Producer m S.ByteString
sourceFile fp =
(FR.openFile fp)
loop h = do
bs <- liftIO $ FR.readChunk h
unless (S.null bs) $ do
yield bs
loop h
sourceHandle :: MonadIO m
=> IO.Handle
-> Producer m S.ByteString
sourceHandle h =
loop = do
bs <- liftIO (S.hGetSome h defaultChunkSize)
if S.null bs
then return ()
else yield bs >> loop
sourceHandleUnsafe :: MonadIO m => IO.Handle -> Source m ByteString
sourceHandleUnsafe handle = do
fptr <- liftIO $ mallocPlainForeignPtrBytes defaultChunkSize
let ptr = unsafeForeignPtrToPtr fptr
loop = do
count <- liftIO $ IO.hGetBuf handle ptr defaultChunkSize
when (count > 0) $ do
yield (PS fptr 0 count)
liftIO $ touchForeignPtr fptr
sourceIOHandle :: MonadResource m
=> IO IO.Handle
-> Producer m S.ByteString
sourceIOHandle alloc = bracketP alloc IO.hClose sourceHandle
sinkHandle :: MonadIO m
=> IO.Handle
-> Consumer S.ByteString m ()
sinkHandle h = awaitForever $ \bs -> liftIO $ do
S.hPut h bs
IO.hFlush h
sinkIOHandle :: MonadResource m
=> IO IO.Handle
-> Consumer S.ByteString m ()
sinkIOHandle alloc = bracketP alloc IO.hClose sinkHandle
sourceFileRange :: MonadResource m
=> FilePath
-> Maybe Integer
-> Maybe Integer
-> Producer m S.ByteString
sourceFileRange fp offset count = bracketP
(IO.openBinaryFile fp IO.ReadMode)
(\h -> sourceHandleRange h offset count)
sourceHandleRange :: MonadIO m
=> IO.Handle
-> Maybe Integer
-> Maybe Integer
-> Producer m S.ByteString
sourceHandleRange handle offset count = do
case offset of
Nothing -> return ()
Just off -> liftIO $ IO.hSeek handle IO.AbsoluteSeek off
case count of
Nothing -> pullUnlimited
Just c -> pullLimited (fromInteger c)
pullUnlimited = do
bs <- liftIO $ S.hGetSome handle 4096
if S.null bs
then return ()
else do
yield bs
pullLimited c = do
bs <- liftIO $ S.hGetSome handle (min c 4096)
let c' = c S.length bs
assert (c' >= 0) $
if S.null bs
then return ()
else do
yield bs
pullLimited c'
sinkFile :: MonadResource m
=> FilePath
-> Consumer S.ByteString m ()
sinkFile fp = sinkIOHandle (IO.openBinaryFile fp IO.WriteMode)
conduitFile :: MonadResource m
=> FilePath
-> Conduit S.ByteString m S.ByteString
conduitFile fp = bracketP
(IO.openBinaryFile fp IO.WriteMode)
conduitHandle :: MonadIO m => IO.Handle -> Conduit S.ByteString m S.ByteString
conduitHandle h = awaitForever $ \bs -> liftIO (S.hPut h bs) >> yield bs
isolate :: Monad m
=> Int
-> Conduit S.ByteString m S.ByteString
isolate =
loop 0 = return ()
loop count = do
mbs <- await
case mbs of
Nothing -> return ()
Just bs -> do
let (a, b) = S.splitAt count bs
case count S.length a of
0 -> do
unless (S.null b) $ leftover b
yield a
count' -> assert (S.null b) $ yield a >> loop count'
head :: Monad m => Consumer S.ByteString m (Maybe Word8)
head = do
mbs <- await
case mbs of
Nothing -> return Nothing
Just bs ->
case S.uncons bs of
Nothing -> head
Just (w, bs') -> leftover bs' >> return (Just w)
takeWhile :: Monad m => (Word8 -> Bool) -> Conduit S.ByteString m S.ByteString
takeWhile p =
loop = await >>= maybe (return ()) go
go bs
| S.null x = next
| otherwise = yield x >> next
next = if S.null y then loop else leftover y
(x, y) = S.span p bs
dropWhile :: Monad m => (Word8 -> Bool) -> Consumer S.ByteString m ()
dropWhile p =
loop = do
mbs <- await
case S.dropWhile p <$> mbs of
Nothing -> return ()
Just bs
| S.null bs -> loop
| otherwise -> leftover bs
take :: Monad m => Int -> Consumer S.ByteString m L.ByteString
take 0 = return L.empty
take n0 = go n0 id
go n front =
await >>= maybe (return $ L.fromChunks $ front []) go'
go' bs =
case S.length bs `compare` n of
LT -> go (n S.length bs) (front . (bs:))
EQ -> return $ L.fromChunks $ front [bs]
GT ->
let (x, y) = S.splitAt n bs
in assert (not $ S.null y) $ leftover y >> return (L.fromChunks $ front [x])
drop :: Monad m => Int -> Consumer S.ByteString m ()
drop 0 = return ()
drop n0 = go n0
go n =
await >>= maybe (return ()) go'
go' bs =
case S.length bs `compare` n of
LT -> go (n S.length bs)
EQ -> return ()
GT ->
let y = S.drop n bs
in assert (not $ S.null y) $ leftover y >> return ()
lines :: Monad m => Conduit S.ByteString m S.ByteString
lines =
loop id
loop front = await >>= maybe (finish front) (go front)
finish front =
let final = front S.empty
in unless (S.null final) (yield final)
go sofar more =
case S.uncons second of
Just (_, second') -> yield (sofar first) >> go id second'
Nothing ->
let rest = sofar more
in loop $ S.append rest
(first, second) = S.breakByte 10 more
sourceLbs :: Monad m => L.ByteString -> Producer m S.ByteString
sourceLbs = sourceList . L.toChunks
sinkCacheLength :: (MonadResource m1, MonadResource m2)
=> Sink S.ByteString m1 (Word64, Source m2 S.ByteString)
sinkCacheLength = do
tmpdir <- liftIO getTemporaryDirectory
(releaseKey, (fp, h)) <- allocate
(IO.openBinaryTempFile tmpdir "conduit.cache")
(\(fp, h) -> IO.hClose h `finally` removeFile fp)
len <- sinkHandleLen h
liftIO $ IO.hClose h
return (len, sourceFile fp >> release releaseKey)
sinkHandleLen :: MonadResource m => IO.Handle -> Sink S.ByteString m Word64
sinkHandleLen h =
loop 0
loop x =
await >>= maybe (return x) go
go bs = do
liftIO $ S.hPut h bs
loop $ x + fromIntegral (S.length bs)
sinkLbs :: Monad m => Sink S.ByteString m L.ByteString
sinkLbs = fmap L.fromChunks consume
mapM_BS :: Monad m => (Word8 -> m ()) -> S.ByteString -> m ()
mapM_BS f (PS fptr offset len) = do
let start = unsafeForeignPtrToPtr fptr `plusPtr` offset
end = start `plusPtr` len
loop ptr
| ptr >= end = inlinePerformIO (touchForeignPtr fptr) `seq` return ()
| otherwise = do
f (inlinePerformIO (peek ptr))
loop (ptr `plusPtr` 1)
loop start
mapM_ :: Monad m => (Word8 -> m ()) -> Consumer S.ByteString m ()
mapM_ f = awaitForever (lift . mapM_BS f)