{-# LANGUAGE BangPatterns , DeriveDataTypeable, CPP #-}
module Control.Concurrent.Chan.Unagi.NoBlocking.Internal
#ifdef NOT_optimised
{-# WARNING "This library is unlikely to perform well on architectures other than i386/x64/aarch64" #-}
#endif
(sEGMENT_LENGTH
, InChan(..), OutChan(..), ChanEnd(..), StreamSegment, Cell, Stream(..)
, NextSegment(..), StreamHead(..)
, newChanStarting, writeChan, tryReadChan, readChan, UT.Element(..)
, dupChan
, streamChan
, isActive
)
where
import Data.IORef
import Control.Exception
import Data.Atomics.Counter.Fat
import Data.Atomics
import qualified Data.Primitive as P
import Control.Monad
import Control.Applicative
import Data.Bits
import Data.Typeable(Typeable)
import Control.Concurrent.Chan.Unagi.Internal(
newSegmentSource, moveToNextCell, waitingAdvanceStream,
ChanEnd(..), StreamHead(..), StreamSegment, Stream(..), NextSegment(..))
import Control.Concurrent.Chan.Unagi.Constants
import qualified Control.Concurrent.Chan.Unagi.NoBlocking.Types as UT
import Prelude
data InChan a = InChan !(IORef Bool)
!(ChanEnd (Cell a))
deriving (Typeable,InChan a -> InChan a -> Bool
(InChan a -> InChan a -> Bool)
-> (InChan a -> InChan a -> Bool) -> Eq (InChan a)
forall a. InChan a -> InChan a -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: InChan a -> InChan a -> Bool
$c/= :: forall a. InChan a -> InChan a -> Bool
== :: InChan a -> InChan a -> Bool
$c== :: forall a. InChan a -> InChan a -> Bool
Eq)
data OutChan a = OutChan !(IORef Bool)
!(ChanEnd (Cell a))
deriving (Typeable,OutChan a -> OutChan a -> Bool
(OutChan a -> OutChan a -> Bool)
-> (OutChan a -> OutChan a -> Bool) -> Eq (OutChan a)
forall a. OutChan a -> OutChan a -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: OutChan a -> OutChan a -> Bool
$c/= :: forall a. OutChan a -> OutChan a -> Bool
== :: OutChan a -> OutChan a -> Bool
$c== :: forall a. OutChan a -> OutChan a -> Bool
Eq)
type Cell a = Maybe a
newChanStarting :: Int -> IO (InChan a, OutChan a)
{-# INLINE newChanStarting #-}
newChanStarting :: Int -> IO (InChan a, OutChan a)
newChanStarting !Int
startingCellOffset = do
SegSource (Maybe a)
segSource <- Maybe a -> IO (SegSource (Maybe a))
forall cell_a. cell_a -> IO (SegSource cell_a)
newSegmentSource Maybe a
forall a. Maybe a
Nothing
Stream (Maybe a)
stream <- StreamSegment (Maybe a)
-> IORef (NextSegment (Maybe a)) -> Stream (Maybe a)
forall cell_a.
StreamSegment cell_a -> IORef (NextSegment cell_a) -> Stream cell_a
Stream (StreamSegment (Maybe a)
-> IORef (NextSegment (Maybe a)) -> Stream (Maybe a))
-> SegSource (Maybe a)
-> IO (IORef (NextSegment (Maybe a)) -> Stream (Maybe a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SegSource (Maybe a)
segSource
IO (IORef (NextSegment (Maybe a)) -> Stream (Maybe a))
-> IO (IORef (NextSegment (Maybe a))) -> IO (Stream (Maybe a))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> NextSegment (Maybe a) -> IO (IORef (NextSegment (Maybe a)))
forall a. a -> IO (IORef a)
newIORef NextSegment (Maybe a)
forall cell_a. NextSegment cell_a
NoSegment
let end :: IO (ChanEnd (Maybe a))
end = SegSource (Maybe a)
-> AtomicCounter
-> IORef (StreamHead (Maybe a))
-> ChanEnd (Maybe a)
forall cell_a.
SegSource cell_a
-> AtomicCounter -> IORef (StreamHead cell_a) -> ChanEnd cell_a
ChanEnd SegSource (Maybe a)
segSource
(AtomicCounter
-> IORef (StreamHead (Maybe a)) -> ChanEnd (Maybe a))
-> IO AtomicCounter
-> IO (IORef (StreamHead (Maybe a)) -> ChanEnd (Maybe a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> IO AtomicCounter
newCounter Int
startingCellOffset
IO (IORef (StreamHead (Maybe a)) -> ChanEnd (Maybe a))
-> IO (IORef (StreamHead (Maybe a))) -> IO (ChanEnd (Maybe a))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> StreamHead (Maybe a) -> IO (IORef (StreamHead (Maybe a)))
forall a. a -> IO (IORef a)
newIORef (Int -> Stream (Maybe a) -> StreamHead (Maybe a)
forall cell_a. Int -> Stream cell_a -> StreamHead cell_a
StreamHead Int
startingCellOffset Stream (Maybe a)
stream)
inEnd :: ChanEnd (Maybe a)
inEnd@(ChanEnd SegSource (Maybe a)
_ AtomicCounter
_ IORef (StreamHead (Maybe a))
inHeadRef) <- IO (ChanEnd (Maybe a))
end
IORef Bool
finalizee <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
True
IO (Weak (IORef (StreamHead (Maybe a)))) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Weak (IORef (StreamHead (Maybe a)))) -> IO ())
-> IO (Weak (IORef (StreamHead (Maybe a)))) -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef (StreamHead (Maybe a))
-> IO () -> IO (Weak (IORef (StreamHead (Maybe a))))
forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef (StreamHead (Maybe a))
inHeadRef (IO () -> IO (Weak (IORef (StreamHead (Maybe a)))))
-> IO () -> IO (Weak (IORef (StreamHead (Maybe a))))
forall a b. (a -> b) -> a -> b
$ do
IO ()
writeBarrier
IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
finalizee Bool
False
(,) (IORef Bool -> ChanEnd (Maybe a) -> InChan a
forall a. IORef Bool -> ChanEnd (Cell a) -> InChan a
InChan IORef Bool
finalizee ChanEnd (Maybe a)
inEnd) (OutChan a -> (InChan a, OutChan a))
-> IO (OutChan a) -> IO (InChan a, OutChan a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (IORef Bool -> ChanEnd (Maybe a) -> OutChan a
forall a. IORef Bool -> ChanEnd (Cell a) -> OutChan a
OutChan IORef Bool
finalizee (ChanEnd (Maybe a) -> OutChan a)
-> IO (ChanEnd (Maybe a)) -> IO (OutChan a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO (ChanEnd (Maybe a))
end)
isActive :: OutChan a -> IO Bool
isActive :: OutChan a -> IO Bool
isActive (OutChan IORef Bool
finalizee ChanEnd (Cell a)
_) = do
Bool
b <- IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
finalizee
IO ()
loadLoadBarrier
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
b
dupChan :: InChan a -> IO (OutChan a)
{-# INLINE dupChan #-}
dupChan :: InChan a -> IO (OutChan a)
dupChan (InChan IORef Bool
finalizee (ChanEnd SegSource (Cell a)
segSource AtomicCounter
counter IORef (StreamHead (Cell a))
streamHead)) = do
StreamHead (Cell a)
hLoc <- IORef (StreamHead (Cell a)) -> IO (StreamHead (Cell a))
forall a. IORef a -> IO a
readIORef IORef (StreamHead (Cell a))
streamHead
IO ()
loadLoadBarrier
Int
wCount <- AtomicCounter -> IO Int
readCounter AtomicCounter
counter
AtomicCounter
counter' <- Int -> IO AtomicCounter
newCounter Int
wCount
IORef (StreamHead (Cell a))
streamHead' <- StreamHead (Cell a) -> IO (IORef (StreamHead (Cell a)))
forall a. a -> IO (IORef a)
newIORef StreamHead (Cell a)
hLoc
OutChan a -> IO (OutChan a)
forall (m :: * -> *) a. Monad m => a -> m a
return (OutChan a -> IO (OutChan a)) -> OutChan a -> IO (OutChan a)
forall a b. (a -> b) -> a -> b
$ IORef Bool -> ChanEnd (Cell a) -> OutChan a
forall a. IORef Bool -> ChanEnd (Cell a) -> OutChan a
OutChan IORef Bool
finalizee (ChanEnd (Cell a) -> OutChan a) -> ChanEnd (Cell a) -> OutChan a
forall a b. (a -> b) -> a -> b
$ SegSource (Cell a)
-> AtomicCounter -> IORef (StreamHead (Cell a)) -> ChanEnd (Cell a)
forall cell_a.
SegSource cell_a
-> AtomicCounter -> IORef (StreamHead cell_a) -> ChanEnd cell_a
ChanEnd SegSource (Cell a)
segSource AtomicCounter
counter' IORef (StreamHead (Cell a))
streamHead'
writeChan :: InChan a -> a -> IO ()
{-# INLINE writeChan #-}
writeChan :: InChan a -> a -> IO ()
writeChan (InChan IORef Bool
_ ce :: ChanEnd (Cell a)
ce@(ChanEnd SegSource (Cell a)
segSource AtomicCounter
_ IORef (StreamHead (Cell a))
_)) = \a
a-> IO () -> IO ()
forall a. IO a -> IO a
mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
(Int
segIx, (Stream StreamSegment (Cell a)
seg IORef (NextSegment (Cell a))
next), IO ()
maybeUpdateStreamHead) <- ChanEnd (Cell a) -> IO (Int, Stream (Cell a), IO ())
forall cell_a. ChanEnd cell_a -> IO (Int, Stream cell_a, IO ())
moveToNextCell ChanEnd (Cell a)
ce
MutableArray (PrimState IO) (Cell a) -> Int -> Cell a -> IO ()
forall (m :: * -> *) a.
PrimMonad m =>
MutableArray (PrimState m) a -> Int -> a -> m ()
P.writeArray StreamSegment (Cell a)
MutableArray (PrimState IO) (Cell a)
seg Int
segIx (a -> Cell a
forall a. a -> Maybe a
Just a
a)
IO ()
maybeUpdateStreamHead
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
segIx Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO (Stream (Cell a)) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Stream (Cell a)) -> IO ()) -> IO (Stream (Cell a)) -> IO ()
forall a b. (a -> b) -> a -> b
$
IORef (NextSegment (Cell a))
-> SegSource (Cell a) -> Int -> IO (Stream (Cell a))
forall cell_a.
IORef (NextSegment cell_a)
-> SegSource cell_a -> Int -> IO (Stream cell_a)
waitingAdvanceStream IORef (NextSegment (Cell a))
next SegSource (Cell a)
segSource Int
0
tryReadChan :: OutChan a -> IO (UT.Element a)
{-# INLINE tryReadChan #-}
tryReadChan :: OutChan a -> IO (Element a)
tryReadChan (OutChan IORef Bool
_ ChanEnd (Cell a)
ce) = do
(Int
segIx, (Stream StreamSegment (Cell a)
seg IORef (NextSegment (Cell a))
_), IO ()
maybeUpdateStreamHead) <- ChanEnd (Cell a) -> IO (Int, Stream (Cell a), IO ())
forall cell_a. ChanEnd cell_a -> IO (Int, Stream cell_a, IO ())
moveToNextCell ChanEnd (Cell a)
ce
IO ()
maybeUpdateStreamHead
Element a -> IO (Element a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Element a -> IO (Element a)) -> Element a -> IO (Element a)
forall a b. (a -> b) -> a -> b
$ IO (Cell a) -> Element a
forall a. IO (Maybe a) -> Element a
UT.Element (IO (Cell a) -> Element a) -> IO (Cell a) -> Element a
forall a b. (a -> b) -> a -> b
$ MutableArray (PrimState IO) (Cell a) -> Int -> IO (Cell a)
forall (m :: * -> *) a.
PrimMonad m =>
MutableArray (PrimState m) a -> Int -> m a
P.readArray StreamSegment (Cell a)
MutableArray (PrimState IO) (Cell a)
seg Int
segIx
readChan :: IO () -> OutChan a -> IO a
{-# INLINE readChan #-}
readChan :: IO () -> OutChan a -> IO a
readChan IO ()
io OutChan a
oc = OutChan a -> IO (Element a)
forall a. OutChan a -> IO (Element a)
tryReadChan OutChan a
oc IO (Element a) -> (Element a -> IO a) -> IO a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Element a
el->
let peekMaybe :: IO a -> IO a
peekMaybe IO a
f = Element a -> IO (Maybe a)
forall a. Element a -> IO (Maybe a)
UT.tryRead Element a
el IO (Maybe a) -> (Maybe a -> IO a) -> IO a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO a -> (a -> IO a) -> Maybe a -> IO a
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO a
f a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return
go :: IO a
go = IO a -> IO a
peekMaybe IO a
checkAndGo
checkAndGo :: IO a
checkAndGo = do
Bool
b <- OutChan a -> IO Bool
forall a. OutChan a -> IO Bool
isActive OutChan a
oc
if Bool
b then IO ()
io IO () -> IO a -> IO a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO a
go
else IO a -> IO a
peekMaybe (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ BlockedIndefinitelyOnMVar -> IO a
forall e a. Exception e => e -> IO a
throwIO BlockedIndefinitelyOnMVar
BlockedIndefinitelyOnMVar
in IO a
go
streamChan :: Int -> OutChan a -> IO [UT.Stream a]
{-# INLINE streamChan #-}
streamChan :: Int -> OutChan a -> IO [Stream a]
streamChan Int
period (OutChan IORef Bool
_ (ChanEnd SegSource (Cell a)
segSource AtomicCounter
counter IORef (StreamHead (Cell a))
streamHead)) = do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
period Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
1) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"Argument to streamChan must be > 0"
(StreamHead Int
offsetInitial Stream (Cell a)
strInitial) <- IORef (StreamHead (Cell a)) -> IO (StreamHead (Cell a))
forall a. IORef a -> IO a
readIORef IORef (StreamHead (Cell a))
streamHead
IO ()
loadLoadBarrier
!Int
ix0 <- AtomicCounter -> IO Int
readCounter AtomicCounter
counter
let stream :: Int -> Stream (Cell a) -> Int -> Stream a
stream !Int
offset0 Stream (Cell a)
str0 !Int
ix = IO (Next a) -> Stream a
forall a. IO (Next a) -> Stream a
UT.Stream (IO (Next a) -> Stream a) -> IO (Next a) -> Stream a
forall a b. (a -> b) -> a -> b
$ do
let (Int
segsAway, Int
segIx) = Bool -> (Int, Int) -> (Int, Int)
forall a. HasCallStack => Bool -> a -> a
assert ((Int
ix Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
offset0) Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
0) ((Int, Int) -> (Int, Int)) -> (Int, Int) -> (Int, Int)
forall a b. (a -> b) -> a -> b
$
Int -> (Int, Int)
divMod_sEGMENT_LENGTH (Int -> (Int, Int)) -> Int -> (Int, Int)
forall a b. (a -> b) -> a -> b
$! (Int
ix Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
offset0)
{-# INLINE go #-}
go :: t -> Stream (Cell a) -> IO (Stream (Cell a))
go t
0 Stream (Cell a)
str = Stream (Cell a) -> IO (Stream (Cell a))
forall (m :: * -> *) a. Monad m => a -> m a
return Stream (Cell a)
str
go !t
n (Stream StreamSegment (Cell a)
_ IORef (NextSegment (Cell a))
next) =
IORef (NextSegment (Cell a))
-> SegSource (Cell a) -> Int -> IO (Stream (Cell a))
forall cell_a.
IORef (NextSegment cell_a)
-> SegSource cell_a -> Int -> IO (Stream cell_a)
waitingAdvanceStream IORef (NextSegment (Cell a))
next SegSource (Cell a)
segSource (Int
nEW_SEGMENT_WAITInt -> Int -> Int
forall a. Num a => a -> a -> a
*Int
segIx)
IO (Stream (Cell a))
-> (Stream (Cell a) -> IO (Stream (Cell a)))
-> IO (Stream (Cell a))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= t -> Stream (Cell a) -> IO (Stream (Cell a))
go (t
nt -> t -> t
forall a. Num a => a -> a -> a
-t
1)
str :: Stream (Cell a)
str@(Stream StreamSegment (Cell a)
seg IORef (NextSegment (Cell a))
_) <- Int -> Stream (Cell a) -> IO (Stream (Cell a))
forall t.
(Eq t, Num t) =>
t -> Stream (Cell a) -> IO (Stream (Cell a))
go Int
segsAway Stream (Cell a)
str0
let !strOffset :: Int
strOffset = Int
offset0Int -> Int -> Int
forall a. Num a => a -> a -> a
+(Int
segsAway Int -> Int -> Int
forall a. Bits a => a -> Int -> a
`unsafeShiftL` Int
lOG_SEGMENT_LENGTH)
Cell a
mbA <- MutableArray (PrimState IO) (Cell a) -> Int -> IO (Cell a)
forall (m :: * -> *) a.
PrimMonad m =>
MutableArray (PrimState m) a -> Int -> m a
P.readArray StreamSegment (Cell a)
MutableArray (PrimState IO) (Cell a)
seg Int
segIx
case Cell a
mbA of
Cell a
Nothing -> Next a -> IO (Next a)
forall (m :: * -> *) a. Monad m => a -> m a
return Next a
forall a. Next a
UT.Pending
Just a
a -> Next a -> IO (Next a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Next a -> IO (Next a)) -> Next a -> IO (Next a)
forall a b. (a -> b) -> a -> b
$ a -> Stream a -> Next a
forall a. a -> Stream a -> Next a
UT.Next a
a (Stream a -> Next a) -> Stream a -> Next a
forall a b. (a -> b) -> a -> b
$ Int -> Stream (Cell a) -> Int -> Stream a
stream Int
strOffset Stream (Cell a)
str (Int
ixInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
period)
[Stream a] -> IO [Stream a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Stream a] -> IO [Stream a]) -> [Stream a] -> IO [Stream a]
forall a b. (a -> b) -> a -> b
$ (Int -> Stream a) -> [Int] -> [Stream a]
forall a b. (a -> b) -> [a] -> [b]
map (Int -> Stream (Cell a) -> Int -> Stream a
stream Int
offsetInitial Stream (Cell a)
strInitial) ([Int] -> [Stream a]) -> [Int] -> [Stream a]
forall a b. (a -> b) -> a -> b
$
Int -> [Int] -> [Int]
forall a. Int -> [a] -> [a]
take Int
period ([Int] -> [Int]) -> [Int] -> [Int]
forall a b. (a -> b) -> a -> b
$ (Int -> Int) -> Int -> [Int]
forall a. (a -> a) -> a -> [a]
iterate (Int -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1) Int
ix0