module Thrift.Transport.Header
( module Thrift.Transport
, HeaderTransport(..)
, openHeaderTransport
, ProtocolType(..)
, TransformType(..)
, ClientType(..)
, tResetProtocol
, tSetProtocol
) where
import Thrift.Transport
import Thrift.Protocol.Compact
import Control.Applicative
import Control.Exception ( throw )
import Control.Monad
import Data.Bits
import Data.IORef
import Data.Int
import Data.Monoid
import Data.Word
import qualified Data.Attoparsec.ByteString as P
import qualified Data.Binary as Binary
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Lazy.Builder as B
import qualified Data.Map as Map
data ProtocolType = TBinary | TCompact | TJSON deriving (Enum, Eq)
data ClientType = HeaderClient | Framed | Unframed deriving (Enum, Eq)
infoIdKeyValue = 1
type Headers = Map.Map String String
data TransformType = ZlibTransform deriving (Enum, Eq)
fromTransportType :: TransformType -> Int16
fromTransportType ZlibTransform = 1
toTransportType :: Int16 -> TransformType
toTransportType 1 = ZlibTransform
toTransportType _ = throw $ TransportExn "HeaderTransport: Unknown transform ID" TE_UNKNOWN
data HeaderTransport i o = (Transport i, Transport o) => HeaderTransport
{ readBuffer :: IORef LBS.ByteString
, writeBuffer :: IORef B.Builder
, inTrans :: i
, outTrans :: o
, clientType :: IORef ClientType
, protocolType :: IORef ProtocolType
, headers :: IORef [(String, String)]
, writeHeaders :: Headers
, transforms :: IORef [TransformType]
, writeTransforms :: [TransformType]
}
openHeaderTransport :: (Transport i, Transport o) => i -> o -> IO (HeaderTransport i o)
openHeaderTransport i o = do
pid <- newIORef TCompact
rBuf <- newIORef LBS.empty
wBuf <- newIORef mempty
cType <- newIORef HeaderClient
h <- newIORef []
trans <- newIORef []
return HeaderTransport
{ readBuffer = rBuf
, writeBuffer = wBuf
, inTrans = i
, outTrans = o
, clientType = cType
, protocolType = pid
, headers = h
, writeHeaders = Map.empty
, transforms = trans
, writeTransforms = []
}
isFramed t = (/= Unframed) <$> readIORef (clientType t)
readFrame :: (Transport i, Transport o) => HeaderTransport i o -> IO Bool
readFrame t = do
let input = inTrans t
let rBuf = readBuffer t
let cType = clientType t
lsz <- tRead input 4
let sz = LBS.toStrict lsz
case P.parseOnly P.endOfInput sz of
Right _ -> do return False
Left _ -> do
case parseBinaryMagic sz of
Right _ -> do
writeIORef rBuf $ lsz
writeIORef cType Unframed
writeIORef (protocolType t) TBinary
return True
Left _ -> do
case parseCompactMagic sz of
Right _ -> do
writeIORef rBuf $ lsz
writeIORef cType Unframed
writeIORef (protocolType t) TCompact
return True
Left _ -> do
let len = Binary.decode lsz :: Int32
lbuf <- tReadAll input $ fromIntegral len
let buf = LBS.toStrict lbuf
case parseBinaryMagic buf of
Right _ -> do
writeIORef cType Framed
writeIORef (protocolType t) TBinary
writeIORef rBuf lbuf
return True
Left _ -> do
case parseCompactMagic buf of
Right _ -> do
writeIORef cType Framed
writeIORef (protocolType t) TCompact
writeIORef rBuf lbuf
return True
Left _ -> do
case parseHeaderMagic buf of
Right flags -> do
let (flags, seqNum, header, body) = extractHeader buf
writeIORef cType HeaderClient
handleHeader t header
payload <- untransform t body
writeIORef rBuf $ LBS.fromStrict $ payload
return True
Left _ ->
throw $ TransportExn "HeaderTransport: unkonwn client type" TE_UNKNOWN
parseBinaryMagic = P.parseOnly $ P.word8 0x80 *> P.word8 0x01 *> P.word8 0x00 *> P.anyWord8
parseCompactMagic = P.parseOnly $ P.word8 0x82 *> P.satisfy (\b -> b .&. 0x1f == 0x01)
parseHeaderMagic = P.parseOnly $ P.word8 0x0f *> P.word8 0xff *> (P.count 2 P.anyWord8)
parseI32 :: P.Parser Int32
parseI32 = Binary.decode . LBS.fromStrict <$> P.take 4
parseI16 :: P.Parser Int16
parseI16 = Binary.decode . LBS.fromStrict <$> P.take 2
extractHeader :: BS.ByteString -> (Int16, Int32, BS.ByteString, BS.ByteString)
extractHeader bs =
case P.parse extractHeader_ bs of
P.Done remain (flags, seqNum, header) -> (flags, seqNum, header, remain)
_ -> throw $ TransportExn "HeaderTransport: Invalid header" TE_UNKNOWN
where
extractHeader_ = do
magic <- P.word8 0x0f *> P.word8 0xff
flags <- parseI16
seqNum <- parseI32
(headerSize :: Int) <- (* 4) . fromIntegral <$> parseI16
header <- P.take headerSize
return (flags, seqNum, header)
handleHeader t header =
case P.parseOnly parseHeader header of
Right (pType, trans, info) -> do
writeIORef (protocolType t) pType
writeIORef (transforms t) trans
writeIORef (headers t) info
_ -> throw $ TransportExn "HeaderTransport: Invalid header" TE_UNKNOWN
iw16 :: Int16 -> Word16
iw16 = fromIntegral
iw32 :: Int32 -> Word32
iw32 = fromIntegral
wi16 :: Word16 -> Int16
wi16 = fromIntegral
wi32 :: Word32 -> Int32
wi32 = fromIntegral
parseHeader :: P.Parser (ProtocolType, [TransformType], [(String, String)])
parseHeader = do
protocolType <- toProtocolType <$> parseVarint wi16
numTrans <- fromIntegral <$> parseVarint wi16
trans <- replicateM numTrans parseTransform
info <- parseInfo
return (protocolType, trans, info)
toProtocolType :: Int16 -> ProtocolType
toProtocolType 0 = TBinary
toProtocolType 1 = TJSON
toProtocolType 2 = TCompact
fromProtocolType :: ProtocolType -> Int16
fromProtocolType TBinary = 0
fromProtocolType TJSON = 1
fromProtocolType TCompact = 2
parseTransform :: P.Parser TransformType
parseTransform = toTransportType <$> parseVarint wi16
parseInfo :: P.Parser [(String, String)]
parseInfo = do
n <- P.eitherP P.endOfInput (parseVarint wi32)
case n of
Left _ -> return []
Right n0 ->
replicateM (fromIntegral n0) $ do
klen <- parseVarint wi16
k <- P.take $ fromIntegral klen
vlen <- parseVarint wi16
v <- P.take $ fromIntegral vlen
return (C.unpack k, C.unpack v)
parseString :: P.Parser BS.ByteString
parseString = parseVarint wi32 >>= (P.take . fromIntegral)
buildHeader :: HeaderTransport i o -> IO B.Builder
buildHeader t = do
pType <- readIORef $ protocolType t
let pId = buildVarint $ iw16 $ fromProtocolType pType
let headerContent = pId <> (buildTransforms t) <> (buildInfo t)
let len = fromIntegral $ LBS.length $ B.toLazyByteString headerContent
let padding = mconcat $ replicate (mod len 4) $ B.word8 0
let codedLen = B.int16BE (fromIntegral $ (quot (len - 1) 4) + 1)
let flags = 0
let seqNum = 0
return $ B.int16BE 0x0fff <> B.int16BE flags <> B.int32BE seqNum <> codedLen <> headerContent <> padding
buildTransforms :: HeaderTransport i o -> B.Builder
buildTransforms t =
let trans = writeTransforms t in
(buildVarint $ iw16 $ fromIntegral $ length trans) <>
(mconcat $ map (buildVarint . iw16 . fromTransportType) trans)
buildInfo :: HeaderTransport i o -> B.Builder
buildInfo t =
let h = Map.assocs $ writeHeaders t in
case length h of
0 -> mempty
len -> (buildVarint $ iw16 $ fromIntegral $ len) <> (mconcat $ map buildInfoEntry h)
where
buildInfoEntry (k, v) = buildVarStr k <> buildVarStr v
buildVarStr s = (buildVarint $ iw16 $ fromIntegral $ length s) <> B.string8 s
tResetProtocol :: (Transport i, Transport o) => HeaderTransport i o -> IO Bool
tResetProtocol t = do
rBuf <- readIORef $ readBuffer t
writeIORef (clientType t) HeaderClient
readFrame t
tSetProtocol :: (Transport i, Transport o) => HeaderTransport i o -> ProtocolType -> IO ()
tSetProtocol t = writeIORef (protocolType t)
transform :: HeaderTransport i o -> LBS.ByteString -> LBS.ByteString
transform t bs =
foldr applyTransform bs $ writeTransforms t
where
applyTransform bs _ =
throw $ TransportExn "HeaderTransport: Unknown transform" TE_UNKNOWN
untransform :: HeaderTransport i o -> BS.ByteString -> IO BS.ByteString
untransform t bs = do
trans <- readIORef $ transforms t
return $ foldl unapplyTransform bs trans
where
unapplyTransform bs _ =
throw $ TransportExn "HeaderTransport: Unknown transform" TE_UNKNOWN
instance (Transport i, Transport o) => Transport (HeaderTransport i o) where
tIsOpen t = do
tIsOpen (inTrans t)
tIsOpen (outTrans t)
tClose t = do
tClose(outTrans t)
tClose(inTrans t)
tRead t len = do
rBuf <- readIORef $ readBuffer t
if not $ LBS.null rBuf
then do
let (consumed, remain) = LBS.splitAt (fromIntegral len) rBuf
writeIORef (readBuffer t) remain
return consumed
else do
framed <- isFramed t
if not framed
then tRead (inTrans t) len
else do
ok <- readFrame t
if ok
then tRead t len
else return LBS.empty
tPeek t = do
rBuf <- readIORef (readBuffer t)
if not $ LBS.null rBuf
then return $ Just $ LBS.head rBuf
else do
framed <- isFramed t
if not framed
then tPeek (inTrans t)
else do
ok <- readFrame t
if ok
then tPeek t
else return Nothing
tWrite t buf = do
let wBuf = writeBuffer t
framed <- isFramed t
if framed
then modifyIORef wBuf (<> B.lazyByteString buf)
else
tWrite(outTrans t) buf
tFlush t = do
cType <- readIORef $ clientType t
case cType of
Unframed -> tFlush $ outTrans t
Framed -> flushBuffer t id mempty
HeaderClient -> buildHeader t >>= flushBuffer t (transform t)
where
flushBuffer t f header = do
wBuf <- readIORef $ writeBuffer t
writeIORef (writeBuffer t) mempty
let payload = B.toLazyByteString (header <> wBuf)
tWrite (outTrans t) $ Binary.encode (fromIntegral $ LBS.length payload :: Int32)
tWrite (outTrans t) $ f payload
tFlush (outTrans t)