{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Server.Worker (
    worker
  , WorkerConf(..)
  , fromContext
  ) where

import Data.IORef
import qualified Network.HTTP.Types as H
import Network.Socket (SockAddr)
import qualified System.TimeManager as T
import UnliftIO.Exception (SomeException(..))
import qualified UnliftIO.Exception as E
import UnliftIO.STM

import Imports hiding (insert)
import Network.HPACK
import Network.HPACK.Token
import Network.HTTP2.Arch
import Network.HTTP2.Frame
import Network.HTTP2.Server.Types

----------------------------------------------------------------

-- | The operations and addresses a worker needs, abstracted over the
--   stream representation @a@. 'fromContext' builds the instance used
--   for 'Stream'.
data WorkerConf a = WorkerConf {
    readInputQ     :: IO (Input a)
    -- ^ Obtain the next 'Input' to serve; 'worker' runs this once per
    --   loop iteration.
  , writeOutputQ   :: Output a -> IO ()
    -- ^ Hand an 'Output' over; 'response' and 'pushStream' call it for
    --   every output they build.
  , workerCleanup  :: a -> IO ()
    -- ^ Run by 'worker' on the stream being served when the handler
    --   ends with an exception other than 'KilledByHttp2ThreadManager'.
  , isPushable     :: IO Bool
    -- ^ Whether push promises may be sent right now; 'pushStream'
    --   pushes nothing when this yields 'False'.
  , insertStream   :: StreamId -> a -> IO ()
    -- ^ Called by 'pushStream' with the stream identifier and stream
    --   that 'makePushStream' returned.
  , makePushStream :: a -> PushPromise -> IO (StreamId, StreamId, a)
    -- ^ Given the parent stream and a push promise, produce a triple.
    --   'fromContext' fills it with the parent stream number, the new
    --   stream identifier and the new stream, in that order.
  , mySockAddr     :: SockAddr
    -- ^ Passed by 'worker' to 'Aux' as its second argument.
  , peerSockAddr   :: SockAddr
    -- ^ Passed by 'worker' to 'Aux' as its third argument.
  }

-- | Build the 'WorkerConf' for 'Stream' out of a connection 'Context'.
--
--   All fields of the 'Context' are brought into scope with a record
--   wildcard; the whole context is also kept as @ctx@ because some
--   helpers take it as a single argument.
fromContext :: Context -> WorkerConf Stream
fromContext ctx@Context{..} = WorkerConf {
    -- Take the next input from the server-side input queue.
    readInputQ = atomically $ readTQueue $ inputQ $ toServerInfo roleInfo
  , writeOutputQ = enqueueOutput outputQ
    -- Mark the stream as closed with 'Killed', then enqueue a reset
    -- frame carrying 'InternalError' for that stream on the control queue.
  , workerCleanup = \strm -> do
        closed ctx strm Killed
        let frame = resetFrame InternalError $ streamNumber strm
        enqueueControl controlQ $ CFrames Nothing [frame]
  -- Peer SETTINGS_ENABLE_PUSH
  , isPushable = enablePush <$> readIORef peerSettings
  , insertStream = insert streamTable
  -- Peer SETTINGS_INITIAL_WINDOW_SIZE
  , makePushStream = \pstrm _ -> do
        ws <- initialWindowSize <$> readIORef peerSettings
        sid <- getMyNewStreamId ctx
        newstrm <- newPushStream sid ws
        let pid = streamNumber pstrm
        -- (parent stream number, new stream identifier, new stream)
        return (pid, sid, newstrm)
    -- The right-hand sides below are the values bound from the 'Context'
    -- by the record wildcard, not the 'WorkerConf' selectors.
  , mySockAddr = mySockAddr
  , peerSockAddr = peerSockAddr
  }

----------------------------------------------------------------

-- | Enqueue one pushed output per 'PushPromise' on behalf of the parent
--   stream, and tell the caller how the parent's own output is to be
--   typed.
--
--   The result is 'OObj' when nothing was pushed: the promise list is
--   empty, 'isPushable' yields 'False', or the request carries no scheme
--   or no authority/host value to build the promised request from.
--   Otherwise it is 'OWait' with an action that blocks until every
--   pushed output has run its completion action.
--
--   The scheme and authority are looked up once, before any stream is
--   created. Earlier code extracted them with 'fromJust', which is an
--   error when the header is absent; a request lacking them now simply
--   gets no pushes.
pushStream :: WorkerConf a
           -> a -- parent stream
           -> ValueTable -- request
           -> [PushPromise]
           -> IO OutputType
pushStream _ _ _ [] = return OObj
pushStream WorkerConf{..} pstrm reqvt pps0 = do
    pushable <- isPushable
    case promisedOrigin of
        Just (scheme, auth) | pushable -> do
            tvar <- newTVarIO 0
            -- 'push' returns the number of promises it enqueued, which
            -- is at least one here because the list is non-empty.
            lim <- push scheme auth tvar pps0 0
            return $ OWait (waiter lim tvar)
        _ -> return OObj
  where
    -- Scheme and authority shared by every promised request; the Host
    -- header is the fallback when there is no authority value.
    promisedOrigin = (,) <$> getHeaderValue tokenScheme reqvt
                         <*> (getHeaderValue tokenAuthority reqvt
                              <|> getHeaderValue tokenHost reqvt)
    -- Completion action attached to each pushed output.
    increment tvar = atomically $ modifyTVar' tvar (+1)
    -- Block until the counter has reached the number of pushed outputs.
    waiter lim tvar = atomically $ do
        n <- readTVar tvar
        checkSTM (n >= lim)
    -- Create, register and enqueue one pushed stream per promise,
    -- counting them in the last argument.
    push _ _ _ [] n = return (n :: Int)
    push scheme auth tvar (pp:pps) n = do
        (pid, sid, newstrm) <- makePushStream pstrm pp
        insertStream sid newstrm
        let path = promiseRequestPath pp
            promiseRequest = [(tokenMethod, H.methodGet)
                             ,(tokenScheme, scheme)
                             ,(tokenAuthority, auth)
                             ,(tokenPath, path)]
            ot = OPush promiseRequest pid
            Response rsp = promiseResponse pp
            out = Output newstrm rsp ot Nothing $ increment tvar
        writeOutputQ out
        push scheme auth tvar pps (n + 1)

-- | This function is passed to workers.
--   They also pass 'Response's from a server to this function.
--   This function enqueues commands for the HTTP/2 sender.
--
--   What happens depends on the body of the response:
--
--   * no body: the output is enqueued as 'OObj'; push promises are not
--     processed.
--
--   * builder or file body: push promises are processed with
--     'pushStream' first, then the output is enqueued with the output
--     type 'pushStream' returned.
--
--   * streaming body: as above, but the output carries a bounded queue
--     and the streaming action is run in the calling thread, feeding
--     that queue. The 'ThreadContinue' flag is set to 'False' in this
--     case and to 'True' in the others.
--
--   * 'OutBodyStreamingUnmask': not supported; calls 'error'.
response :: WorkerConf a -> Manager -> T.Handle -> ThreadContinue -> a -> Request -> Response -> [PushPromise] -> IO ()
response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps = case outObjBody rsp of
  OutBodyNone -> do
      setThreadContinue tconf True
      enqueue OObj Nothing
  OutBodyBuilder _ -> sendAfterPush
  OutBodyFile _ -> sendAfterPush
  OutBodyStreaming strmbdy -> do
      otyp <- pushStream wc strm reqvt pps
      -- We must not exit this server application.
      -- If the application exits, streaming would be also closed.
      -- So, this work occupies this thread.
      --
      -- We need to increase the number of workers.
      spawnAction mgr
      -- After this work, this thread stops to decrease
      -- the number of workers.
      setThreadContinue tconf False
      -- Since streaming body is loop, we cannot control it.
      -- So, let's serialize 'Builder' with a designated queue.
      tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
      enqueue otyp (Just tbq)
      let -- The timeout handle is paused around the queue write.
          push b = do
            T.pause th
            atomically $ writeTBQueue tbq (StreamingBuilder b)
            T.resume th
          flush  = atomically $ writeTBQueue tbq StreamingFlush
          -- The final chunk carries the action that undoes 'incCounter'.
          finished = atomically $ writeTBQueue tbq $ StreamingFinished (decCounter mgr)
      incCounter mgr
      -- 'finished' is enqueued even if the streaming action throws.
      strmbdy push flush `E.finally` finished
  OutBodyStreamingUnmask _ ->
    error "response: server does not support OutBodyStreamingUnmask"
  where
    (_,reqvt) = inpObjHeaders req
    -- Enqueue this stream's response with the given output type and
    -- optional streaming queue; the trailing action does nothing.
    enqueue otyp mtbq = writeOutputQ $ Output strm rsp otyp mtbq (return ())
    -- Shared by the builder and file bodies: process push promises,
    -- let the worker continue, then enqueue the response.
    sendAfterPush = do
        otyp <- pushStream wc strm reqvt pps
        setThreadContinue tconf True
        enqueue otyp Nothing

-- | Worker for server applications.
--
--   The worker loops: it takes an input with 'readInputQ', records the
--   stream being served, and runs the server with 'response' as its
--   reply function. The loop goes on while both of these hold:
--
--   * the iteration did not end with 'KilledByHttp2ThreadManager';
--
--   * the 'ThreadContinue' flag, as left by 'response', is 'True'.
--
--   Any other exception makes the worker run 'workerCleanup' on the
--   recorded stream (if one was recorded) and carry on.
worker :: WorkerConf a -> Manager -> Server -> Action
worker wc@WorkerConf{..} mgr server = do
    sinfo <- newStreamInfo
    tcont <- newThreadContinue
    timeoutKillThread mgr $ go sinfo tcont
  where
    go sinfo tcont th = do
        setThreadContinue tcont True
        ex <- E.trySyncOrAsync $ do
            -- The timeout handle is paused while waiting for input.
            T.pause th
            Input strm req <- readInputQ
            let req' = pauseRequestBody req th
            -- Remember the stream so that 'cleanup' can find it.
            setStreamInfo sinfo strm
            T.resume th
            T.tickle th
            let aux = Aux th mySockAddr peerSockAddr
            server (Request req') aux $ response wc mgr th tcont strm (Request req')
        cont1 <- case ex of
            Right () -> return True
            Left e@(SomeException _)
              -- killed by the local worker manager
              | Just KilledByHttp2ThreadManager{} <- E.fromException e -> return False
              -- killed by the local timeout manager
              | Just T.TimeoutThread <- E.fromException e -> do
                  cleanup sinfo
                  return True
              -- every other exception is currently handled the same way
              | otherwise -> do
                  cleanup sinfo
                  return True
        cont2 <- getThreadContinue tcont
        clearStreamInfo sinfo
        when (cont1 && cont2) $ go sinfo tcont th
    -- Wrap the request body reader so that the timeout handle is paused
    -- while a body chunk is being read.
    pauseRequestBody req th = req { inpObjBody = readBody' }
      where
        readBody = inpObjBody req
        readBody' = do
            T.pause th
            bs <- readBody
            T.resume th
            return bs
    -- Run 'workerCleanup' on the recorded stream, if there is one.
    cleanup sinfo = do
        minp <- getStreamInfo sinfo
        case minp of
            Nothing   -> return ()
            Just strm -> workerCleanup strm

----------------------------------------------------------------

-- | A reference shared by a responder and its worker.
--   The reference holds a value of type 'Bool' set by the responder.
--   If 'True', the worker continues to serve requests.
--   Otherwise, the worker finishes.
newtype ThreadContinue = ThreadContinue (IORef Bool)

{-# INLINE newThreadContinue #-}
-- | Create a 'ThreadContinue' whose flag starts as 'True'.
newThreadContinue :: IO ThreadContinue
newThreadContinue = ThreadContinue <$> newIORef True

{-# INLINE setThreadContinue #-}
-- | Overwrite the flag held by a 'ThreadContinue'.
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue ref) x = writeIORef ref x

{-# INLINE getThreadContinue #-}
-- | Read the flag held by a 'ThreadContinue'.
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue (ThreadContinue ref) = readIORef ref

----------------------------------------------------------------

-- | The type for cleaning up: a mutable slot that holds the stream a
--   worker is currently serving, or 'Nothing' when there is none.
newtype StreamInfo a = StreamInfo (IORef (Maybe a))

{-# INLINE newStreamInfo #-}
-- | Create a 'StreamInfo' that holds no stream.
newStreamInfo :: IO (StreamInfo a)
newStreamInfo = StreamInfo <$> newIORef Nothing

{-# INLINE clearStreamInfo #-}
-- | Reset a 'StreamInfo' so that it holds no stream.
clearStreamInfo :: StreamInfo a -> IO ()
clearStreamInfo (StreamInfo ref) = writeIORef ref Nothing

{-# INLINE setStreamInfo #-}
-- | Store a stream in a 'StreamInfo', replacing whatever it held.
setStreamInfo :: StreamInfo a -> a -> IO ()
setStreamInfo (StreamInfo ref) inp = writeIORef ref $ Just inp

{-# INLINE getStreamInfo #-}
-- | Read the stream currently held by a 'StreamInfo', if any.
getStreamInfo :: StreamInfo a -> IO (Maybe a)
getStreamInfo (StreamInfo ref) = readIORef ref