{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.Server.Worker (
worker
, WorkerConf(..)
, fromContext
) where
import Data.IORef
import qualified Network.HTTP.Types as H
import Network.Socket (SockAddr)
import qualified System.TimeManager as T
import UnliftIO.Exception (SomeException(..))
import qualified UnliftIO.Exception as E
import UnliftIO.STM
import Imports hiding (insert)
import Network.HPACK
import Network.HPACK.Token
import Network.HTTP2.Arch
import Network.HTTP2.Frame
import Network.HTTP2.Server.Types
data WorkerConf a = WorkerConf {
forall a. WorkerConf a -> IO (Input a)
readInputQ :: IO (Input a)
, forall a. WorkerConf a -> Output a -> IO ()
writeOutputQ :: Output a -> IO ()
, forall a. WorkerConf a -> a -> IO ()
workerCleanup :: a -> IO ()
, forall a. WorkerConf a -> IO Bool
isPushable :: IO Bool
, forall a. WorkerConf a -> StreamId -> a -> IO ()
insertStream :: StreamId -> a -> IO ()
, forall a.
WorkerConf a -> a -> PushPromise -> IO (StreamId, StreamId, a)
makePushStream :: a -> PushPromise -> IO (StreamId, StreamId, a)
, forall a. WorkerConf a -> SockAddr
mySockAddr :: SockAddr
, forall a. WorkerConf a -> SockAddr
peerSockAddr :: SockAddr
}
fromContext :: Context -> WorkerConf Stream
fromContext :: Context -> WorkerConf Stream
fromContext ctx :: Context
ctx@Context{TVar StreamId
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef (Maybe SettingsList)
IORef Settings
SockAddr
TQueue Control
TQueue (Output Stream)
DynamicTable
Rate
StreamTable
RoleInfo
Role
peerSockAddr :: Context -> SockAddr
mySockAddr :: Context -> SockAddr
rstRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
settingsRate :: Context -> Rate
pingRate :: Context -> Rate
rxConnectionInc :: Context -> IORef StreamId
txConnectionWindow :: Context -> TVar StreamId
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQStreamID :: Context -> TVar StreamId
outputQ :: Context -> TQueue (Output Stream)
outputBufferLimit :: Context -> IORef StreamId
peerStreamId :: Context -> IORef StreamId
myStreamId :: Context -> IORef StreamId
continued :: Context -> IORef (Maybe StreamId)
concurrency :: Context -> IORef StreamId
streamTable :: Context -> StreamTable
peerSettings :: Context -> IORef Settings
mySettings :: Context -> IORef Settings
myPendingAlist :: Context -> IORef (Maybe SettingsList)
myFirstSettings :: Context -> IORef Bool
roleInfo :: Context -> RoleInfo
role :: Context -> Role
peerSockAddr :: SockAddr
mySockAddr :: SockAddr
rstRate :: Rate
emptyFrameRate :: Rate
settingsRate :: Rate
pingRate :: Rate
rxConnectionInc :: IORef StreamId
txConnectionWindow :: TVar StreamId
decodeDynamicTable :: DynamicTable
encodeDynamicTable :: DynamicTable
controlQ :: TQueue Control
outputQStreamID :: TVar StreamId
outputQ :: TQueue (Output Stream)
outputBufferLimit :: IORef StreamId
peerStreamId :: IORef StreamId
myStreamId :: IORef StreamId
continued :: IORef (Maybe StreamId)
concurrency :: IORef StreamId
streamTable :: StreamTable
peerSettings :: IORef Settings
mySettings :: IORef Settings
myPendingAlist :: IORef (Maybe SettingsList)
myFirstSettings :: IORef Bool
roleInfo :: RoleInfo
role :: Role
..} = WorkerConf {
readInputQ :: IO (Input Stream)
readInputQ = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> STM a
readTQueue forall a b. (a -> b) -> a -> b
$ ServerInfo -> TQueue (Input Stream)
inputQ forall a b. (a -> b) -> a -> b
$ RoleInfo -> ServerInfo
toServerInfo RoleInfo
roleInfo
, writeOutputQ :: Output Stream -> IO ()
writeOutputQ = TQueue (Output Stream) -> Output Stream -> IO ()
enqueueOutput TQueue (Output Stream)
outputQ
, workerCleanup :: Stream -> IO ()
workerCleanup = \Stream
strm -> do
Context -> Stream -> ClosedCode -> IO ()
closed Context
ctx Stream
strm ClosedCode
Killed
let frame :: ByteString
frame = ErrorCode -> StreamId -> ByteString
resetFrame ErrorCode
InternalError forall a b. (a -> b) -> a -> b
$ Stream -> StreamId
streamNumber Stream
strm
TQueue Control -> Control -> IO ()
enqueueControl TQueue Control
controlQ forall a b. (a -> b) -> a -> b
$ Maybe SettingsList -> [ByteString] -> Control
CFrames forall a. Maybe a
Nothing [ByteString
frame]
, isPushable :: IO Bool
isPushable = Settings -> Bool
enablePush forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. IORef a -> IO a
readIORef IORef Settings
peerSettings
, insertStream :: StreamId -> Stream -> IO ()
insertStream = StreamTable -> StreamId -> Stream -> IO ()
insert StreamTable
streamTable
, makePushStream :: Stream -> PushPromise -> IO (StreamId, StreamId, Stream)
makePushStream = \Stream
pstrm PushPromise
_ -> do
StreamId
ws <- Settings -> StreamId
initialWindowSize forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. IORef a -> IO a
readIORef IORef Settings
peerSettings
StreamId
sid <- Context -> IO StreamId
getMyNewStreamId Context
ctx
Stream
newstrm <- StreamId -> StreamId -> IO Stream
newPushStream StreamId
sid StreamId
ws
let pid :: StreamId
pid = Stream -> StreamId
streamNumber Stream
pstrm
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamId
pid, StreamId
sid, Stream
newstrm)
, mySockAddr :: SockAddr
mySockAddr = SockAddr
mySockAddr
, peerSockAddr :: SockAddr
peerSockAddr = SockAddr
peerSockAddr
}
pushStream :: WorkerConf a
-> a
-> ValueTable
-> [PushPromise]
-> IO OutputType
pushStream :: forall a.
WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
pushStream WorkerConf a
_ a
_ ValueTable
_ [] = forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
OObj
pushStream WorkerConf{IO Bool
IO (Input a)
SockAddr
a -> IO ()
a -> PushPromise -> IO (StreamId, StreamId, a)
StreamId -> a -> IO ()
Output a -> IO ()
peerSockAddr :: SockAddr
mySockAddr :: SockAddr
makePushStream :: a -> PushPromise -> IO (StreamId, StreamId, a)
insertStream :: StreamId -> a -> IO ()
isPushable :: IO Bool
workerCleanup :: a -> IO ()
writeOutputQ :: Output a -> IO ()
readInputQ :: IO (Input a)
peerSockAddr :: forall a. WorkerConf a -> SockAddr
mySockAddr :: forall a. WorkerConf a -> SockAddr
makePushStream :: forall a.
WorkerConf a -> a -> PushPromise -> IO (StreamId, StreamId, a)
insertStream :: forall a. WorkerConf a -> StreamId -> a -> IO ()
isPushable :: forall a. WorkerConf a -> IO Bool
workerCleanup :: forall a. WorkerConf a -> a -> IO ()
writeOutputQ :: forall a. WorkerConf a -> Output a -> IO ()
readInputQ :: forall a. WorkerConf a -> IO (Input a)
..} a
pstrm ValueTable
reqvt [PushPromise]
pps0
| StreamId
len forall a. Eq a => a -> a -> Bool
== StreamId
0 = forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
OObj
| Bool
otherwise = do
Bool
pushable <- IO Bool
isPushable
if Bool
pushable then do
TVar StreamId
tvar <- forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO StreamId
0
StreamId
lim <- forall {a}.
Num a =>
TVar a -> [PushPromise] -> StreamId -> IO StreamId
push TVar StreamId
tvar [PushPromise]
pps0 StreamId
0
if StreamId
lim forall a. Eq a => a -> a -> Bool
== StreamId
0 then
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
OObj
else
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ IO () -> OutputType
OWait (forall {m :: * -> *} {a}. (MonadIO m, Ord a) => a -> TVar a -> m ()
waiter StreamId
lim TVar StreamId
tvar)
else
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
OObj
where
len :: StreamId
len = forall (t :: * -> *) a. Foldable t => t a -> StreamId
length [PushPromise]
pps0
increment :: TVar a -> m ()
increment TVar a
tvar = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar a
tvar (forall a. Num a => a -> a -> a
+a
1)
waiter :: a -> TVar a -> m ()
waiter a
lim TVar a
tvar = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
a
n <- forall a. TVar a -> STM a
readTVar TVar a
tvar
Bool -> STM ()
checkSTM (a
n forall a. Ord a => a -> a -> Bool
>= a
lim)
push :: TVar a -> [PushPromise] -> StreamId -> IO StreamId
push TVar a
_ [] StreamId
n = forall (m :: * -> *) a. Monad m => a -> m a
return (StreamId
n :: Int)
push TVar a
tvar (PushPromise
pp:[PushPromise]
pps) StreamId
n = do
(StreamId
pid, StreamId
sid, a
newstrm) <- a -> PushPromise -> IO (StreamId, StreamId, a)
makePushStream a
pstrm PushPromise
pp
StreamId -> a -> IO ()
insertStream StreamId
sid a
newstrm
let scheme :: ByteString
scheme = forall a. HasCallStack => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ Token -> ValueTable -> Maybe ByteString
getHeaderValue Token
tokenScheme ValueTable
reqvt
auth :: ByteString
auth = forall a. HasCallStack => Maybe a -> a
fromJust (Token -> ValueTable -> Maybe ByteString
getHeaderValue Token
tokenAuthority ValueTable
reqvt
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Token -> ValueTable -> Maybe ByteString
getHeaderValue Token
tokenHost ValueTable
reqvt)
path :: ByteString
path = PushPromise -> ByteString
promiseRequestPath PushPromise
pp
promiseRequest :: [(Token, ByteString)]
promiseRequest = [(Token
tokenMethod, ByteString
H.methodGet)
,(Token
tokenScheme, ByteString
scheme)
,(Token
tokenAuthority, ByteString
auth)
,(Token
tokenPath, ByteString
path)]
ot :: OutputType
ot = [(Token, ByteString)] -> StreamId -> OutputType
OPush [(Token, ByteString)]
promiseRequest StreamId
pid
Response OutObj
rsp = PushPromise -> Response
promiseResponse PushPromise
pp
out :: Output a
out = forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
newstrm OutObj
rsp OutputType
ot forall a. Maybe a
Nothing forall a b. (a -> b) -> a -> b
$ forall {m :: * -> *} {a}. (MonadIO m, Num a) => TVar a -> m ()
increment TVar a
tvar
Output a -> IO ()
writeOutputQ Output a
out
TVar a -> [PushPromise] -> StreamId -> IO StreamId
push TVar a
tvar [PushPromise]
pps (StreamId
n forall a. Num a => a -> a -> a
+ StreamId
1)
response :: WorkerConf a -> Manager -> T.Handle -> ThreadContinue -> a -> Request -> Response -> [PushPromise] -> IO ()
response :: forall a.
WorkerConf a
-> Manager
-> Handle
-> ThreadContinue
-> a
-> Request
-> Response
-> [PushPromise]
-> IO ()
response wc :: WorkerConf a
wc@WorkerConf{IO Bool
IO (Input a)
SockAddr
a -> IO ()
a -> PushPromise -> IO (StreamId, StreamId, a)
StreamId -> a -> IO ()
Output a -> IO ()
peerSockAddr :: SockAddr
mySockAddr :: SockAddr
makePushStream :: a -> PushPromise -> IO (StreamId, StreamId, a)
insertStream :: StreamId -> a -> IO ()
isPushable :: IO Bool
workerCleanup :: a -> IO ()
writeOutputQ :: Output a -> IO ()
readInputQ :: IO (Input a)
peerSockAddr :: forall a. WorkerConf a -> SockAddr
mySockAddr :: forall a. WorkerConf a -> SockAddr
makePushStream :: forall a.
WorkerConf a -> a -> PushPromise -> IO (StreamId, StreamId, a)
insertStream :: forall a. WorkerConf a -> StreamId -> a -> IO ()
isPushable :: forall a. WorkerConf a -> IO Bool
workerCleanup :: forall a. WorkerConf a -> a -> IO ()
writeOutputQ :: forall a. WorkerConf a -> Output a -> IO ()
readInputQ :: forall a. WorkerConf a -> IO (Input a)
..} Manager
mgr Handle
th ThreadContinue
tconf a
strm (Request InpObj
req) (Response OutObj
rsp) [PushPromise]
pps = case OutObj -> OutBody
outObjBody OutObj
rsp of
OutBody
OutBodyNone -> do
ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
Output a -> IO ()
writeOutputQ forall a b. (a -> b) -> a -> b
$ forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
strm OutObj
rsp OutputType
OObj forall a. Maybe a
Nothing (forall (m :: * -> *) a. Monad m => a -> m a
return ())
OutBodyBuilder Builder
_ -> do
OutputType
otyp <- forall a.
WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
pushStream WorkerConf a
wc a
strm ValueTable
reqvt [PushPromise]
pps
ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
Output a -> IO ()
writeOutputQ forall a b. (a -> b) -> a -> b
$ forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
strm OutObj
rsp OutputType
otyp forall a. Maybe a
Nothing (forall (m :: * -> *) a. Monad m => a -> m a
return ())
OutBodyFile FileSpec
_ -> do
OutputType
otyp <- forall a.
WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
pushStream WorkerConf a
wc a
strm ValueTable
reqvt [PushPromise]
pps
ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
Output a -> IO ()
writeOutputQ forall a b. (a -> b) -> a -> b
$ forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
strm OutObj
rsp OutputType
otyp forall a. Maybe a
Nothing (forall (m :: * -> *) a. Monad m => a -> m a
return ())
OutBodyStreaming (Builder -> IO ()) -> IO () -> IO ()
strmbdy -> do
OutputType
otyp <- forall a.
WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
pushStream WorkerConf a
wc a
strm ValueTable
reqvt [PushPromise]
pps
Manager -> IO ()
spawnAction Manager
mgr
ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
False
TBQueue StreamingChunk
tbq <- forall (m :: * -> *) a. MonadIO m => Natural -> m (TBQueue a)
newTBQueueIO Natural
10
Output a -> IO ()
writeOutputQ forall a b. (a -> b) -> a -> b
$ forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
strm OutObj
rsp OutputType
otyp (forall a. a -> Maybe a
Just TBQueue StreamingChunk
tbq) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
let push :: Builder -> IO ()
push Builder
b = do
Handle -> IO ()
T.pause Handle
th
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (Builder -> StreamingChunk
StreamingBuilder Builder
b)
Handle -> IO ()
T.resume Handle
th
flush :: IO ()
flush = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq StreamingChunk
StreamingFlush
finished :: IO ()
finished = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq forall a b. (a -> b) -> a -> b
$ IO () -> StreamingChunk
StreamingFinished (Manager -> IO ()
decCounter Manager
mgr)
Manager -> IO ()
incCounter Manager
mgr
(Builder -> IO ()) -> IO () -> IO ()
strmbdy Builder -> IO ()
push IO ()
flush forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
`E.finally` IO ()
finished
OutBodyStreamingUnmask (forall x. IO x -> IO x) -> (Builder -> IO ()) -> IO () -> IO ()
_ ->
forall a. HasCallStack => [Char] -> a
error [Char]
"response: server does not support OutBodyStreamingUnmask"
where
([(Token, ByteString)]
_,ValueTable
reqvt) = InpObj -> ([(Token, ByteString)], ValueTable)
inpObjHeaders InpObj
req
worker :: WorkerConf a -> Manager -> Server -> Action
worker :: forall a. WorkerConf a -> Manager -> Server -> IO ()
worker wc :: WorkerConf a
wc@WorkerConf{IO Bool
IO (Input a)
SockAddr
a -> IO ()
a -> PushPromise -> IO (StreamId, StreamId, a)
StreamId -> a -> IO ()
Output a -> IO ()
peerSockAddr :: SockAddr
mySockAddr :: SockAddr
makePushStream :: a -> PushPromise -> IO (StreamId, StreamId, a)
insertStream :: StreamId -> a -> IO ()
isPushable :: IO Bool
workerCleanup :: a -> IO ()
writeOutputQ :: Output a -> IO ()
readInputQ :: IO (Input a)
peerSockAddr :: forall a. WorkerConf a -> SockAddr
mySockAddr :: forall a. WorkerConf a -> SockAddr
makePushStream :: forall a.
WorkerConf a -> a -> PushPromise -> IO (StreamId, StreamId, a)
insertStream :: forall a. WorkerConf a -> StreamId -> a -> IO ()
isPushable :: forall a. WorkerConf a -> IO Bool
workerCleanup :: forall a. WorkerConf a -> a -> IO ()
writeOutputQ :: forall a. WorkerConf a -> Output a -> IO ()
readInputQ :: forall a. WorkerConf a -> IO (Input a)
..} Manager
mgr Server
server = do
StreamInfo a
sinfo <- forall a. IO (StreamInfo a)
newStreamInfo
ThreadContinue
tcont <- IO ThreadContinue
newThreadContinue
forall a. Manager -> (Handle -> IO a) -> IO a
timeoutKillThread Manager
mgr forall a b. (a -> b) -> a -> b
$ StreamInfo a -> ThreadContinue -> Handle -> IO ()
go StreamInfo a
sinfo ThreadContinue
tcont
where
go :: StreamInfo a -> ThreadContinue -> Handle -> IO ()
go StreamInfo a
sinfo ThreadContinue
tcont Handle
th = do
ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tcont Bool
True
Either SomeException ()
ex <- forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> m (Either e a)
E.trySyncOrAsync forall a b. (a -> b) -> a -> b
$ do
Handle -> IO ()
T.pause Handle
th
Input a
strm InpObj
req <- IO (Input a)
readInputQ
let req' :: InpObj
req' = InpObj -> Handle -> InpObj
pauseRequestBody InpObj
req Handle
th
forall a. StreamInfo a -> a -> IO ()
setStreamInfo StreamInfo a
sinfo a
strm
Handle -> IO ()
T.resume Handle
th
Handle -> IO ()
T.tickle Handle
th
let aux :: Aux
aux = Handle -> SockAddr -> SockAddr -> Aux
Aux Handle
th SockAddr
mySockAddr SockAddr
peerSockAddr
Server
server (InpObj -> Request
Request InpObj
req') Aux
aux forall a b. (a -> b) -> a -> b
$ forall a.
WorkerConf a
-> Manager
-> Handle
-> ThreadContinue
-> a
-> Request
-> Response
-> [PushPromise]
-> IO ()
response WorkerConf a
wc Manager
mgr Handle
th ThreadContinue
tcont a
strm (InpObj -> Request
Request InpObj
req')
Bool
cont1 <- case Either SomeException ()
ex of
Right () -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Left e :: SomeException
e@(SomeException e
_)
| Just KilledByHttp2ThreadManager{} <- forall e. Exception e => SomeException -> Maybe e
E.fromException SomeException
e -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
| Just TimeoutThread
T.TimeoutThread <- forall e. Exception e => SomeException -> Maybe e
E.fromException SomeException
e -> do
StreamInfo a -> IO ()
cleanup StreamInfo a
sinfo
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
| Bool
otherwise -> do
StreamInfo a -> IO ()
cleanup StreamInfo a
sinfo
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Bool
cont2 <- ThreadContinue -> IO Bool
getThreadContinue ThreadContinue
tcont
forall a. StreamInfo a -> IO ()
clearStreamInfo StreamInfo a
sinfo
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
cont1 Bool -> Bool -> Bool
&& Bool
cont2) forall a b. (a -> b) -> a -> b
$ StreamInfo a -> ThreadContinue -> Handle -> IO ()
go StreamInfo a
sinfo ThreadContinue
tcont Handle
th
pauseRequestBody :: InpObj -> Handle -> InpObj
pauseRequestBody InpObj
req Handle
th = InpObj
req { inpObjBody :: InpBody
inpObjBody = InpBody
readBody' }
where
readBody :: InpBody
readBody = InpObj -> InpBody
inpObjBody InpObj
req
readBody' :: InpBody
readBody' = do
Handle -> IO ()
T.pause Handle
th
ByteString
bs <- InpBody
readBody
Handle -> IO ()
T.resume Handle
th
forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
bs
cleanup :: StreamInfo a -> IO ()
cleanup StreamInfo a
sinfo = do
Maybe a
minp <- forall a. StreamInfo a -> IO (Maybe a)
getStreamInfo StreamInfo a
sinfo
case Maybe a
minp of
Maybe a
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just a
strm -> a -> IO ()
workerCleanup a
strm
newtype ThreadContinue = ThreadContinue (IORef Bool)
{-# INLINE newThreadContinue #-}
newThreadContinue :: IO ThreadContinue
newThreadContinue :: IO ThreadContinue
newThreadContinue = IORef Bool -> ThreadContinue
ThreadContinue forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> IO (IORef a)
newIORef Bool
True
{-# INLINE setThreadContinue #-}
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue IORef Bool
ref) Bool
x = forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
ref Bool
x
{-# INLINE getThreadContinue #-}
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue (ThreadContinue IORef Bool
ref) = forall a. IORef a -> IO a
readIORef IORef Bool
ref
newtype StreamInfo a = StreamInfo (IORef (Maybe a))
{-# INLINE newStreamInfo #-}
newStreamInfo :: IO (StreamInfo a)
newStreamInfo :: forall a. IO (StreamInfo a)
newStreamInfo = forall a. IORef (Maybe a) -> StreamInfo a
StreamInfo forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> IO (IORef a)
newIORef forall a. Maybe a
Nothing
{-# INLINE clearStreamInfo #-}
clearStreamInfo :: StreamInfo a -> IO ()
clearStreamInfo :: forall a. StreamInfo a -> IO ()
clearStreamInfo (StreamInfo IORef (Maybe a)
ref) = forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref forall a. Maybe a
Nothing
{-# INLINE setStreamInfo #-}
setStreamInfo :: StreamInfo a -> a -> IO ()
setStreamInfo :: forall a. StreamInfo a -> a -> IO ()
setStreamInfo (StreamInfo IORef (Maybe a)
ref) a
inp = forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
inp
{-# INLINE getStreamInfo #-}
getStreamInfo :: StreamInfo a -> IO (Maybe a)
getStreamInfo :: forall a. StreamInfo a -> IO (Maybe a)
getStreamInfo (StreamInfo IORef (Maybe a)
ref) = forall a. IORef a -> IO a
readIORef IORef (Maybe a)
ref