{-# LANGUAGE BangPatterns , DeriveDataTypeable, CPP #-}
module Control.Concurrent.Chan.Unagi.Bounded.Internal
( InChan(..), OutChan(..), ChanEnd(..), StreamSegment, Cell(..), Stream(..)
, writerCheckin, unblockWriters, tryWriterCheckin, WriterCheckpoint(..)
, NextSegment(..), StreamHead(..)
, newChanStarting, writeChan, readChan, readChanOnException
, tryWriteChan, tryReadChan
, dupChan
, estimatedLength
)
where
import Control.Concurrent.MVar
import Data.IORef
import Control.Exception
import Control.Monad.Primitive(RealWorld)
import Data.Atomics.Counter.Fat
import Data.Atomics
import qualified Data.Primitive as P
import Control.Monad
import Control.Applicative
import Data.Bits
import Data.Maybe(fromMaybe,isJust)
import Data.Typeable(Typeable)
import GHC.Exts(inline)
import Utilities(nextHighestPowerOfTwo)
import qualified Control.Concurrent.Chan.Unagi.NoBlocking.Types as UT
import Prelude
-- | The write end of a channel.
--
-- Fields, in order:
--
--   * an action returning the current value of the /reader's/ counter
--     ('estimatedLength' subtracts it from the writer's counter);
--   * a saved 'Ticket' for an 'Empty' cell, passed by writers to
--     'casArrayElem' as the expected old value when storing 'Written';
--   * the writer's own 'ChanEnd'.
data InChan a = InChan (IO Int)
                       !(Ticket (Cell a))
                       !(ChanEnd a)
    deriving Typeable
-- | The read end of a channel; a thin wrapper around the reader's 'ChanEnd'.
newtype OutChan a = OutChan (ChanEnd a)
    deriving Typeable
-- | Two 'InChan's are equal exactly when their 'ChanEnd's hold the same
-- stream-head 'IORef' (reference equality); every other field is ignored.
instance Eq (InChan a) where
    (InChan _ _ (ChanEnd _ _ _ _ headA)) == (InChan _ _ (ChanEnd _ _ _ _ headB))
        = headA == headB
-- | Two 'OutChan's are equal exactly when their 'ChanEnd's hold the same
-- stream-head 'IORef' (reference equality); every other field is ignored.
instance Eq (OutChan a) where
    (OutChan (ChanEnd _ _ _ _ headA)) == (OutChan (ChanEnd _ _ _ _ headB))
        = headA == headB
-- | State private to one end (read or write) of a channel.
data ChanEnd a =
    ChanEnd !Int
            -- ^ log base 2 of the segment size; used as a shift amount in
            -- place of division by the segment size
            !Int
            -- ^ segment size minus one; used as a bit mask in place of
            -- modulus by the segment size
            !(SegSource a)
            -- ^ action producing a fresh segment
            !AtomicCounter
            -- ^ counter handing out cell indices to this end's users
            !(IORef (StreamHead a))
            -- ^ this end's current view of the head of the stream
    deriving Typeable
-- | A position in the stream: the counter value corresponding to index 0 of
-- a segment (subtracted from a claimed index in 'moveToNextCell'), paired with
-- the 'Stream' starting at that segment.
data StreamHead a = StreamHead !Int !(Stream a)
-- | One segment of the stream: a mutable array of 'Cell's. Segments are
-- produced by a 'SegSource'.
type StreamSegment a = P.MutableArray RealWorld (Cell a)
-- | The state of one slot in a segment:
--
--   * 'Empty': neither a reader nor a writer has touched it yet;
--   * 'Written': a writer stored its value here;
--   * 'Blocking': a reader found the cell 'Empty' and installed an 'MVar'
--     which the writer will fill instead.
data Cell a = Empty | Written a | Blocking !(MVar a)
-- | A linked list of segments: the current segment plus a mutable reference
-- to the following one, which is 'Nothing' until some reader or writer
-- installs it (see 'waitingAdvanceStream').
data Stream a =
    Stream !(StreamSegment a)
           !(IORef (Maybe (NextSegment a)))
-- | The next link of a 'Stream', tagged by who installed it.
data NextSegment a
    = NextByWriter (Stream a)
                   !WriterCheckpoint
      -- ^ installed by a writer; writers to this segment check in at the
      -- checkpoint, which a reader later releases with 'unblockWriters'
    | NextByReader (Stream a)
      -- ^ installed (or upgraded from 'NextByWriter') by a reader; writers
      -- need not check in
-- | The reference to the segment /following/ the one carried by this
-- 'NextSegment'.
getNextRef :: NextSegment a -> IORef (Maybe (NextSegment a))
getNextRef nextSeg = case getStr nextSeg of
    Stream _ nextSegRef -> nextSegRef
-- | Extract the 'Stream' from a 'NextSegment', whoever installed it.
getStr :: NextSegment a -> Stream a
getStr (NextByReader str)   = str
getStr (NextByWriter str _) = str
-- | Readable names for the @isReader@ flag taken by 'moveToNextCell' and
-- 'waitingAdvanceStream'.
asReader, asWriter :: Bool
asReader = True
asWriter = False
-- | Create a new bounded channel.
--
-- The first argument is the initial value of both the read and the write
-- counter. The second is the requested bound, which is rounded with
-- 'nextHighestPowerOfTwo' to give the segment size.
--
-- Each end gets its own counter and its own stream-head 'IORef', but both
-- heads initially point at the same first segment, and both ends share one
-- segment source. The returned 'InChan' also carries an action that reads the
-- reader's counter, and a 'Ticket' for the (still 'Empty') first cell of the
-- first segment.
newChanStarting :: Int -> Int -> IO (InChan a, OutChan a)
{-# INLINE newChanStarting #-}
newChanStarting !startingCellOffset !sizeDirty = do
    let !size      = nextHighestPowerOfTwo sizeDirty
        !logBounds = round $ logBase (2::Float) $ fromIntegral size
        !boundsMn1 = size - 1

    segSource <- newSegmentSource size
    firstSeg  <- segSource
    -- Ticket for an Empty cell, saved for the writers' CAS:
    savedEmptyTkt <- readArrayElem firstSeg 0
    stream <- Stream firstSeg <$> newIORef Nothing
    -- Running this action twice yields two ends with distinct counters and
    -- distinct head references over the same initial stream:
    let end = ChanEnd logBounds boundsMn1 segSource
                  <$> newCounter startingCellOffset
                  <*> newIORef (StreamHead startingCellOffset stream)
    endR@(ChanEnd _ _ _ counterR _) <- end
    endW <- end
    -- Sanity check that the shift amount and the mask describe the same size:
    assert (size > 0 && (boundsMn1 + 1) == 2 ^ logBounds) $
       return ( InChan (readCounter counterR) savedEmptyTkt endW
              , OutChan endR )
-- | Create an additional 'OutChan' from an 'InChan'.
--
-- The new read end gets a fresh counter initialised to the writer's current
-- counter value and a fresh stream-head reference initialised to the writer's
-- current head; the bounds and the segment source are shared with the writer.
dupChan :: InChan a -> IO (OutChan a)
{-# INLINE dupChan #-}
dupChan (InChan _ _ (ChanEnd logBounds boundsMn1 segSource counter streamHead)) = do
    -- The head is read before the counter, with a barrier in between.
    -- NOTE(review): presumably this guarantees the copied head's offset is
    -- never ahead of the copied counter ('moveToNextCell' asserts that
    -- relation) -- confirm.
    hLoc <- readIORef streamHead
    loadLoadBarrier
    wCount <- readCounter counter

    counter'    <- newCounter wCount
    streamHead' <- newIORef hLoc
    return $ OutChan $ ChanEnd logBounds boundsMn1 segSource counter' streamHead'
-- | Write a value to the channel, allowing the write to block at a writer
-- checkpoint (this is 'writeChanWithBlocking' with blocking enabled).
writeChan :: InChan a -> a -> IO ()
{-# INLINE writeChan #-}
writeChan c = \a-> writeChanWithBlocking True c a
-- | Shared implementation of 'writeChan' and 'tryWriteChan'.
--
-- Runs entirely under 'mask_'. It claims the next cell with 'moveToNextCell'
-- and tries to CAS that cell from 'Empty' (using the saved ticket) to
-- 'Written':
--
--   * On success, if the cell's segment was installed by a writer, the writer
--     checks in at the segment's checkpoint: with 'writerCheckin' when the
--     'Bool' argument is 'True' (may block), otherwise with
--     'tryWriterCheckin'. The stream head is advanced only if the check-in
--     reported the segment unlocked. For a reader-installed segment the
--     stream head is advanced directly.
--   * On failure, a reader must already have installed a 'Blocking' 'MVar';
--     the value is handed over with 'putMVar'. Any other cell state is
--     reported with 'error'.
--
-- In every non-error case the value has been stored when this returns.
writeChanWithBlocking :: Bool -> InChan a -> a -> IO ()
{-# INLINE writeChanWithBlocking #-}
writeChanWithBlocking canBlock (InChan _ savedEmptyTkt ce) a = mask_ $ do
    (segIx, nextSeg, updateStreamHeadIfNecessary) <- moveToNextCell asWriter ce
    let (seg, maybeCheckpt) = case nextSeg of
          NextByWriter (Stream s _) checkpt -> (s, Just checkpt)
          NextByReader (Stream s _)         -> (s, Nothing)
    (success, nonEmptyTkt) <- casArrayElem seg segIx savedEmptyTkt (Written a)
    if success
      then maybe updateStreamHeadIfNecessary
                 (\checkpt-> do
                     segUnlocked <- if canBlock
                                      then True <$ writerCheckin checkpt
                                      else tryWriterCheckin checkpt
                     when segUnlocked $
                       updateStreamHeadIfNecessary )
                 maybeCheckpt
      else case peekTicket nonEmptyTkt of
             -- A reader got here first and is (or will be) waiting on v:
             Blocking v -> do putMVar v a
                              updateStreamHeadIfNecessary
             Empty     -> error "Stored Empty Ticket went stale!"
             Written _ -> error "Nearly Impossible! Expected Blocking"
-- | Non-blocking write.
--
-- Returns 'False' without writing when 'estimatedLength' exceeds the segment
-- size minus one. Otherwise it writes the value through
-- 'writeChanWithBlocking' with blocking disabled and returns 'True'.
tryWriteChan :: InChan a -> a -> IO Bool
{-# INLINE tryWriteChan #-}
tryWriteChan c@(InChan _ _ (ChanEnd _ boundsMn1 _ _ _)) = \a-> do
    len <- estimatedLength c
    if len > boundsMn1
       then return False
       else writeChanWithBlocking False c a >> return True
-- | The writer's counter minus the reader's counter.
--
-- The two counters are read separately (reader first, then writer), so the
-- result is only a snapshot-based estimate.
estimatedLength :: InChan a -> IO Int
{-# INLINE estimatedLength #-}
estimatedLength (InChan readCounterReader _ (ChanEnd _ _ _ counter _)) = do
    ixR <- readCounterReader
    ixW <- readCounter counter
    return $ ixW - ixR
-- | Obtain the value for an already-claimed cell @(segment, index)@.
--
-- If the cell is 'Written' the value is returned immediately. If it is
-- 'Empty', a fresh 'MVar' is CAS'd in as 'Blocking' and then read. If that
-- CAS loses, the cell now holds either the written value or another
-- 'Blocking' 'MVar', which is read instead.
--
-- The function argument wraps only the blocking 'readMVar' calls (callers
-- pass 'id' or an exception handler wrapper).
readSegIxUnmasked :: (IO a -> IO a) -> (StreamSegment a, Int) -> IO a
{-# INLINE readSegIxUnmasked #-}
readSegIxUnmasked h = \(seg,segIx)-> do
    cellTkt <- readArrayElem seg segIx
    case peekTicket cellTkt of
         Written a -> return a
         Empty -> do
            v <- newEmptyMVar
            (success,elseWrittenCell) <- casArrayElem seg segIx cellTkt (Blocking v)
            if success
              then readBlocking v
              else case peekTicket elseWrittenCell of
                        -- Lost the race to a writer:
                        Written a -> return a
                        -- Lost the race to something that installed its own MVar:
                        Blocking v2 -> readBlocking v2
                        _ -> error "Impossible! Expecting Written or Blocking"
         Blocking v -> readBlocking v
  where
    -- 'readMVar' (not 'takeMVar') leaves the value in place.
    -- NOTE(review): presumably so several readers from 'dupChan' can all
    -- observe the same cell -- confirm.
    readBlocking v = inline h $ readMVar v
-- | First half of a read: claim the next cell for this reader and return its
-- segment and index within the segment.
--
-- A reader that lands on index 0 of a segment also calls
-- 'waitingAdvanceStream' (with no wait) on that segment's next-reference,
-- discarding the result. The reader's stream head is then advanced if the
-- claimed cell was beyond the head segment.
startReadChan :: OutChan a -> IO (StreamSegment a, Int)
{-# INLINE startReadChan #-}
startReadChan (OutChan ce@(ChanEnd _ _ segSource _ _)) = do
    (segIx, nextSeg, updateStreamHeadIfNecessary) <- moveToNextCell asReader ce
    let (seg,next) = case nextSeg of
            NextByReader (Stream s n) -> (s,n)
            _ -> error "moveToNextCell returned a non-reader-installed next segment to readSegIxUnmasked"
    -- Make sure the following segment exists (and writers to it are released):
    when (segIx == 0) $ void $
      waitingAdvanceStream asReader next segSource 0
    updateStreamHeadIfNecessary
    return (seg,segIx)
-- | Claim the next cell and return two ways of getting its value:
--
--   * a 'UT.Element' whose action never blocks: it yields 'Just' the value if
--     the cell is 'Written', 'Nothing' if it is 'Empty', and the result of
--     'tryReadMVar' if it is 'Blocking';
--   * a blocking action equivalent to the second half of 'readChan' for the
--     same cell.
tryReadChan :: OutChan a -> IO (UT.Element a, IO a)
{-# INLINE tryReadChan #-}
tryReadChan oc = do
    (seg,segIx) <- startReadChan oc
    return (
        UT.Element $ do
          cell <- P.readArray seg segIx
          case cell of
               Written a  -> return $ Just a
               Empty      -> return Nothing
               Blocking v -> tryReadMVar v

      , readSegIxUnmasked id (seg,segIx)
      )
-- | Read the next value from the channel, blocking until it is available.
-- Claims a cell with 'startReadChan' and then reads it with
-- 'readSegIxUnmasked', with no wrapper around the blocking step.
readChan :: OutChan a -> IO a
{-# INLINE readChan #-}
readChan = \oc-> startReadChan oc >>= readSegIxUnmasked id
-- | Like 'readChan', but runs under 'mask_'. If an exception is raised while
-- performing the blocking read of the claimed cell, the supplied handler is
-- run ('onException') and is passed that same blocking read action, so the
-- caller can still obtain the element.
readChanOnException :: OutChan a -> (IO a -> IO ()) -> IO a
{-# INLINE readChanOnException #-}
readChanOnException oc h = mask_ (
    startReadChan oc >>=
      readSegIxUnmasked (\io-> io `onException` (h io))
    )
-- | Claim the next cell index for a reader ('asReader') or writer
-- ('asWriter') and locate the segment containing it.
--
-- Returns:
--
--   * the index of the claimed cell within its segment;
--   * the segment, wrapped as a 'NextSegment' (the head segment itself is
--     always reported as 'NextByReader');
--   * an action that moves this end's stream head forward to that segment
--     when the claimed cell was not in the head segment (a no-op otherwise).
--     The caller chooses when to run it.
moveToNextCell :: Bool -> ChanEnd a -> IO (Int, NextSegment a, IO ())
{-# INLINE moveToNextCell #-}
moveToNextCell isReader (ChanEnd logBounds boundsMn1 segSource counter streamHead) = do
    -- The head is read before the counter is incremented:
    (StreamHead offset0 str0) <- readIORef streamHead
    ix <- incrCounter 1 counter
    let !relIx    = ix - offset0
        !segsAway = relIx `unsafeShiftR` logBounds -- relIx `div` segment size
        !segIx    = relIx .&. boundsMn1            -- relIx `mod` segment size
        -- Kept lazy: only needed when we must walk to a later segment.
        ~nEW_SEGMENT_WAIT = (boundsMn1 `div` 12) + 25

        -- Walk forward the given number of segments; the wait budget passed
        -- at each step grows with our index within the target segment.
        go 0 nextSeg = return nextSeg
        go !n nextSeg =
            waitingAdvanceStream isReader (getNextRef nextSeg) segSource (nEW_SEGMENT_WAIT*segIx)
              >>= go (n-1)
    nextSeg <- assert (relIx >= 0) $
              if segsAway == 0
                  then return $ NextByReader str0
                  else go segsAway $ NextByReader str0

    let updateStreamHeadIfNecessary =
          when (segsAway > 0) $ do
            let !offsetN =
                  offset0 + (segsAway `unsafeShiftL` logBounds) -- segsAway * segment size
            writeIORef streamHead $ StreamHead offsetN $ getStr nextSeg

    return (segIx, nextSeg, updateStreamHeadIfNecessary)
-- | Return the segment stored in the given next-reference, installing one if
-- necessary.
--
-- The final 'Int' is a spin budget: while the reference is still 'Nothing'
-- and the budget is positive, the reference is simply re-read with the budget
-- decremented. Once the budget is exhausted a new segment is allocated and
-- installed with a CAS; whatever ends up installed (ours or a competitor's)
-- is returned.
--
-- Readers install 'NextByReader'. Whenever a reader finds a 'NextByWriter'
-- it releases the writers' checkpoint and overwrites the reference with
-- 'NextByReader', so a reader always gets back a 'NextByReader'. Writers
-- install 'NextByWriter' with a fresh, empty checkpoint and otherwise return
-- whatever is already there.
waitingAdvanceStream :: Bool -> IORef (Maybe (NextSegment a)) -> SegSource a
                     -> Int -> IO (NextSegment a)
waitingAdvanceStream isReader nextSegRef segSource = go where
  cas tk = casIORef nextSegRef tk . Just

  -- Extract the installed segment from the result of 'cas'; succeed or fail,
  -- the returned ticket is expected to hold a Just.
  peekInstalled (_, nextSegTk) =
     fromMaybe (error "Impossible! This should only have been a Just NextBy* segment") $
       peekTicket nextSegTk

  readerUnblockAndReturn nextSeg = assert isReader $ case nextSeg of
      -- A writer installed this segment: release its checkpoint and mark the
      -- segment reader-installed.
      NextByWriter strAlreadyInstalled checkpt -> do
          unblockWriters checkpt
          let nextSeg' = NextByReader strAlreadyInstalled
          writeIORef nextSegRef $ Just nextSeg'
          return nextSeg'
      nextByReader -> return nextByReader

  go wait = assert (wait >= 0) $ do
    tk <- readForCAS nextSegRef
    case peekTicket tk of
         -- Nothing installed yet:
         Nothing
           | wait > 0 -> go (wait - 1)
           | otherwise -> do
               potentialStrNext <- Stream <$> segSource <*> newIORef Nothing
               if isReader
                 then do
                   -- The CAS may lose to a competing reader or writer; either
                   -- way the resulting ticket holds the installed segment.
                   installed <- cas tk $ NextByReader potentialStrNext
                   readerUnblockAndReturn $ peekInstalled installed
                 else do
                   potentialCheckpt <- WriterCheckpoint <$> newEmptyMVar
                   peekInstalled <$> (cas tk $
                      NextByWriter potentialStrNext potentialCheckpt)
         -- Somebody already installed the next segment:
         Just nextSeg
           | isReader  -> readerUnblockAndReturn nextSeg
           | otherwise -> return nextSeg
-- | An action that produces a fresh segment each time it is run.
type SegSource a = IO (StreamSegment a)

-- | Build a 'SegSource' for segments of the given length.
--
-- A single template array filled with 'Empty' is allocated once; the returned
-- action produces each new segment by cloning that template in full.
newSegmentSource :: Int -> IO (SegSource a)
newSegmentSource size = do
    -- 'Empty' is forced with 'evaluate' before it is stored.
    -- NOTE(review): presumably needed so the saved Empty ticket used by the
    -- writers' CAS matches every cell -- confirm.
    arr <- evaluate Empty >>= P.newArray size
    return (P.cloneMutableArray arr 0 size)
-- | A gate for writers entering a writer-installed segment: empty while
-- writers must wait, full once they may proceed.
newtype WriterCheckpoint = WriterCheckpoint (MVar ())

-- | Open the checkpoint. Uses 'tryPutMVar' and ignores the result, so it
-- never blocks and calling it on an already-open checkpoint is harmless.
unblockWriters :: WriterCheckpoint -> IO ()
unblockWriters (WriterCheckpoint v) =
    void $ tryPutMVar v ()
-- | Block until the checkpoint has been opened, leaving it open for other
-- writers.
--
-- On GHC older than 7.8 this takes the 'MVar' and then 'tryPutMVar's the
-- value back; on newer compilers it uses 'readMVar'.
writerCheckin :: WriterCheckpoint -> IO ()
writerCheckin (WriterCheckpoint v) = do
#if __GLASGOW_HASKELL__ < 708
    takeMVar v >>= void . tryPutMVar v
#else
    void $ readMVar v
#endif
-- | Non-blocking variant of 'writerCheckin': returns 'True' if the checkpoint
-- was found open and 'False' otherwise, leaving it open in the former case.
--
-- Uses 'tryReadMVar' when @TRYREADMVAR@ is defined; otherwise it falls back
-- to 'tryTakeMVar' followed by 'tryPutMVar'.
tryWriterCheckin :: WriterCheckpoint -> IO Bool
tryWriterCheckin (WriterCheckpoint v) =
#ifdef TRYREADMVAR
    isJust <$> tryReadMVar v
#else
    tryTakeMVar v >>= maybe (return False) ((True <$) . tryPutMVar v)
#endif