{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module Network.QUIC.Stream.Reass (
    takeRecvStreamQwithSize,
    putRxStreamData,
    FlowCntl (..),
    tryReassemble,
) where

import qualified Data.ByteString as BS
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq

import Network.QUIC.Imports

-- import Network.QUIC.Logger
import Network.QUIC.Stream.Frag
import Network.QUIC.Stream.Misc
import Network.QUIC.Stream.Queue
import qualified Network.QUIC.Stream.Skew as Skew
import Network.QUIC.Stream.Types
import Network.QUIC.Types

----------------------------------------------------------------

-- | Read the end-of-stream flag stored in the stream's receive queue.
getEndOfStream :: Stream -> IO Bool
getEndOfStream strm = readIORef (endOfStream (streamRecvQ strm))

-- | Set the end-of-stream flag stored in the stream's receive queue.
setEndOfStream :: Stream -> IO ()
setEndOfStream strm = writeIORef eosRef True
  where
    eosRef = endOfStream (streamRecvQ strm)

-- | Read the pending-data slot of the stream's receive queue.
--   'Nothing' means that no leftover bytes are stashed.
readPendingData :: Stream -> IO (Maybe ByteString)
readPendingData = readIORef . pendingData . streamRecvQ

-- | Stash a chunk in the pending-data slot of the stream's receive queue,
--   overwriting whatever was stored there.
writePendingData :: Stream -> ByteString -> IO ()
writePendingData strm leftover = writeIORef slot (Just leftover)
  where
    slot = pendingData (streamRecvQ strm)

-- | Empty the pending-data slot of the stream's receive queue.
clearPendingData :: Stream -> IO ()
clearPendingData strm = writeIORef slot Nothing
  where
    slot = pendingData (streamRecvQ strm)

----------------------------------------------------------------

-- | Take at most @siz0@ bytes of received data from the stream.
--
-- * If the end-of-stream flag is already set, @""@ is returned immediately.
-- * If leftover bytes from a previous call are pending, they are consumed
--   first; otherwise one chunk is taken with 'takeRecvStreamQ'
--   (presumably blocking until data arrives -- confirm in
--   "Network.QUIC.Stream.Queue").
-- * An empty chunk in the queue is the FIN marker: it sets the
--   end-of-stream flag.
-- * While fewer than @siz0@ bytes have been gathered, further chunks are
--   collected with 'tryTakeRecvStreamQ' until the queue yields 'Nothing',
--   the FIN marker is seen, or the request is satisfied.
-- * Bytes beyond the requested size are stashed as pending data for the
--   next call.
--
-- The result may be shorter than @siz0@. A pending chunk that is larger
-- than @siz0@ is split as well, so the result never exceeds @siz0@ bytes.
takeRecvStreamQwithSize :: Stream -> Int -> IO ByteString
takeRecvStreamQwithSize strm siz0 = do
    eos <- getEndOfStream strm
    if eos
        then return ""
        else do
            mb <- readPendingData strm
            case mb of
                Nothing -> do
                    b0 <- takeRecvStreamQ strm
                    if b0 == ""
                        then do
                            setEndOfStream strm
                            return ""
                        else do
                            let len = BS.length b0
                            case len `compare` siz0 of
                                LT -> tryRead (siz0 - len) (b0 :)
                                EQ -> return b0
                                GT -> splitAndPend siz0 b0
                Just b0 -> do
                    clearPendingData strm
                    let len = BS.length b0
                    if len > siz0
                        then
                            -- The leftover alone exceeds the request:
                            -- hand out only the requested prefix and keep
                            -- the remainder pending. Without this check
                            -- 'tryRead' would be entered with a negative
                            -- size and the whole chunk would be returned.
                            splitAndPend siz0 b0
                        else tryRead (siz0 - len) (b0 :)
  where
    -- Return the first @n@ bytes of the chunk and stash the rest as
    -- pending data for the next call.
    splitAndPend :: Int -> ByteString -> IO ByteString
    splitAndPend n chunk = do
        let (now, later) = BS.splitAt n chunk
        writePendingData strm later
        return now

    -- Gather further chunks without waiting. @siz@ is the number of bytes
    -- still wanted; @build@ is a difference list of the chunks collected
    -- so far.
    tryRead :: Int -> ([ByteString] -> [ByteString]) -> IO ByteString
    tryRead siz build = do
        mb <- tryTakeRecvStreamQ strm
        case mb of
            Nothing -> return $ BS.concat $ build []
            Just b
                | b == "" -> do
                    -- FIN marker: remember it and return what we have.
                    setEndOfStream strm
                    return $ BS.concat $ build []
                | otherwise -> do
                    let len = BS.length b
                    case len `compare` siz of
                        LT -> tryRead (siz - len) (build . (b :))
                        EQ -> return $ BS.concat $ build [b]
                        GT -> do
                            b1 <- splitAndPend siz b
                            return $ BS.concat $ build [b1]

----------------------------------------------------------------
----------------------------------------------------------------

-- | Outcome of 'putRxStreamData'.
--
-- 'OverLimit' is returned when offset plus length exceeds the value of
-- 'getRxMaxStreamData'; 'Duplicated' when 'tryReassemble' reports the
-- fragment as a duplicate; 'Reassembled' otherwise.
data FlowCntl = OverLimit | Duplicated | Reassembled

-- | Feed one received stream fragment into the stream.
--
-- The fragment is rejected with 'OverLimit' when its end (offset plus
-- length) lies beyond the limit reported by 'getRxMaxStreamData'.
-- Otherwise it is handed to 'tryReassemble'; the result is 'Duplicated'
-- when that reports a duplicate and 'Reassembled' when it does not.
putRxStreamData :: Stream -> RxStreamData -> IO FlowCntl
putRxStreamData s rx@(RxStreamData _ off len _) = do
    lim <- getRxMaxStreamData s
    if len + off > lim
        then pure OverLimit
        else classify <$> tryReassemble s rx deliver deliverFin
  where
    classify :: Bool -> FlowCntl
    classify duplicated
        | duplicated = Duplicated
        | otherwise = Reassembled

    -- In-order data: account for its length, then queue it for the reader.
    -- Empty chunks are skipped because the empty chunk is the FIN marker.
    deliver :: ByteString -> IO ()
    deliver chunk
        | BS.null chunk = pure ()
        | otherwise = do
            addRxStreamData s (BS.length chunk)
            putRecvStreamQ s chunk

    -- The FIN is signalled to the reader as an empty chunk.
    deliverFin :: IO ()
    deliverFin = putRecvStreamQ s ""

-- | Try to deliver a received fragment in order, buffering it when it
--   arrives ahead of the current receive offset.
--
-- The receive state ('StreamState') holds the next expected offset and a
-- flag recording that a FIN has already been seen.
--
-- Arguments: the stream, the fragment, an action that delivers in-order
-- data, and an action that delivers the FIN.
--
-- The result indicates duplication: 'True' means the fragment was treated
-- as a duplicate and nothing was delivered or buffered. This covers:
--
-- * an empty fragment without FIN,
-- * a FIN fragment when a FIN has already been seen,
-- * any fragment that starts before the expected offset.
--
-- 'False' means the fragment was delivered (together with any buffered
-- fragments that became contiguous) or stored in the reassembly buffer.
tryReassemble
    :: Stream -> RxStreamData -> (StreamData -> IO ()) -> IO () -> IO Bool
-- Empty fragment without FIN: carries no information.
tryReassemble Stream{} (RxStreamData "" _ _ False) _ _ = return True
-- Empty fragment carrying only the FIN.
tryReassemble Stream{..} x@(RxStreamData "" off _ True) _ putFin = do
    si0@(StreamState off0 fin0) <- readIORef streamStateRx
    let si1 = si0{streamFin = True}
    if fin0
        then do
            -- stdoutLogger "Illegal Fin" -- fixme
            return True
        else case off `compare` off0 of
            LT -> return True
            EQ -> do
                writeIORef streamStateRx si1
                putFin
                return False
            GT -> do
                -- FIN ahead of the expected offset: remember that a FIN
                -- was seen and buffer it until the gap is filled.
                writeIORef streamStateRx si1
                atomicModifyIORef'' streamReass (Skew.insert x)
                return False
-- Data fragment without FIN.
tryReassemble Stream{..} x@(RxStreamData dat off len False) put putFin = do
    si0@(StreamState off0 _) <- readIORef streamStateRx
    case off `compare` off0 of
        LT -> return True
        EQ -> do
            put dat
            loop si0 (off0 + len)
            return False
        GT -> do
            atomicModifyIORef'' streamReass (Skew.insert x)
            return False
  where
    -- Drain buffered fragments that have become contiguous at offset
    -- @xff@, then record the new expected offset.
    loop si0 xff = do
        mrxs <- atomicModifyIORef' streamReass (Skew.deleteMinIf xff)
        case mrxs of
            Nothing -> writeIORef streamStateRx si0{streamOffset = xff}
            Just rxs -> do
                mapM_ (put . rxstrmData) rxs
                let xff1 = nextOff rxs
                if hasFin rxs
                    then do
                        -- Record the final offset before signalling the
                        -- FIN. Otherwise the state keeps the old offset
                        -- and a retransmission of the fragment at that
                        -- offset would be delivered again after the FIN.
                        writeIORef streamStateRx si0{streamOffset = xff1}
                        putFin
                    else loop si0 xff1
-- Data fragment carrying the FIN.
tryReassemble Stream{..} x@(RxStreamData dat off len True) put putFin = do
    si0@(StreamState off0 fin0) <- readIORef streamStateRx
    let si1 = si0{streamFin = True}
    if fin0
        then do
            -- stdoutLogger "Illegal Fin" -- fixme
            return True
        else case off `compare` off0 of
            LT -> return True
            EQ -> do
                let off1 = off0 + len
                writeIORef streamStateRx si1{streamOffset = off1}
                put dat
                putFin
                return False
            GT -> do
                writeIORef streamStateRx si1
                atomicModifyIORef'' streamReass (Skew.insert x)
                return False

-- | 'True' when the last fragment of the sequence carries the FIN flag;
--   'False' for an empty sequence.
hasFin :: Seq RxStreamData -> Bool
hasFin frags =
    case Seq.viewr frags of
        _ Seq.:> lastFrag -> rxstrmFin lastFrag
        Seq.EmptyR -> False