module Bio.Iteratee.Bgzf (
Block(..), decompressBgzfBlocks', decompressBgzfBlocks,
decompressBgzf, decompressPlain,
maxBlockSize, bgzfEofMarker, liftBlock, getOffset,
BgzfChunk(..), isBgzf, isGzip, parMapChunksIO,
compressBgzf, compressBgzfLv, compressBgzf', CompressParams(..),
compressChunk
) where
import Bio.Iteratee
import Bio.Prelude
import Control.Concurrent.Async ( async, wait )
import Foreign.C.String ( withCAString )
import Foreign.C.Types ( CInt(..), CChar(..), CUInt(..), CULong(..) )
import Foreign.Marshal.Alloc ( mallocBytes, free, allocaBytes )
import Foreign.Ptr ( nullPtr, castPtr, Ptr, plusPtr, minusPtr )
import Foreign.Storable ( peekByteOff, pokeByteOff )
import qualified Data.ByteString as S
import qualified Data.ByteString.Unsafe as S
data Block = Block { block_offset :: !FileOffset
, block_contents :: !Bytes }
instance NullPoint Block where emptyP = Block 0 S.empty
instance Nullable Block where nullC = S.null . block_contents
instance Monoid Block where
mempty = Block 0 S.empty
mappend (Block x s) (Block _ t) = Block x (s `S.append` t)
mconcat [] = Block 0 S.empty
mconcat bs@(Block x _:_) = Block x $ S.concat [s|Block _ s <- bs]
decompressPlain :: MonadIO m => Enumeratee Bytes Block m a
decompressPlain = eneeCheckIfDone (liftI . step 0)
where
step !o it (Chunk s) = eneeCheckIfDone (liftI . step (o + fromIntegral (S.length s))) . it $ Chunk (Block o s)
step _ it (EOF mx) = idone (liftI it) (EOF mx)
decompressBgzf :: MonadIO m => Enumeratee Bytes Bytes m a
decompressBgzf = decompressBgzfBlocks ><> mapChunks block_contents
decompressBgzfBlocks :: MonadIO m => Enumeratee Bytes Block m a
decompressBgzfBlocks out = do
np <- liftIO $ getNumCapabilities
decompressBgzfBlocks' np out
decompressBgzfBlocks' :: MonadIO m => Int -> Enumeratee Bytes Block m a
decompressBgzfBlocks' np = eneeCheckIfDonePass (go 0 emptyQ)
where
go !off !qq k (Just e) = handleSeek off qq k e
go !off !qq k Nothing = case popQ qq of
Just (a, qq') | lengthQ qq == np -> liftIO (wait a) >>= eneeCheckIfDonePass (go off qq') . k . Chunk
_ -> liftI $ go' off qq k
go' !_ !qq k (EOF mx) = goE mx qq k Nothing
go' !off !qq k (Chunk c)
| S.null c = liftI $ go' off qq k
| otherwise = joinIM $ enumPure1Chunk c $ do
(off', op) <- get_bgzf_block off
a <- liftIO (async op)
go off' (pushQ a qq) k Nothing
goE _ !qq k (Just e) = handleSeek 0 qq k e
goE mx !qq k Nothing = case popQ qq of
Nothing -> idone (liftI k) (EOF mx)
Just (a,qq') -> liftIO (wait a) >>= eneeCheckIfDonePass (goE mx qq') . k . Chunk
handleSeek !off !qq k e = case fromException e of
Nothing -> throwRecoverableErr e $ go' off qq k
Just (SeekException o) -> do
cancelAll qq
seek $ o `shiftR` 16
eneeCheckIfDonePass (go (o `shiftR` 16) emptyQ) $ do
block'drop . fromIntegral $ o .&. 0xffff
k (EOF Nothing)
block'drop sz = liftI $ \s -> case s of
EOF _ -> throwErr $ setEOF s
Chunk (Block p c)
| S.length c < sz -> block'drop (sz S.length c)
| otherwise -> let b' = Block (p + fromIntegral sz) (S.drop sz c)
in idone () (Chunk b')
get_bgzf_block :: MonadIO m => FileOffset -> Iteratee Bytes m (FileOffset, IO Block)
get_bgzf_block off = do !(csize,xlen) <- get_bgzf_header
!comp <- get_block . fromIntegral $ csize xlen 19
!crc <- endianRead4 LSB
!isize <- endianRead4 LSB
let !off' = off + fromIntegral csize + 1
op = decompress1 (off `shiftL` 16) comp crc (fromIntegral isize)
return (off',op)
where
get_block sz = liftI $ \s -> case s of
EOF _ -> throwErr $ setEOF s
Chunk c | S.length c < sz -> (:) c `liftM` get_block (sz S.length c)
| otherwise -> idone [S.take sz c] (Chunk (S.drop sz c))
get_bgzf_header :: Monad m => Iteratee Bytes m (Word16, Word16)
get_bgzf_header = do n <- heads "\31\139"
_cm <- headStream
flg <- headStream
if flg `testBit` 2 then do
dropStream 6
xlen <- endianRead2 LSB
it <- takeStream (fromIntegral xlen) get_bsize >>= lift . tryRun
case it of Left e -> throwErr e
Right s | n == 2 -> return (s,xlen)
_ -> throwErr $ iterStrExc "No BGZF"
else throwErr $ iterStrExc "No BGZF"
where
get_bsize = do i1 <- headStream
i2 <- headStream
len <- endianRead2 LSB
if i1 == 66 && i2 == 67 && len == 2
then endianRead2 LSB
else dropStream (fromIntegral len) >> get_bsize
isBgzf :: Monad m => Iteratee Bytes m Bool
isBgzf = liftM isRight $ checkErr $ iLookAhead $ get_bgzf_header
isGzip :: Monad m => Iteratee Bytes m Bool
isGzip = liftM (either (const False) id) $ checkErr $ iLookAhead $ test
where
test = do n <- heads "\31\139"
dropStream 24
b <- isFinished
return $ not b && n == 2
maxBlockSize :: Int
maxBlockSize = 65450
bgzfEofMarker :: Bytes
bgzfEofMarker = "\x1f\x8b\x8\x4\0\0\0\0\0\xff\x6\0\x42\x43\x2\0\x1b\0\x3\0\0\0\0\0\0\0\0\0"
decompress1 :: FileOffset -> [Bytes] -> Word32 -> Int -> IO Block
decompress1 off ss crc usize =
allocaBytes (112) $ \stream -> do
buf <- mallocBytes usize
(\hsc_ptr -> pokeByteOff hsc_ptr 48) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 64) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 72) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 80) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 0) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 24) stream buf
(\hsc_ptr -> pokeByteOff hsc_ptr 8) stream (0 :: CUInt)
(\hsc_ptr -> pokeByteOff hsc_ptr 32) stream (fromIntegral usize :: CUInt)
z_check "inflateInit2" =<< c_inflateInit2 stream (15)
forM_ ss $ \s -> case fromIntegral $ S.length s of
l | l > 0 -> S.unsafeUseAsCString s $ \p -> do
(\hsc_ptr -> pokeByteOff hsc_ptr 0) stream p
(\hsc_ptr -> pokeByteOff hsc_ptr 8) stream (l :: CUInt)
z_check "inflate" =<< c_inflate stream 0
_ -> return ()
z_check "inflate" =<< c_inflate stream 4
z_check "inflateEnd" =<< c_inflateEnd stream
pe <- (\hsc_ptr -> peekByteOff hsc_ptr 24) stream
when (pe `minusPtr` buf /= usize) $ error "size mismatch after deflate()"
crc0 <- c_crc32 0 nullPtr 0
crc' <- c_crc32 crc0 buf (fromIntegral usize)
when (fromIntegral crc /= crc') $ error "CRC error after deflate()"
Block off `liftM` S.unsafePackCStringFinalizer (castPtr buf) usize (free buf)
compress1 :: Int -> [Bytes] -> IO Bytes
compress1 _lv [] = return bgzfEofMarker
compress1 lv ss0 =
allocaBytes (112) $ \stream -> do
let input_length = sum (map S.length ss0)
when (input_length > maxBlockSize) $ error "Trying to create too big a BGZF block; this is a bug."
buf <- mallocBytes 65536
S.unsafeUseAsCString bgzfEofMarker $ \eof ->
forM_ [0,4..16] $ \o -> do x <- peekByteOff eof o
pokeByteOff buf o (x::Word32)
(\hsc_ptr -> pokeByteOff hsc_ptr 48) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 64) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 72) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 80) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 0) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 24) stream (buf `plusPtr` 18)
(\hsc_ptr -> pokeByteOff hsc_ptr 8) stream (0 :: CUInt)
(\hsc_ptr -> pokeByteOff hsc_ptr 32) stream (65536188 :: CUInt)
z_check "deflateInit2" =<< c_deflateInit2 stream (fromIntegral lv) 8
(15) 8 0
let go (s:ss) = do
crc <- go ss
S.unsafeUseAsCString s $ \p ->
case fromIntegral $ S.length s of
l | l > 0 -> do
(\hsc_ptr -> pokeByteOff hsc_ptr 0) stream p
(\hsc_ptr -> pokeByteOff hsc_ptr 8) stream (l :: CUInt)
z_check "deflate" =<< c_deflate stream 0
c_crc32 crc p l
_ -> return crc
go [] = c_crc32 0 nullPtr 0
crc <- go ss0
z_check "deflate" =<< c_deflate stream 4
z_check "deflateEnd" =<< c_deflateEnd stream
compressed_length <- (+) (18+8) `fmap` (\hsc_ptr -> peekByteOff hsc_ptr 40) stream
when (compressed_length > 65536) $ error "produced too big a block"
pokeByteOff buf 16 (fromIntegral $ (compressed_length1) .&. 0xff :: Word8)
pokeByteOff buf 17 (fromIntegral $ (compressed_length1) `shiftR` 8 :: Word8)
pokeByteOff buf (compressed_length8) (fromIntegral crc :: Word32)
pokeByteOff buf (compressed_length4) (fromIntegral input_length :: Word32)
S.unsafePackCStringFinalizer buf compressed_length (free buf)
data ZStream
z_check :: String -> CInt -> IO ()
z_check msg c = when (c /= 0 && c /= 1) $
error $ msg ++ " failed: " ++ show c
c_deflateInit2 :: Ptr ZStream -> CInt -> CInt -> CInt -> CInt -> CInt -> IO CInt
c_deflateInit2 z a b c d e = withCAString "1.2.8" $ \versionStr ->
c_deflateInit2_ z a b c d e versionStr (112 :: CInt)
foreign import ccall unsafe "zlib.h deflateInit2_" c_deflateInit2_ ::
Ptr ZStream -> CInt -> CInt -> CInt -> CInt -> CInt
-> Ptr CChar -> CInt -> IO CInt
c_inflateInit2 :: Ptr ZStream -> CInt -> IO CInt
c_inflateInit2 z a = withCAString "1.2.8" $ \versionStr ->
c_inflateInit2_ z a versionStr (112 :: CInt)
foreign import ccall unsafe "zlib.h inflateInit2_" c_inflateInit2_ ::
Ptr ZStream -> CInt -> Ptr CChar -> CInt -> IO CInt
foreign import ccall unsafe "zlib.h deflate" c_deflate ::
Ptr ZStream -> CInt -> IO CInt
foreign import ccall unsafe "zlib.h inflate" c_inflate ::
Ptr ZStream -> CInt -> IO CInt
foreign import ccall unsafe "zlib.h deflateEnd" c_deflateEnd ::
Ptr ZStream -> IO CInt
foreign import ccall unsafe "zlib.h inflateEnd" c_inflateEnd ::
Ptr ZStream -> IO CInt
foreign import ccall unsafe "zlib.h crc32" c_crc32 ::
CULong -> Ptr CChar -> CUInt -> IO CULong
getOffset :: Iteratee Block m FileOffset
getOffset = liftI step
where
step s@(EOF _) = icont step (Just (setEOF s))
step s@(Chunk (Block o _)) = idone o s
liftBlock :: Monad m => Iteratee Bytes m a -> Iteratee Block m a
liftBlock = liftI . step
where
step it (EOF ex) = joinI $ lift $ enumChunk (EOF ex) it
step it (Chunk (Block !l !s)) = Iteratee $ \od oc ->
enumPure1Chunk s it >>= \it' -> runIter it' (onDone od) (oc . step . liftI)
where
!sl = S.length s
onDone od hdr (Chunk !rest) = od hdr . Chunk $! Block (l + fromIntegral (sl S.length rest)) rest
onDone od hdr (EOF ex) = od hdr (EOF ex)
compressBgzf' :: MonadIO m => CompressParams -> Enumeratee BgzfChunk Bytes m a
compressBgzf' (CompressParams lv np) = bgzfBlocks ><> parMapChunksIO np (compress1 lv)
data BgzfChunk = SpecialChunk !Bytes BgzfChunk
| RecordChunk !Bytes BgzfChunk
| LeftoverChunk !Bytes BgzfChunk
| NoChunk
instance NullPoint BgzfChunk where emptyP = NoChunk
instance Nullable BgzfChunk where
nullC NoChunk = True
nullC (SpecialChunk s c) = S.null s && nullC c
nullC (RecordChunk s c) = S.null s && nullC c
nullC (LeftoverChunk s c) = S.null s && nullC c
bgzfBlocks :: Monad m => Enumeratee BgzfChunk [Bytes] m a
bgzfBlocks = eneeCheckIfDone (liftI . to_blocks 0 [])
where
to_blocks _alen acc k (EOF mx) =
lift (enumPure1Chunk [S.empty] (k $ Chunk acc)) >>= flip idone (EOF mx)
to_blocks alen acc k (Chunk NoChunk) = liftI $ to_blocks alen acc k
to_blocks alen acc k (Chunk (SpecialChunk c cs))
| alen + S.length c < maxBlockSize = eneeCheckIfDone (\k' -> to_blocks 0 [] k' (Chunk cs)) . k $ Chunk (c:acc)
| null acc = let (l,r) = S.splitAt maxBlockSize c
in eneeCheckIfDone (\k' -> to_blocks 0 [] k' (Chunk (SpecialChunk r cs))) . k $ Chunk [l]
| otherwise = eneeCheckIfDone (\k' -> to_blocks 0 [] k' (Chunk (SpecialChunk c cs))) . k $ Chunk acc
to_blocks alen acc k (Chunk (RecordChunk c cs))
| alen + S.length c + 4 < maxBlockSize = to_blocks (alen + S.length c + 4) (c:encLength c:acc) k (Chunk cs)
| null acc = let (l,r) = S.splitAt (maxBlockSize4) c
in eneeCheckIfDone (\k' -> to_blocks 0 [] k' (Chunk (LeftoverChunk r cs))) . k $
Chunk [l, encLength l]
| otherwise = eneeCheckIfDone (\k' -> to_blocks 0 [] k' (Chunk (RecordChunk c cs))) . k $ Chunk acc
where
encLength s = let !l = S.length s in S.pack [ fromIntegral (l `shiftR` 0 .&. 0xff)
, fromIntegral (l `shiftR` 8 .&. 0xff)
, fromIntegral (l `shiftR` 16 .&. 0xff)
, fromIntegral (l `shiftR` 24 .&. 0xff) ]
to_blocks alen acc k (Chunk (LeftoverChunk c cs))
| alen + S.length c < maxBlockSize = to_blocks (alen + S.length c) (c:acc) k (Chunk cs)
| null acc = let (l,r) = S.splitAt maxBlockSize c
in eneeCheckIfDone (\k' -> to_blocks 0 [] k' (Chunk (LeftoverChunk r cs))) . k $ Chunk [l]
| otherwise = eneeCheckIfDone (\k' -> to_blocks 0 [] k' (Chunk (LeftoverChunk c cs))) . k $ Chunk acc
compressBgzf :: MonadIO m => Enumeratee BgzfChunk Bytes m a
compressBgzf = compressBgzfLv 6
compressBgzfLv :: MonadIO m => Int -> Enumeratee BgzfChunk Bytes m a
compressBgzfLv lv out = do
np <- liftIO $ getNumCapabilities
compressBgzf' (CompressParams lv (np+2)) out
data CompressParams = CompressParams {
compression_level :: Int,
queue_depth :: Int }
deriving Show
compressChunk :: Int -> Ptr Word8 -> CUInt -> IO Bytes
compressChunk lv ptr len =
allocaBytes (112) $ \stream -> do
buf <- mallocBytes 65536
S.unsafeUseAsCString bgzfEofMarker $ \eof ->
forM_ [0,4..16] $ \o -> do x <- peekByteOff eof o
pokeByteOff buf o (x::Word32)
(\hsc_ptr -> pokeByteOff hsc_ptr 48) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 64) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 72) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 80) stream nullPtr
(\hsc_ptr -> pokeByteOff hsc_ptr 0) stream ptr
(\hsc_ptr -> pokeByteOff hsc_ptr 24) stream (buf `plusPtr` 18)
(\hsc_ptr -> pokeByteOff hsc_ptr 8) stream len
(\hsc_ptr -> pokeByteOff hsc_ptr 32) stream (65536188 :: CUInt)
z_check "deflateInit2" =<< c_deflateInit2 stream (fromIntegral lv) 8
(15) 8 0
z_check "deflate" =<< c_deflate stream 4
z_check "deflateEnd" =<< c_deflateEnd stream
crc0 <- c_crc32 0 nullPtr 0
crc <- c_crc32 crc0 (castPtr ptr) len
compressed_length <- (+) (18+8) `fmap` (\hsc_ptr -> peekByteOff hsc_ptr 40) stream
when (compressed_length > 65536) $ error "produced too big a block"
pokeByteOff buf 16 (fromIntegral $ (compressed_length1) .&. 0xff :: Word8)
pokeByteOff buf 17 (fromIntegral $ (compressed_length1) `shiftR` 8 :: Word8)
pokeByteOff buf (compressed_length8) (fromIntegral crc :: Word32)
pokeByteOff buf (compressed_length4) (fromIntegral len :: Word32)
S.unsafePackCStringFinalizer buf compressed_length (free buf)