{-# LANGUAGE DataKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE Trustworthy #-}
module BroadcastChan.Internal where
import Control.Concurrent.MVar
import Control.Exception (mask_)
import Control.Monad ((>=>))
import Control.Monad.IO.Unlift (MonadIO(..))
import System.IO.Unsafe (unsafeInterleaveIO)
data Direction = In
| Out
type In = 'In
type Out = 'Out
newtype BroadcastChan (dir :: Direction) a = BChan (MVar (Stream a))
deriving (BroadcastChan dir a -> BroadcastChan dir a -> Bool
(BroadcastChan dir a -> BroadcastChan dir a -> Bool)
-> (BroadcastChan dir a -> BroadcastChan dir a -> Bool)
-> Eq (BroadcastChan dir a)
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
forall (dir :: Direction) a.
BroadcastChan dir a -> BroadcastChan dir a -> Bool
/= :: BroadcastChan dir a -> BroadcastChan dir a -> Bool
$c/= :: forall (dir :: Direction) a.
BroadcastChan dir a -> BroadcastChan dir a -> Bool
== :: BroadcastChan dir a -> BroadcastChan dir a -> Bool
$c== :: forall (dir :: Direction) a.
BroadcastChan dir a -> BroadcastChan dir a -> Bool
Eq)
type Stream a = MVar (ChItem a)
data ChItem a = ChItem a {-# UNPACK #-} !(Stream a) | Closed
newBroadcastChan :: MonadIO m => m (BroadcastChan In a)
newBroadcastChan :: m (BroadcastChan In a)
newBroadcastChan = IO (BroadcastChan In a) -> m (BroadcastChan In a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (BroadcastChan In a) -> m (BroadcastChan In a))
-> IO (BroadcastChan In a) -> m (BroadcastChan In a)
forall a b. (a -> b) -> a -> b
$ do
MVar (ChItem a)
hole <- IO (MVar (ChItem a))
forall a. IO (MVar a)
newEmptyMVar
MVar (MVar (ChItem a))
writeVar <- MVar (ChItem a) -> IO (MVar (MVar (ChItem a)))
forall a. a -> IO (MVar a)
newMVar MVar (ChItem a)
hole
BroadcastChan In a -> IO (BroadcastChan In a)
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar (MVar (ChItem a)) -> BroadcastChan In a
forall (dir :: Direction) a. MVar (Stream a) -> BroadcastChan dir a
BChan MVar (MVar (ChItem a))
writeVar)
closeBChan :: MonadIO m => BroadcastChan In a -> m Bool
closeBChan :: BroadcastChan In a -> m Bool
closeBChan (BChan MVar (Stream a)
writeVar) = IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> (IO Bool -> IO Bool) -> IO Bool -> m Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO Bool -> IO Bool
forall a. IO a -> IO a
mask_ (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
Stream a
old_hole <- MVar (Stream a) -> IO (Stream a)
forall a. MVar a -> IO a
takeMVar MVar (Stream a)
writeVar
Stream a -> ChItem a -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar Stream a
old_hole ChItem a
forall a. ChItem a
Closed IO Bool -> IO () -> IO Bool
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* MVar (Stream a) -> Stream a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Stream a)
writeVar Stream a
old_hole
isClosedBChan :: MonadIO m => BroadcastChan dir a -> m Bool
isClosedBChan :: BroadcastChan dir a -> m Bool
isClosedBChan (BChan MVar (Stream a)
mvar) = IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
Stream a
old_hole <- MVar (Stream a) -> IO (Stream a)
forall a. MVar a -> IO a
readMVar MVar (Stream a)
mvar
Maybe (ChItem a)
val <- Stream a -> IO (Maybe (ChItem a))
forall a. MVar a -> IO (Maybe a)
tryReadMVar Stream a
old_hole
case Maybe (ChItem a)
val of
Just ChItem a
Closed -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Maybe (ChItem a)
_ -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
writeBChan :: MonadIO m => BroadcastChan In a -> a -> m Bool
writeBChan :: BroadcastChan In a -> a -> m Bool
writeBChan (BChan MVar (Stream a)
writeVar) a
val = IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
Stream a
new_hole <- IO (Stream a)
forall a. IO (MVar a)
newEmptyMVar
IO Bool -> IO Bool
forall a. IO a -> IO a
mask_ (IO Bool -> IO Bool) -> IO Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ do
Stream a
old_hole <- MVar (Stream a) -> IO (Stream a)
forall a. MVar a -> IO a
takeMVar MVar (Stream a)
writeVar
Bool
empty <- Stream a -> ChItem a -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar Stream a
old_hole (a -> Stream a -> ChItem a
forall a. a -> Stream a -> ChItem a
ChItem a
val Stream a
new_hole)
if Bool
empty
then MVar (Stream a) -> Stream a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Stream a)
writeVar Stream a
new_hole
else MVar (Stream a) -> Stream a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Stream a)
writeVar Stream a
old_hole
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
empty
{-# INLINE writeBChan #-}
readBChan :: MonadIO m => BroadcastChan Out a -> m (Maybe a)
readBChan :: BroadcastChan Out a -> m (Maybe a)
readBChan (BChan MVar (Stream a)
readVar) = IO (Maybe a) -> m (Maybe a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> m (Maybe a)) -> IO (Maybe a) -> m (Maybe a)
forall a b. (a -> b) -> a -> b
$ do
MVar (Stream a)
-> (Stream a -> IO (Stream a, Maybe a)) -> IO (Maybe a)
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVarMasked MVar (Stream a)
readVar ((Stream a -> IO (Stream a, Maybe a)) -> IO (Maybe a))
-> (Stream a -> IO (Stream a, Maybe a)) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ \Stream a
read_end -> do
ChItem a
result <- Stream a -> IO (ChItem a)
forall a. MVar a -> IO a
readMVar Stream a
read_end
case ChItem a
result of
ChItem a
val Stream a
new_read_end -> (Stream a, Maybe a) -> IO (Stream a, Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream a
new_read_end, a -> Maybe a
forall a. a -> Maybe a
Just a
val)
ChItem a
Closed -> (Stream a, Maybe a) -> IO (Stream a, Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream a
read_end, Maybe a
forall a. Maybe a
Nothing)
{-# INLINE readBChan #-}
newBChanListener :: MonadIO m => BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener :: BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener (BChan MVar (Stream a)
mvar) = IO (BroadcastChan Out a) -> m (BroadcastChan Out a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (BroadcastChan Out a) -> m (BroadcastChan Out a))
-> IO (BroadcastChan Out a) -> m (BroadcastChan Out a)
forall a b. (a -> b) -> a -> b
$ do
Stream a
hole <- MVar (Stream a) -> IO (Stream a)
forall a. MVar a -> IO a
readMVar MVar (Stream a)
mvar
MVar (Stream a)
newReadVar <- Stream a -> IO (MVar (Stream a))
forall a. a -> IO (MVar a)
newMVar Stream a
hole
BroadcastChan Out a -> IO (BroadcastChan Out a)
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar (Stream a) -> BroadcastChan Out a
forall (dir :: Direction) a. MVar (Stream a) -> BroadcastChan dir a
BChan MVar (Stream a)
newReadVar)
getBChanContents :: BroadcastChan dir a -> IO [a]
getBChanContents :: BroadcastChan dir a -> IO [a]
getBChanContents = BroadcastChan dir a -> IO (BroadcastChan Out a)
forall (m :: * -> *) (dir :: Direction) a.
MonadIO m =>
BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener (BroadcastChan dir a -> IO (BroadcastChan Out a))
-> (BroadcastChan Out a -> IO [a]) -> BroadcastChan dir a -> IO [a]
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> BroadcastChan Out a -> IO [a]
forall a. BroadcastChan Out a -> IO [a]
go
where
go :: BroadcastChan Out a -> IO [a]
go BroadcastChan Out a
ch = IO [a] -> IO [a]
forall a. IO a -> IO a
unsafeInterleaveIO (IO [a] -> IO [a]) -> IO [a] -> IO [a]
forall a b. (a -> b) -> a -> b
$ do
Maybe a
result <- BroadcastChan Out a -> IO (Maybe a)
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan Out a -> m (Maybe a)
readBChan BroadcastChan Out a
ch
case Maybe a
result of
Maybe a
Nothing -> [a] -> IO [a]
forall (m :: * -> *) a. Monad m => a -> m a
return []
Just a
x -> do
[a]
xs <- BroadcastChan Out a -> IO [a]
go BroadcastChan Out a
ch
[a] -> IO [a]
forall (m :: * -> *) a. Monad m => a -> m a
return (a
xa -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
xs)
foldBChan
:: (MonadIO m, MonadIO n)
=> (x -> a -> x)
-> x
-> (x -> b)
-> BroadcastChan d a
-> n (m b)
foldBChan :: (x -> a -> x) -> x -> (x -> b) -> BroadcastChan d a -> n (m b)
foldBChan x -> a -> x
step x
begin x -> b
done BroadcastChan d a
chan = do
BroadcastChan Out a
listen <- BroadcastChan d a -> n (BroadcastChan Out a)
forall (m :: * -> *) (dir :: Direction) a.
MonadIO m =>
BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener BroadcastChan d a
chan
m b -> n (m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (m b -> n (m b)) -> m b -> n (m b)
forall a b. (a -> b) -> a -> b
$ BroadcastChan Out a -> x -> m b
forall (m :: * -> *). MonadIO m => BroadcastChan Out a -> x -> m b
go BroadcastChan Out a
listen x
begin
where
go :: BroadcastChan Out a -> x -> m b
go BroadcastChan Out a
listen x
x = do
Maybe a
x' <- BroadcastChan Out a -> m (Maybe a)
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan Out a -> m (Maybe a)
readBChan BroadcastChan Out a
listen
case Maybe a
x' of
Just a
x'' -> BroadcastChan Out a -> x -> m b
go BroadcastChan Out a
listen (x -> m b) -> x -> m b
forall a b. (a -> b) -> a -> b
$! x -> a -> x
step x
x a
x''
Maybe a
Nothing -> b -> m b
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> m b) -> b -> m b
forall a b. (a -> b) -> a -> b
$! x -> b
done x
x
{-# INLINABLE foldBChan #-}
foldBChanM
:: (MonadIO m, MonadIO n)
=> (x -> a -> m x)
-> m x
-> (x -> m b)
-> BroadcastChan d a
-> n (m b)
foldBChanM :: (x -> a -> m x)
-> m x -> (x -> m b) -> BroadcastChan d a -> n (m b)
foldBChanM x -> a -> m x
step m x
begin x -> m b
done BroadcastChan d a
chan = do
BroadcastChan Out a
listen <- BroadcastChan d a -> n (BroadcastChan Out a)
forall (m :: * -> *) (dir :: Direction) a.
MonadIO m =>
BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener BroadcastChan d a
chan
m b -> n (m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (m b -> n (m b)) -> m b -> n (m b)
forall a b. (a -> b) -> a -> b
$ do
x
x0 <- m x
begin
BroadcastChan Out a -> x -> m b
go BroadcastChan Out a
listen x
x0
where
go :: BroadcastChan Out a -> x -> m b
go BroadcastChan Out a
listen x
x = do
Maybe a
x' <- BroadcastChan Out a -> m (Maybe a)
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan Out a -> m (Maybe a)
readBChan BroadcastChan Out a
listen
case Maybe a
x' of
Just a
x'' -> x -> a -> m x
step x
x a
x'' m x -> (x -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= BroadcastChan Out a -> x -> m b
go BroadcastChan Out a
listen
Maybe a
Nothing -> x -> m b
done x
x
{-# INLINABLE foldBChanM #-}