{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.Server.Sender (frameSender) where
import Control.Concurrent.STM
import qualified Control.Exception as E
import qualified Data.ByteString as BS
import Data.ByteString.Builder (Builder)
import qualified Data.ByteString.Builder.Extra as B
import Data.IORef
import Foreign.Ptr (plusPtr)
import Network.ByteOrder
import Imports
import Network.HPACK (setLimitForEncoding, toHeaderTable)
import Network.HTTP2
import Network.HTTP2.Priority (isEmptySTM, dequeueSTM, Precedence)
import Network.HTTP2.Server.API
import Network.HTTP2.Server.EncodeFrame
import Network.HTTP2.Server.HPACK
import Network.HTTP2.Server.Manager hiding (start)
import Network.HTTP2.Server.Types
import Network.HTTP2.Server.Queue
import Network.HTTP2.Server.Context
import Network.HTTP2.Server.Stream
-- | What remains of a builder after one buffer-filling step.
data Leftover
    = LZero
      -- ^ Nothing is pending.
    | LOne B.BufferWriter
      -- ^ A writer that still has output to produce (built from 'B.More').
    | LTwo ByteString B.BufferWriter
      -- ^ A chunk that has to be copied first, followed by the writer to
      --   resume afterwards (built from 'B.Chunk').
-- | Read the current value of the stream's 'streamWindow' variable
--   outside of any enclosing transaction.
{-# INLINE getStreamWindowSize #-}
getStreamWindowSize :: Stream -> IO WindowSize
getStreamWindowSize strm = readTVarIO (streamWindow strm)
-- | Block (via STM 'check') until the stream's 'streamWindow' is
--   strictly positive.
{-# INLINE waitStreamWindowSize #-}
waitStreamWindowSize :: Stream -> IO ()
waitStreamWindowSize strm =
    atomically $ readTVar (streamWindow strm) >>= \w -> check (w > 0)
-- | Block (via STM 'check') until the queue holds at least one element.
--   Nothing is removed from the queue.
{-# INLINE waitStreaming #-}
waitStreaming :: TBQueue a -> IO ()
waitStreaming tbq =
    atomically $ isEmptyTBQueue tbq >>= \noItems -> check (not noItems)
-- | The next piece of work selected by the sender loop.
data Switch
    = C Control
      -- ^ A control message taken from the control queue.
    | O (StreamId,Precedence,Output)
      -- ^ An entry taken from the output queue.
    | Flush
      -- ^ Nothing is queued, but bytes are pending in the write buffer.
-- | Sender loop of a connection.
--
-- Repeatedly takes either a control message from @controlQ@ or an entry
-- from @outputQ@, encodes the resulting frames into @confWriteBuffer@ and
-- hands the filled prefix of that buffer to @confSendAll@.  Throughout,
-- an offset (@off@) is the number of bytes currently pending at the start
-- of the write buffer.
--
-- The loop stops when @control@ yields a negative offset (@CFinish@,
-- @CGoaway@) or when an exception escapes from it; such an exception is
-- deliberately discarded by @ignore@.
frameSender :: Context -> Config -> Manager -> IO ()
frameSender ctx@Context{outputQ,controlQ,connectionWindow,encodeDynamicTable}
            conf@Config{..}
            mgr = loop 0 `E.catch` ignore
  where
    -- Select the next piece of work.  Control messages always win.
    -- Otherwise the transaction blocks until the connection window is
    -- positive and then either takes an output or, when nothing is queued
    -- but bytes are pending, asks for a flush.
    dequeue off = do
        isEmpty <- isEmptyTQueue controlQ
        if isEmpty then do
            w <- readTVar connectionWindow
            check (w > 0)
            emp <- isEmptySTM outputQ
            if emp then
                if off /= 0 then return Flush else retry
              else
                O <$> dequeueSTM outputQ
          else
            C <$> readTQueue controlQ

    -- Pending bytes beyond this mark are flushed before more outputs are
    -- processed.
    hardLimit = confBufferSize - 512

    loop off = do
        x <- atomically $ dequeue off
        case x of
            C ctl -> do
                when (off /= 0) $ flushN off
                -- The buffer is empty at this point (it either was already
                -- or has just been flushed), so control frames start at
                -- offset 0.  Handing over the stale offset would make
                -- CSettings0 append behind bytes that were already sent
                -- and send them a second time on the next flush.
                off' <- control ctl 0
                when (off' >= 0) $ loop off'
            O (_,pre,out) -> do
                let strm = outputStream out
                writeIORef (streamPrecedence strm) pre
                off' <- outputOrEnqueueAgain out off
                case off' of
                    0                    -> loop 0
                    _ | off' > hardLimit -> flushN off' >> loop 0
                      | otherwise        -> loop off'
            Flush -> flushN off >> loop 0

    -- Handle one control message.  The result is the new buffer offset;
    -- a negative value tells 'loop' to terminate.
    control CFinish         _ = return (-1)
    control (CGoaway frame) _ = confSendAll frame >> return (-1)
    control (CFrame frame)  _ = confSendAll frame >> return 0
    control (CSettings frame alist) _ = do
        confSendAll frame
        setLimit alist
        return 0
    control (CSettings0 frame1 frame2 alist) off = do
        -- Both frames are copied into the write buffer instead of being
        -- sent on their own.
        let !buf = confWriteBuffer `plusPtr` off
            !off' = off + BS.length frame1 + BS.length frame2
        buf' <- copy buf frame1
        void $ copy buf' frame2
        setLimit alist
        return off'

    -- Apply a SettingsHeaderTableSize entry, if present, to the encoder's
    -- dynamic table.
    {-# INLINE setLimit #-}
    setLimit alist = case lookup SettingsHeaderTableSize alist of
        Nothing  -> return ()
        Just siz -> setLimitForEncoding siz encodeDynamicTable

    -- Encode frames for one output at offset @off0@, writing at most @lim@
    -- bytes of DATA payload.  Returns the new buffer offset.

    -- Continuation of a body whose earlier part was already sent.
    output out@(Output strm (Response _ _ _ _) (ONext curr tlrmkr) _ sentinel) off0 lim = do
        let !buf = confWriteBuffer `plusPtr` off0
            !siz = confBufferSize - off0
            !payloadOff = off0 + frameHeaderLength
        Next datPayloadLen mnext <- curr buf siz lim
        NextTrailersMaker !tlrmkr' <- runTrailersMaker tlrmkr payloadOff datPayloadLen
        fillDataHeaderEnqueueNext strm off0 datPayloadLen mnext tlrmkr' sentinel out

    -- Fresh response: header block first, then the first piece of the body.
    output out@(Output strm (Response st hdr body tlrmkr) ORspn mtbq sentinel) off0 lim = do
        let !sid = streamNumber strm
            !endOfStream = case body of
                RspNoBody -> True
                _         -> False
        (ths,_) <- toHeaderTable $ fixHeaders st hdr
        hdrEnd <- headerContinue sid ths endOfStream off0
        off <- sendHeadersIfNecessary hdrEnd
        case body of
            RspNoBody -> do
                halfClosedLocal ctx strm Finished
                return off
            RspFile (FileSpec path fileoff bytecount) -> do
                let payloadOff = off + frameHeaderLength
                Next datPayloadLen mnext <-
                    fillFileBodyGetNext conf payloadOff lim path fileoff bytecount mgr confPositionReadMaker
                NextTrailersMaker !tlrmkr' <- runTrailersMaker tlrmkr payloadOff datPayloadLen
                fillDataHeaderEnqueueNext strm off datPayloadLen mnext tlrmkr' sentinel out
            RspBuilder builder -> do
                let payloadOff = off + frameHeaderLength
                Next datPayloadLen mnext <-
                    fillBuilderBodyGetNext conf payloadOff lim builder
                NextTrailersMaker !tlrmkr' <- runTrailersMaker tlrmkr payloadOff datPayloadLen
                fillDataHeaderEnqueueNext strm off datPayloadLen mnext tlrmkr' sentinel out
            RspStreaming _ -> do
                let payloadOff = off + frameHeaderLength
                -- NOTE(review): assumes a streaming response is always
                -- enqueued together with its queue; 'fromJust' throws
                -- otherwise (the exception is caught by 'resetStream').
                Next datPayloadLen mnext <-
                    fillStreamBodyGetNext conf payloadOff lim (fromJust mtbq) strm
                NextTrailersMaker !tlrmkr' <- runTrailersMaker tlrmkr payloadOff datPayloadLen
                fillDataHeaderEnqueueNext strm off datPayloadLen mnext tlrmkr' sentinel out

    -- Server push: PUSH_PROMISE first, then the response as for ORspn.
    output out@(Output strm _ (OPush ths pid) _ _) off0 lim = do
        let !sid = streamNumber strm
        len <- pushPromise pid sid ths off0
        off <- sendHeadersIfNecessary $ off0 + frameHeaderLength + len
        output out{outputType=ORspn} off lim

    -- No other combination is expected here (OWait is dealt with in
    -- 'outputOrEnqueueAgain').
    output _ _ _ = error "frameSender.output: unexpected output type"

    -- Either encode the output now or hand it to a helper thread that
    -- re-enqueues it once it can make progress.  Any exception resets the
    -- stream and leaves the offset unchanged.
    outputOrEnqueueAgain :: Output -> Int -> IO Int
    outputOrEnqueueAgain out@(Output strm _ otyp _ _) off = E.handle resetStream $ do
        state <- readStreamState strm
        if isHalfClosedLocal state then
            return off
          else case otyp of
                 OWait wait -> do
                     forkAndEnqueueWhenReady wait outputQ out{outputType=ORspn} mgr
                     return off
                 _ -> case mtbq of
                        Just tbq -> checkStreaming tbq
                        _        -> checkStreamWindowSize
      where
        mtbq = outputStrmQ out
        -- Streaming body: wait until the application has queued something.
        checkStreaming tbq = do
            isEmpty <- atomically $ isEmptyTBQueue tbq
            if isEmpty then do
                forkAndEnqueueWhenReady (waitStreaming tbq) outputQ out mgr
                return off
              else
                checkStreamWindowSize
        -- Wait while the stream window is zero; otherwise send, bounded by
        -- the smaller of the connection and stream windows.
        checkStreamWindowSize = do
            sws <- getStreamWindowSize strm
            if sws == 0 then do
                forkAndEnqueueWhenReady (waitStreamWindowSize strm) outputQ out mgr
                return off
              else do
                cws <- readTVarIO connectionWindow
                let !lim = min cws sws
                output out off lim
        resetStream e = do
            closed ctx strm (ResetByMe e)
            let !rst = resetFrame InternalError $ streamNumber strm
            enqueueControl controlQ $ CFrame rst
            return off

    -- Send the first @n@ bytes of the write buffer.
    {-# INLINE flushN #-}
    flushN :: Int -> IO ()
    flushN n = bufferIO confWriteBuffer n confSendAll

    -- Write a HEADERS frame at @off@, followed by as many CONTINUATION
    -- frames as are needed for the headers that did not fit.  Returns the
    -- offset just past the last frame written, i.e. the number of pending
    -- bytes in the write buffer.
    headerContinue sid ths endOfStream off = do
        let !offkv = off + frameHeaderLength
        let !bufkv = confWriteBuffer `plusPtr` offkv
            !limkv = confBufferSize - offkv
        (hs,kvlen) <- hpackEncodeHeader ctx bufkv limkv ths
        let flag0 = case hs of
                [] -> setEndHeader defaultFlags
                _  -> defaultFlags
            flag = if endOfStream then setEndStream flag0 else flag0
        let buf = confWriteBuffer `plusPtr` off
        fillFrameHeader FrameHeaders kvlen sid flag buf
        continue sid (offkv + kvlen) hs

    -- Payload area of a frame placed at the very start of the write buffer.
    !bufHeaderPayload = confWriteBuffer `plusPtr` frameHeaderLength
    !headerPayloadLim = confBufferSize - frameHeaderLength

    -- @end@ is the number of pending bytes, including everything that
    -- preceded the header frame in the buffer.  While headers remain, all
    -- of it is flushed and the next CONTINUATION frame is written at the
    -- start of the now empty buffer.
    continue _   end [] = return end
    continue sid end ths = do
        flushN end
        (ths', kvlen') <- hpackEncodeHeaderLoop ctx bufHeaderPayload headerPayloadLim ths
        -- No progress at all means the next header can never be encoded.
        when (ths == ths') $ E.throwIO $ ConnectionError CompressionError "cannot compress the header"
        let flag = case ths' of
                [] -> setEndHeader defaultFlags
                _  -> defaultFlags
        fillFrameHeader FrameContinuation kvlen' sid flag confWriteBuffer
        continue sid (frameHeaderLength + kvlen') ths'

    -- Keep @off@ if a further frame header still fits behind it; otherwise
    -- flush and restart at offset 0.
    {-# INLINE sendHeadersIfNecessary #-}
    sendHeadersIfNecessary off
      | off + frameHeaderLength < confBufferSize = return off
      | otherwise = do
          flushN off
          return 0

    -- Write the DATA frame header for a payload already placed behind
    -- @off@, update both flow-control windows and return the new offset.

    -- Last piece of the body: finish the stream, with END_STREAM on the
    -- DATA frame or with a trailing header block when there are trailers.
    fillDataHeaderEnqueueNext strm@Stream{streamWindow,streamNumber}
                              off datPayloadLen Nothing tlrmkr tell _ = do
        let !buf  = confWriteBuffer `plusPtr` off
            !off' = off + frameHeaderLength + datPayloadLen
        (mtrailers, flag) <- do
            Trailers !trailers <- tlrmkr Nothing
            if null trailers then
                return (Nothing, setEndStream defaultFlags)
              else
                return (Just trailers, defaultFlags)
        fillFrameHeader FrameData datPayloadLen streamNumber flag buf
        off'' <- handleTrailers mtrailers off'
        void tell
        halfClosedLocal ctx strm Finished
        atomically $ modifyTVar' connectionWindow (subtract datPayloadLen)
        atomically $ modifyTVar' streamWindow (subtract datPayloadLen)
        return off''
      where
        handleTrailers Nothing off0 = return off0
        handleTrailers (Just trailers) off0 = do
            (ths,_) <- toHeaderTable trailers
            hdrEnd <- headerContinue streamNumber ths True off0
            sendHeadersIfNecessary hdrEnd
    -- More body to come: re-enqueue the output with its continuation.
    fillDataHeaderEnqueueNext Stream{streamWindow,streamNumber}
                              off datPayloadLen (Just next) tlrmkr _ out = do
        let !buf  = confWriteBuffer `plusPtr` off
            !off' = off + frameHeaderLength + datPayloadLen
            flag  = defaultFlags
        fillFrameHeader FrameData datPayloadLen streamNumber flag buf
        atomically $ modifyTVar' connectionWindow (subtract datPayloadLen)
        atomically $ modifyTVar' streamWindow (subtract datPayloadLen)
        let !out' = out { outputType = ONext next tlrmkr }
        enqueueOutput outputQ out'
        return off'

    -- Write a PUSH_PROMISE frame at @off@; returns the payload length
    -- (the 4-byte stream id plus the encoded header block).
    -- NOTE(review): headers that do not fit are dropped here although
    -- END_HEADERS is always set -- confirm that this is acceptable.
    pushPromise pid sid ths off = do
        let !offsid = off + frameHeaderLength
            !bufsid = confWriteBuffer `plusPtr` offsid
        poke32 (fromIntegral sid) bufsid 0
        let !offkv  = offsid + 4
            !bufkv = confWriteBuffer `plusPtr` offkv
            !limkv = confBufferSize - offkv
        (_,kvlen) <- hpackEncodeHeader ctx bufkv limkv ths
        let !flag = setEndHeader defaultFlags
            !buf = confWriteBuffer `plusPtr` off
            !len = kvlen + 4
        fillFrameHeader FramePushPromise len pid flag buf
        return len

    -- Encode a frame header with the given type, payload length, stream id
    -- and flags into @buf@.
    {-# INLINE fillFrameHeader #-}
    fillFrameHeader ftyp len sid flag buf = encodeFrameHeaderBuf ftyp hinfo buf
      where
        hinfo = FrameHeader len flag sid

    -- Feed the @siz@ payload bytes at buffer offset @off@ to the trailers
    -- maker.
    runTrailersMaker tlrmkr off siz = do
        let datBuf = confWriteBuffer `plusPtr` off
        bufferIO datBuf siz $ \bs -> tlrmkr (Just bs)

    -- Any exception simply ends the sender (deliberate best effort).
    {-# INLINE ignore #-}
    ignore :: E.SomeException -> IO ()
    ignore _ = return ()
-- | Run a builder into the write buffer starting at offset @off@, using
--   no more than @lim@ bytes and never writing past the end of the
--   buffer, and describe how to continue.
fillBuilderBodyGetNext :: Config -> Int -> WindowSize -> Builder -> IO Next
fillBuilderBodyGetNext Config{confWriteBuffer,confBufferSize} off lim bb =
    B.runBuilder bb datBuf room >>= \(len, signal) ->
        return (nextForBuilder len signal)
  where
    datBuf = confWriteBuffer `plusPtr` off
    room   = min (confBufferSize - off) lim
-- | Read the first piece of a file body into the write buffer at offset
--   @off@ and describe how to continue.
--
--   The position reader for @path@ is obtained from @prmaker@.  A
--   'Closer' sentinel is registered with the manager through
--   'timeoutClose', whose result serves as the refresh action; a
--   'Refresher' sentinel is used as the refresh action directly.
fillFileBodyGetNext :: Config -> Int -> WindowSize -> FilePath -> FileOffset -> ByteCount -> Manager -> PositionReadMaker -> IO Next
fillFileBodyGetNext Config{confWriteBuffer,confBufferSize}
                    off lim path start bytecount mgr prmaker = do
    (pread, sentinel) <- prmaker path
    refresh <- refresherOf sentinel
    nread <- pread start (mini room bytecount) datBuf
    return $ nextForFile (fromIntegral nread) pread (start + nread) (bytecount - nread) refresh
  where
    datBuf = confWriteBuffer `plusPtr` off
    room   = min (confBufferSize - off) lim
    refresherOf (Closer closer)       = timeoutClose mgr closer
    refresherOf (Refresher refresher) = return refresher
-- | Drain queued streaming items into the write buffer at offset @off@,
--   using at most @lim@ bytes, and describe how to continue.
fillStreamBodyGetNext :: Config -> Int -> WindowSize -> TBQueue RspStreaming -> Stream -> IO Next
fillStreamBodyGetNext Config{confWriteBuffer,confBufferSize} off lim sq strm =
    runStreamBuilder datBuf room sq >>= \(leftover, cont, len) ->
        return (nextForStream sq strm leftover cont len)
  where
    datBuf = confWriteBuffer `plusPtr` off
    room   = min (confBufferSize - off) lim
-- | Resume a builder body from its leftover.  The payload is written
--   behind the space reserved for a frame header at the start of @buf0@,
--   limited by both the remaining buffer size and @lim@.
--
--   Being called with 'LZero' is an error.
fillBufBuilder :: Leftover -> DynaNext
fillBufBuilder leftover buf0 siz0 lim = resume leftover
  where
    payloadBuf = buf0 `plusPtr` frameHeaderLength
    room       = min (siz0 - frameHeaderLength) lim

    finish filled signal = return $ nextForBuilder filled signal

    resume LZero = error "fillBufBuilder: LZero"
    resume (LOne writer) = do
        (len, signal) <- writer payloadBuf room
        finish len signal
    resume (LTwo bs writer)
      -- The pending chunk fits: copy it, then let the writer go on.
      | pending <= room = do
            buf1 <- copy payloadBuf bs
            (len2, signal) <- writer buf1 (room - pending)
            finish (pending + len2) signal
      -- Otherwise copy what fits and keep the rest as a new chunk.
      | otherwise = do
            let (now, later) = BS.splitAt room bs
            void $ copy payloadBuf now
            finish room (B.Chunk later writer)
      where
        pending = BS.length bs
-- | Turn the number of bytes written and the builder's signal into a
--   'Next': no continuation once the builder is done, otherwise a
--   continuation that resumes from the corresponding leftover.
nextForBuilder :: BytesFilled -> B.Next -> Next
nextForBuilder len signal = case signal of
    B.Done            -> Next len Nothing
    B.More _ writer   -> Next len $ Just (fillBufBuilder (LOne writer))
    B.Chunk bs writer -> Next len $ Just (fillBufBuilder (LTwo bs writer))
-- | Take items from the streaming queue without blocking and run their
--   builders into the buffer until the queue is empty, a builder does not
--   finish within the remaining room, or a flush/finish item is seen.
--
--   Returns the leftover of an unfinished builder, a flag that is 'False'
--   only when 'RSFinish' was read, and the total number of bytes written.
runStreamBuilder :: Buffer -> BufferSize -> TBQueue RspStreaming
                 -> IO (Leftover, Bool, BytesFilled)
runStreamBuilder buf0 room0 sq = go buf0 room0 0
  where
    go !buf !room !total = atomically (tryReadTBQueue sq) >>= step
      where
        step Nothing         = return (LZero, True, total)
        step (Just RSFlush)  = return (LZero, True, total)
        step (Just RSFinish) = return (LZero, False, total)
        step (Just (RSBuilder builder)) = do
            (len, signal) <- B.runBuilder builder buf room
            let !total' = total + len
            case signal of
                B.Done            -> go (buf `plusPtr` len) (room - len) total'
                B.More _ writer   -> return (LOne writer, True, total')
                B.Chunk bs writer -> return (LTwo bs writer, True, total')
-- | Resume a streaming body.  A pending leftover is written first; after
--   that (or right away for 'LZero') further items are drained from the
--   queue.  The payload is placed behind the space reserved for a frame
--   header at the start of @buf0@, limited by the remaining buffer size
--   and @lim0@.
fillBufStream :: Leftover -> TBQueue RspStreaming -> Stream -> DynaNext
fillBufStream leftover0 sq strm buf0 siz0 lim0 = resume leftover0
  where
    payloadBuf = buf0 `plusPtr` frameHeaderLength
    room0      = min (siz0 - frameHeaderLength) lim0

    finish left cont filled = return $ nextForStream sq strm left cont filled

    resume LZero = do
        (left, cont, filled) <- runStreamBuilder payloadBuf room0 sq
        finish left cont filled
    resume (LOne writer) = write writer payloadBuf room0 0
    resume (LTwo bs writer)
      -- The pending chunk fits: copy it, then let the writer go on.
      | pending <= room0 = do
            buf1 <- copy payloadBuf bs
            write writer buf1 (room0 - pending) pending
      -- Otherwise copy what fits and keep the rest for the next round.
      | otherwise = do
            let (now, later) = BS.splitAt room0 bs
            void $ copy payloadBuf now
            finish (LTwo later writer) True room0
      where
        pending = BS.length bs

    -- Run a writer; @sofar@ is the number of bytes already placed in
    -- front of @buf@ during this round.
    write writer1 buf room sofar = do
        (len, signal) <- writer1 buf room
        let !filled = sofar + len
        case signal of
            B.Done -> do
                (left, cont, extra) <- runStreamBuilder (buf `plusPtr` len) (room - len) sq
                let !total = filled + extra
                finish left cont total
            B.More _ writer   -> finish (LOne writer) True filled
            B.Chunk bs writer -> finish (LTwo bs writer) True filled
-- | Build the 'Next' for a streaming body.  When the flag is 'True' the
--   continuation resumes with the given leftover; when it is 'False'
--   there is no continuation.
nextForStream :: TBQueue RspStreaming -> Stream
              -> Leftover -> Bool -> BytesFilled
              -> Next
nextForStream sq strm leftOrZero cont len
  | cont      = Next len $ Just (fillBufStream leftOrZero sq strm)
  | otherwise = Next len Nothing
-- | Read the next piece of a file body behind the space reserved for a
--   frame header at the start of @buf@, run the refresh action, and
--   describe how to continue.
fillBufFile :: PositionRead -> FileOffset -> ByteCount -> IO () -> DynaNext
fillBufFile pread start bytes refresh buf siz lim = do
    nread <- pread start (mini room bytes) payloadBuf
    refresh
    return $ nextForFile (fromIntegral nread) pread (start + nread) (bytes - nread) refresh
  where
    payloadBuf = buf `plusPtr` frameHeaderLength
    room       = min (siz - frameHeaderLength) lim
-- | Build the 'Next' for a file body.  There is no continuation when
--   nothing was read or when no bytes remain; otherwise the continuation
--   keeps reading from the advanced position.
nextForFile :: BytesFilled -> PositionRead -> FileOffset -> ByteCount -> IO () -> Next
nextForFile len pread start bytes refresh
  | len == 0 || bytes == 0 = Next len Nothing
  | otherwise              = Next len $ Just (fillBufFile pread start bytes refresh)
-- | The smaller of an 'Int' and an 'Int64', as an 'Int64'.
{-# INLINE mini #-}
mini :: Int -> Int64 -> Int64
mini i n = min (fromIntegral i) n