{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.Server.Worker (
worker
) where
import Control.Concurrent.STM
import Control.Exception (SomeException(..), AsyncException(..))
import qualified Control.Exception as E
import Data.IORef
import qualified Network.HTTP.Types as H
import qualified System.TimeManager as T
import Imports hiding (insert)
import Network.HPACK
import Network.HPACK.Token
import Network.HTTP2
import Network.HTTP2.Priority
import Network.HTTP2.Server.API
import Network.HTTP2.Server.Context
import Network.HTTP2.Server.EncodeFrame
import Network.HTTP2.Server.Manager
import Network.HTTP2.Server.Queue
import Network.HTTP2.Server.Stream
import Network.HTTP2.Server.Types
-- | Create and enqueue the server pushes that accompany a response.
--
-- For every 'PushPromise' a new stream is obtained from 'newPushStream',
-- inserted into the stream table, and an 'Output' tagged with 'OPush'
-- (carrying the synthesized request pseudo-headers and the parent stream
-- number) is placed on the output queue.
--
-- Arguments: the connection context, the parent stream whose number is
-- attached to each push, the value table of the original request (source of
-- the scheme and host\/authority values), and the promises to push.
--
-- Returns 'ORspn' when nothing is pushed: the promise list is empty, the
-- @enablePush@ setting is off, or the request lacks the scheme or
-- host\/authority value required to build the promised request.  Otherwise
-- returns 'OWait' wrapping an STM action that retries until the shared
-- counter, bumped by the action attached to each pushed 'Output', has
-- reached the number of pushes enqueued.
pushStream :: Context
           -> Stream
           -> ValueTable
           -> [PushPromise]
           -> IO OutputType
pushStream _ _ _ [] = return ORspn
pushStream ctx@Context{..} pstrm reqvt pps0 = do
    pushable <- enablePush <$> readIORef http2settings
    -- Resolve the request-derived headers up front, before any push stream
    -- is created or registered: a missing value then simply disables
    -- pushing instead of throwing after a stream is already in the table.
    case (pushable, mscheme, mauth) of
        (True, Just scheme, Just auth) -> do
            tvar <- newTVarIO 0
            -- pps0 is non-empty here, so at least one push is enqueued.
            lim <- push scheme auth tvar pps0 0
            return $ OWait (waiter lim tvar)
        _ -> return ORspn
  where
    !pid = streamNumber pstrm
    mscheme = getHeaderValue tokenScheme reqvt
    -- The Host value takes precedence over the authority value.
    mauth = getHeaderValue tokenHost reqvt
        <|> getHeaderValue tokenAuthority reqvt
    increment tvar = atomically $ modifyTVar' tvar (+1)
    -- Blocks (via STM retry) until the counter has reached the limit.
    waiter lim tvar = atomically $ do
        n <- readTVar tvar
        check (n >= lim)
    -- Enqueue one push per promise; returns how many were enqueued.
    push _ _ _ [] !n = return (n :: Int)
    push scheme auth tvar (pp:pps) !n = do
        ws <- initialWindowSize <$> readIORef http2settings
        let !w = promiseWeight pp
            !pri = defaultPriority { weight = w }
            !pre = toPrecedence pri
        newstrm <- newPushStream ctx ws pre
        let !sid = streamNumber newstrm
        insert streamTable sid newstrm
        let !path = promiseRequestPath pp
            !promiseRequest = [(tokenMethod, H.methodGet)
                              ,(tokenScheme, scheme)
                              ,(tokenAuthority, auth)
                              ,(tokenPath, path)]
            !ot = OPush promiseRequest pid
            !rsp = promiseResponse pp
            !out = Output newstrm rsp ot Nothing $ increment tvar
        enqueueOutput outputQ out
        push scheme auth tvar pps (n + 1)
-- | Hand the handler's response for one stream over to the output queue.
--
-- Dispatches on 'responseBody':
--
-- * 'RspNoBody': the response is enqueued as 'ORspn'; the push promises
--   are not used.
-- * 'RspBuilder' \/ 'RspFile': 'pushStream' is run first and its result is
--   used as the output type of the enqueued response.
-- * 'RspStreaming': as above, but 'spawnAction' is called, the thread
--   continue flag is set to 'False', and the streaming body is run here,
--   feeding a bounded queue that is attached to the enqueued 'Output'.
--   'RSFinish' is written and 'deleteMyId' called once the body returns.
response :: Context -> Manager -> T.Handle -> ThreadContinue -> Stream -> Request -> Response -> [PushPromise] -> IO ()
response ctx@Context{..} mgr th tconf strm req rsp pps =
    case responseBody rsp of
        RspNoBody -> do
            setThreadContinue tconf True
            emit ORspn Nothing
        RspBuilder _ -> pushThenEmit
        RspFile _ -> pushThenEmit
        RspStreaming body -> do
            otyp <- runPushes
            spawnAction mgr
            setThreadContinue tconf False
            tbq <- newTBQueueIO 10
            emit otyp (Just tbq)
            let feed item = atomically $ writeTBQueue tbq item
                -- The timeout handle is paused while the write to the
                -- bounded queue is in progress.
                push b = T.pause th >> feed (RSBuilder b) >> T.resume th
                flush = feed RSFlush
            body push flush
            feed RSFinish
            deleteMyId mgr
  where
    reqvt = snd (requestHeaders req)
    -- Issue the server pushes and obtain the resulting output type.
    runPushes = pushStream ctx strm reqvt pps
    -- Enqueue this stream's response with the given output type and
    -- optional streaming queue.
    emit otyp mtbq =
        enqueueOutput outputQ $ Output strm rsp otyp mtbq (return ())
    -- Shared path for the non-streaming bodies.
    pushThenEmit = do
        otyp <- runPushes
        setThreadContinue tconf True
        emit otyp Nothing
-- | Worker action: repeatedly take an 'Input' from the input queue and run
-- the server handler on it.
--
-- Allocates a 'StreamInfo' and a 'ThreadContinue', then runs the loop under
-- 'timeoutKillThread', which supplies the timeout handle @th@.
--
-- The loop repeats while both of these hold: the iteration did not end with
-- 'ThreadKilled', and the continue flag (reset to 'True' at the start of
-- each iteration and possibly changed by 'response') is still 'True'.
worker :: Context -> Manager -> Server -> Action
worker ctx@Context{inputQ,controlQ} mgr server = do
    sinfo <- newStreamInfo
    tcont <- newThreadContinue
    timeoutKillThread mgr $ go sinfo tcont
  where
    go sinfo tcont th = do
        setThreadContinue tcont True
        ex <- E.try $ do
            -- The timeout handle is paused while blocked on the input queue.
            T.pause th
            Input strm req <- atomically $ readTQueue inputQ
            let req' = pauseRequestBody req th
            -- Remember the stream so that 'cleanup' can reset it on failure.
            setStreamInfo sinfo strm
            T.resume th
            T.tickle th
            let aux = Aux th
            -- The handler must receive req', not req: only req' carries the
            -- body reader that pauses the timeout handle while waiting for
            -- request data ('response' reads the headers only).
            server req' aux $ response ctx mgr th tcont strm req'
        cont1 <- case ex of
            Right () -> return True
            Left e@(SomeException _)
              -- Killed from outside: stop without touching the stream.
              | Just ThreadKilled <- E.fromException e -> return False
              -- Any other exception, including 'T.TimeoutThread': reset the
              -- current stream and carry on with the next input.
              | otherwise -> do
                  cleanup sinfo
                  return True
        cont2 <- getThreadContinue tcont
        clearStreamInfo sinfo
        when (cont1 && cont2) $ go sinfo tcont th
    -- Wrap the request body reader so that the timeout handle is paused for
    -- the duration of each read.
    pauseRequestBody req th = req { requestBody = readBody' }
      where
        !readBody = requestBody req
        !readBody' = do
            T.pause th
            bs <- readBody
            T.resume th
            return bs
    -- If a stream is recorded, mark it closed as 'Killed' and enqueue a
    -- reset frame with 'InternalError' for it on the control queue.
    cleanup sinfo = do
        minp <- getStreamInfo sinfo
        case minp of
            Nothing -> return ()
            Just strm -> do
                closed ctx strm Killed
                let !frame = resetFrame InternalError (streamNumber strm)
                enqueueControl controlQ $ CFrame frame
-- | A mutable boolean cell, consulted by the worker loop to decide whether
-- to process another request.
newtype ThreadContinue = ThreadContinue (IORef Bool)

{-# INLINE newThreadContinue #-}
-- | Allocate a new flag, initially 'True'.
newThreadContinue :: IO ThreadContinue
newThreadContinue = fmap ThreadContinue (newIORef True)

{-# INLINE setThreadContinue #-}
-- | Overwrite the flag with the given value.
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue flag) keepGoing = flag `writeIORef` keepGoing

{-# INLINE getThreadContinue #-}
-- | Read the current value of the flag.
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue (ThreadContinue flag) = readIORef flag
-- | A mutable slot holding the stream currently being processed, if any.
newtype StreamInfo = StreamInfo (IORef (Maybe Stream))

{-# INLINE newStreamInfo #-}
-- | Allocate an empty slot.
newStreamInfo :: IO StreamInfo
newStreamInfo = fmap StreamInfo (newIORef Nothing)

{-# INLINE clearStreamInfo #-}
-- | Empty the slot.
clearStreamInfo :: StreamInfo -> IO ()
clearStreamInfo (StreamInfo slot) = slot `writeIORef` Nothing

{-# INLINE setStreamInfo #-}
-- | Store a stream in the slot, replacing any previous content.
setStreamInfo :: StreamInfo -> Stream -> IO ()
setStreamInfo (StreamInfo slot) inp = writeIORef slot (Just inp)

{-# INLINE getStreamInfo #-}
-- | Read the slot's current content.
getStreamInfo :: StreamInfo -> IO (Maybe Stream)
getStreamInfo (StreamInfo slot) = readIORef slot