-- | Conduits operating on streams of strict 'ByteString' chunks.
--
-- Names written with the @CC.@ qualifier in the export list are re-exported
-- from "Data.Conduit.Combinators"; the unqualified names are defined in this
-- module.
module Data.Conduit.Binary
(
-- * Sources
CC.sourceFile
, CC.sourceHandle
, CC.sourceHandleUnsafe
, CC.sourceIOHandle
, sourceFileRange
, sourceHandleRange
, sourceHandleRangeWithBuffer
, CC.withSourceFile
-- * Sinks
, CC.sinkFile
, CC.sinkFileCautious
, CC.sinkTempFile
, CC.sinkSystemTempFile
, CC.sinkHandle
, CC.sinkIOHandle
, CC.sinkHandleBuilder
, CC.sinkHandleFlush
, CC.withSinkFile
, CC.withSinkFileBuilder
, CC.withSinkFileCautious
-- * Pass-through conduits that also write to a file or handle
, conduitFile
, conduitHandle
-- * Byte-stream utilities
, sourceLbs
, head
, dropWhile
, take
, drop
, sinkCacheLength
, sinkLbs
, mapM_
, sinkStorable
, sinkStorableEx
, isolate
, takeWhile
, Data.Conduit.Binary.lines
) where
import qualified Data.Conduit.Combinators as CC
import Prelude hiding (head, take, drop, takeWhile, dropWhile, mapM_)
import qualified Data.ByteString as S
import Data.ByteString.Unsafe (unsafeUseAsCString)
import qualified Data.ByteString.Lazy as L
import Data.Conduit
import Data.Conduit.List (sourceList, consume)
import Control.Exception (assert, finally)
import Control.Monad (unless)
import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Monad.Trans.Resource (allocate, release, MonadThrow (..))
import Control.Monad.Trans.Class (lift)
import qualified System.IO as IO
import Data.Word (Word8, Word64)
#if (__GLASGOW_HASKELL__ < 710)
import Control.Applicative ((<$>))
#endif
import System.Directory (getTemporaryDirectory, removeFile)
import Data.ByteString.Lazy.Internal (defaultChunkSize)
import Data.ByteString.Internal (ByteString (PS), accursedUnutterablePerformIO)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.ForeignPtr (touchForeignPtr)
import Foreign.Ptr (plusPtr, castPtr)
import Foreign.Storable (Storable, peek, sizeOf)
import Control.Monad.Trans.Resource (MonadResource)
import Control.Exception (Exception)
import Data.Typeable (Typeable)
import Foreign.Ptr (Ptr)
#ifndef ALLOW_UNALIGNED_ACCESS
import Foreign.Marshal (alloca, copyBytes)
#endif
-- | Stream (part of) a file as strict 'S.ByteString' chunks.
--
-- The file is opened in binary read mode and registered with 'bracketP', with
-- 'IO.hClose' as the release action. The actual reading is delegated to
-- 'sourceHandleRange'.
sourceFileRange :: MonadResource m
                => FilePath
                -> Maybe Integer -- ^ Offset handed to 'sourceHandleRange'
                -> Maybe Integer -- ^ Maximum count handed to 'sourceHandleRange'
                -> ConduitT i S.ByteString m ()
sourceFileRange fp offset count =
    bracketP acquire IO.hClose stream
  where
    -- Open the file for binary reading.
    acquire = IO.openBinaryFile fp IO.ReadMode

    -- Read the requested range from the freshly opened handle.
    stream h = sourceHandleRange h offset count
-- | Stream (part of) the contents of a handle as strict 'S.ByteString'
-- chunks, reading with a buffer of 'defaultChunkSize' bytes.
--
-- This is 'sourceHandleRangeWithBuffer' with the buffer size fixed.
sourceHandleRange :: MonadIO m
                  => IO.Handle
                  -> Maybe Integer -- ^ Offset
                  -> Maybe Integer -- ^ Maximum count
                  -> ConduitT i S.ByteString m ()
sourceHandleRange handle offset count =
    sourceHandleRangeWithBuffer handle offset count chunkSize
  where
    chunkSize = defaultChunkSize
-- | Stream (part of) the contents of a handle as strict 'S.ByteString'
-- chunks, reading at most @buffer@ bytes per chunk.
--
-- * If an offset is given, the handle is first positioned with an absolute
--   seek; otherwise reading starts at the handle's current position.
--
-- * If a count is given, at most that many bytes are produced in total;
--   otherwise reading continues until 'S.hGetSome' returns an empty chunk.
--
-- The handle is not closed by this function.
sourceHandleRangeWithBuffer :: MonadIO m
                            => IO.Handle
                            -> Maybe Integer -- ^ Offset
                            -> Maybe Integer -- ^ Maximum count
                            -> Int -- ^ Buffer size
                            -> ConduitT i S.ByteString m ()
sourceHandleRangeWithBuffer handle offset count buffer = do
    case offset of
        Nothing -> return ()
        Just off -> liftIO $ IO.hSeek handle IO.AbsoluteSeek off
    case count of
        Nothing -> pullUnlimited
        Just c -> pullLimited c
  where
    -- Read until the handle yields an empty chunk.
    pullUnlimited = do
        bs <- liftIO $ S.hGetSome handle buffer
        unless (S.null bs) $ do
            yield bs
            pullUnlimited

    -- Read until @c@ bytes have been produced or the handle runs dry.
    --
    -- The remaining count is kept as an 'Integer' so that a count larger
    -- than @maxBound :: Int@ is not truncated; only the per-read request
    -- (already bounded by @buffer@) is narrowed to 'Int'.
    pullLimited c = do
        let request = fromInteger (min c (toInteger buffer))
        bs <- liftIO $ S.hGetSome handle request
        let c' = c - toInteger (S.length bs)
        -- hGetSome never returns more than requested, so this cannot go
        -- negative for a non-negative starting count.
        assert (c' >= 0) $
            if S.null bs
                then return ()
                else do
                    yield bs
                    pullLimited c'
-- | Pass every chunk through unchanged while also writing it to the given
-- file.
--
-- The file is opened in binary write mode and registered with 'bracketP',
-- with 'IO.hClose' as the release action; the writing itself is done by
-- 'conduitHandle'.
conduitFile :: MonadResource m
            => FilePath
            -> ConduitT S.ByteString S.ByteString m ()
conduitFile fp =
    bracketP openForWriting IO.hClose conduitHandle
  where
    openForWriting = IO.openBinaryFile fp IO.WriteMode
-- | Pass every chunk through unchanged while also writing it to the given
-- handle. Each chunk is written with 'S.hPut' before it is yielded
-- downstream. The handle is not closed by this conduit.
conduitHandle :: MonadIO m => IO.Handle -> ConduitT S.ByteString S.ByteString m ()
conduitHandle h =
    awaitForever tee
  where
    tee chunk = do
        liftIO $ S.hPut h chunk
        yield chunk
-- | Pass at most the given number of bytes downstream, then stop.
--
-- If the chunk that reaches the limit contains more bytes than allowed, the
-- surplus is pushed back upstream with 'leftover' so a following consumer
-- can still read it. If upstream ends early, fewer bytes are passed on.
--
-- A count of zero or less passes nothing on and consumes no input.
isolate :: Monad m
        => Int
        -> ConduitT S.ByteString S.ByteString m ()
isolate =
    loop
  where
    loop count
        -- Guard against non-positive counts: a negative count would never
        -- reach zero and would otherwise drain the whole stream.
        | count <= 0 = return ()
        | otherwise = do
            mbs <- await
            case mbs of
                Nothing -> return ()
                Just bs -> do
                    let (a, b) = S.splitAt count bs
                    case count - S.length a of
                        -- Limit reached: return the surplus, emit the rest.
                        0 -> do
                            unless (S.null b) $ leftover b
                            yield a
                        -- Whole chunk fits; keep going with what is left.
                        count' -> assert (S.null b) $ yield a >> loop count'
-- | Take the first byte of the stream, if there is one.
--
-- Empty chunks are skipped. The rest of the chunk the byte came from is
-- pushed back with 'leftover'. Returns 'Nothing' once upstream is exhausted.
head :: Monad m => ConduitT S.ByteString o m (Maybe Word8)
head =
    await >>= maybe (return Nothing) firstByte
  where
    -- An empty chunk carries no byte: try the next one.
    firstByte chunk = maybe head found (S.uncons chunk)

    found (w, rest) = leftover rest >> return (Just w)
-- | Pass bytes downstream for as long as they satisfy the predicate.
--
-- On the first byte that fails the predicate, the remainder of that chunk
-- (starting with the failing byte) is pushed back with 'leftover' and the
-- conduit stops. Empty prefixes are not yielded.
takeWhile :: Monad m => (Word8 -> Bool) -> ConduitT S.ByteString S.ByteString m ()
takeWhile p =
    consumeNext
  where
    consumeNext = do
        mchunk <- await
        case mchunk of
            Nothing -> return ()
            Just chunk -> do
                let (matching, rest) = S.span p chunk
                unless (S.null matching) $ yield matching
                -- An empty rest means the whole chunk matched: keep reading.
                if S.null rest
                    then consumeNext
                    else leftover rest
-- | Discard bytes for as long as they satisfy the predicate.
--
-- The first chunk that still has bytes after dropping is pushed back with
-- 'leftover', starting at the first byte that failed the predicate.
dropWhile :: Monad m => (Word8 -> Bool) -> ConduitT S.ByteString o m ()
dropWhile p =
    skip
  where
    skip = await >>= maybe (return ()) (keepRest . S.dropWhile p)

    -- Nothing left means the whole chunk matched: continue with the next.
    keepRest remaining =
        if S.null remaining
            then skip
            else leftover remaining
-- | Consume up to the given number of bytes and return them as a lazy
-- 'L.ByteString'.
--
-- If upstream ends first, the bytes collected so far are returned. If the
-- final chunk holds more than is still needed, the surplus is pushed back
-- with 'leftover'. @take 0@ returns immediately without awaiting any input.
take :: Monad m => Int -> ConduitT S.ByteString o m L.ByteString
take 0 = return L.empty
take n0 = go n0 id
  where
    -- @n@ is the number of bytes still wanted; @front@ is a difference list
    -- of the chunks collected so far, in order.
    go n front =
        await >>= maybe (return $ L.fromChunks $ front []) go'
      where
        go' bs =
            case S.length bs `compare` n of
                -- Chunk is too short: keep it all and ask for the rest.
                LT -> go (n - S.length bs) (front . (bs:))
                EQ -> return $ L.fromChunks $ front [bs]
                GT ->
                    let (x, y) = S.splitAt n bs
                     in assert (not $ S.null y) $ leftover y >> return (L.fromChunks $ front [x])
-- | Discard up to the given number of bytes from the stream.
--
-- If upstream ends first, the conduit simply returns. If the final chunk
-- holds more than is still to be dropped, the surplus is pushed back with
-- 'leftover'. @drop 0@ returns immediately without awaiting any input.
drop :: Monad m => Int -> ConduitT S.ByteString o m ()
drop 0 = return ()
drop n0 = go n0
  where
    -- @n@ is the number of bytes still to be discarded.
    go n =
        await >>= maybe (return ()) go'
      where
        go' bs =
            case S.length bs `compare` n of
                -- Chunk is too short: discard it all and keep dropping.
                LT -> go (n - S.length bs)
                EQ -> return ()
                GT ->
                    let y = S.drop n bs
                     in assert (not $ S.null y) $ leftover y >> return ()
-- | Split the stream on byte 10 (line feed), yielding one strict
-- 'S.ByteString' per line with the separator removed.
--
-- A line may span several incoming chunks; the pieces are buffered and
-- concatenated. When upstream ends, any buffered non-empty remainder is
-- yielded as a final line.
lines :: Monad m => ConduitT S.ByteString S.ByteString m ()
lines =
    awaitMore []
  where
    -- @pending@ holds the pieces of the current unfinished line, newest first.
    awaitMore pending = do
        mchunk <- await
        case mchunk of
            Nothing -> flush pending
            Just chunk -> split pending chunk

    flush pending = do
        let remainder = S.concat (reverse pending)
        unless (S.null remainder) $ yield remainder

    split pending chunk
        -- No separator in this chunk: buffer it and wait for more input.
        | S.null afterLine = awaitMore (chunk : pending)
        | otherwise = do
            yield $ S.concat $ reverse (line : pending)
            -- Skip the separator byte and continue with the rest.
            split [] (S.drop 1 afterLine)
      where
        (line, afterLine) = S.break (== 10) chunk
-- | Stream a lazy 'L.ByteString' as its strict chunks, one yielded value per
-- chunk.
sourceLbs :: Monad m => L.ByteString -> ConduitT i S.ByteString m ()
sourceLbs lbs = sourceList (L.toChunks lbs)
-- | Write the whole input stream to a temporary file and return the number
-- of bytes written together with a source that replays the cached bytes.
--
-- The temporary file is created in the directory reported by
-- 'getTemporaryDirectory' using the template @conduit.cache@, and is
-- registered with 'allocate' so that it is closed and removed on release.
-- The returned source streams the file and then releases that registration.
sinkCacheLength :: (MonadResource m1, MonadResource m2)
                => ConduitT S.ByteString o m1 (Word64, ConduitT i S.ByteString m2 ())
sinkCacheLength = do
    tmpdir <- liftIO getTemporaryDirectory
    (key, (path, handle)) <- allocate (openCache tmpdir) discardCache
    total <- writeCounting handle 0
    -- The handle is only needed for writing; the replay source reopens the
    -- file by path.
    liftIO (IO.hClose handle)
    let replay = do
            CC.sourceFile path
            release key
    return (total, replay)
  where
    openCache dir = IO.openBinaryTempFile dir "conduit.cache"

    discardCache (path, handle) = IO.hClose handle `finally` removeFile path

    -- Write every incoming chunk to the handle, summing the chunk lengths.
    writeCounting :: MonadResource m
                  => IO.Handle
                  -> Word64
                  -> ConduitT S.ByteString o m Word64
    writeCounting handle written = do
        mchunk <- await
        case mchunk of
            Nothing -> return written
            Just chunk -> do
                liftIO $ S.hPut handle chunk
                writeCounting handle $ written + fromIntegral (S.length chunk)
-- | Consume the entire stream and return it as a lazy 'L.ByteString' whose
-- chunks are the incoming strict chunks, in order.
sinkLbs :: Monad m => ConduitT S.ByteString o m L.ByteString
sinkLbs = do
    chunks <- consume
    return (L.fromChunks chunks)
-- | Run a monadic action on every byte of a strict 'S.ByteString', in order.
--
-- The bytes are read directly through the underlying pointer rather than by
-- building an intermediate list. 'touchForeignPtr' is invoked once the walk
-- reaches the end of the buffer.
mapM_BS :: Monad m => (Word8 -> m ()) -> S.ByteString -> m ()
mapM_BS f (PS fptr offset len) =
    walk begin
  where
    begin = unsafeForeignPtrToPtr fptr `plusPtr` offset
    stop = begin `plusPtr` len

    walk cursor =
        if cursor < stop
            then f (accursedUnutterablePerformIO (peek cursor)) >> walk (cursor `plusPtr` 1)
            -- Past the last byte: touch the foreign pointer, then finish.
            else accursedUnutterablePerformIO (touchForeignPtr fptr) `seq` return ()
-- | Run a monadic action in the base monad on every byte of the stream, in
-- order. Nothing is yielded downstream.
mapM_ :: Monad m => (Word8 -> m ()) -> ConduitT S.ByteString o m ()
mapM_ f = awaitForever $ \chunk -> lift (mapM_BS f chunk)
-- | Consume just enough bytes to decode one 'Storable' value and return it.
--
-- Returns 'Nothing' if the stream ends before enough bytes are available;
-- see 'sinkStorableHelper' for how partial input is handled.
sinkStorable :: (Monad m, Storable a) => ConduitT S.ByteString o m (Maybe a)
sinkStorable =
    sinkStorableHelper Just onShortInput
  where
    onShortInput = return Nothing
-- | Consume just enough bytes to decode one 'Storable' value and return it.
--
-- Throws 'SinkStorableInsufficientBytes' (via 'throwM') if the stream ends
-- before enough bytes are available.
sinkStorableEx :: (MonadThrow m, Storable a) => ConduitT S.ByteString o m a
sinkStorableEx =
    sinkStorableHelper id onShortInput
  where
    onShortInput = throwM SinkStorableInsufficientBytes
-- | Shared implementation of 'sinkStorable' and 'sinkStorableEx'.
--
-- Reads exactly @sizeOf (undefined :: a)@ bytes from the stream, decodes
-- them as an @a@ and applies @wrap@ to the result. Empty chunks are skipped.
-- Surplus bytes in a chunk are pushed back with 'leftover'. If the stream
-- ends before enough bytes arrive, the bytes gathered so far are pushed back
-- and @failure@ is run instead.
sinkStorableHelper :: forall m a b o. (Monad m, Storable a)
                   => (a -> b)
                   -> (ConduitT S.ByteString o m b)
                   -> ConduitT S.ByteString o m b
sinkStorableHelper wrap failure =
    awaitChunk
  where
    -- Number of bytes one value of type @a@ occupies.
    needed = sizeOf (undefined :: a)

    awaitChunk = await >>= maybe failure dispatch

    dispatch chunk
        | S.null chunk = awaitChunk
        | available < needed = do
            -- Not enough in this chunk: put it back and gather across
            -- chunk boundaries with 'take'.
            leftover chunk
            gathered <- fmap (S.concat . L.toChunks) (take needed)
            case compare (S.length gathered) needed of
                LT -> leftover gathered >> failure
                EQ -> decode gathered
                -- 'take' never returns more than requested.
                GT -> assert False (decode gathered)
        | available == needed = decode chunk
        | otherwise = do
            let (exact, surplus) = S.splitAt needed chunk
            leftover surplus
            decode exact
      where
        available = S.length chunk

    -- Decode the bytes in place; the result is forced before returning.
    decode raw = return $! wrap $! accursedUnutterablePerformIO $!
        unsafeUseAsCString raw (\cstr -> safePeek undefined (castPtr cstr))

    -- The first argument is only used for its type (via 'sizeOf').
    safePeek :: a -> Ptr a -> IO a
#ifdef ALLOW_UNALIGNED_ACCESS
    safePeek _ = peek
#else
    -- Copy into freshly 'alloca'-ed storage before peeking, rather than
    -- peeking the ByteString's buffer directly.
    safePeek val src = alloca $ \tmp -> do
        copyBytes tmp src (sizeOf val)
        peek tmp
#endif
-- | Exception thrown by 'sinkStorableEx' when the stream ends before enough
-- bytes are available to decode a value.
data SinkStorableException
    = SinkStorableInsufficientBytes
    deriving (Show, Typeable)

instance Exception SinkStorableException