module Pipes.Concurrent.PQueue
( spawn
, withSpawn
, Input (..)
, Output (..)
, fromInput
, toOutput
) where
import Control.Applicative
import Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TPQueue as TPQueue
import Control.Exception (bracket)
import Control.Monad
import Pipes.Concurrent
( Input (..)
, Output (..)
, fromInput
, toOutput
)
spawn :: Ord p => IO (Output (p, a), Input a, STM ())
spawn :: IO (Output (p, a), Input a, STM ())
spawn = do
TPQueue p a
q <- IO (TPQueue p a)
forall k v. Ord k => IO (TPQueue k v)
TPQueue.newTPQueueIO
TVar Bool
sealed <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
STM.newTVarIO Bool
False
let seal :: STM ()
seal = TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar Bool
sealed Bool
True
TVar ()
rSend <- () -> IO (TVar ())
forall a. a -> IO (TVar a)
STM.newTVarIO ()
IO (Weak (TVar ())) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (TVar () -> IO () -> IO (Weak (TVar ()))
forall a. TVar a -> IO () -> IO (Weak (TVar a))
STM.mkWeakTVar TVar ()
rSend (STM () -> IO ()
forall a. STM a -> IO a
STM.atomically STM ()
seal))
TVar ()
rRecv <- () -> IO (TVar ())
forall a. a -> IO (TVar a)
STM.newTVarIO ()
IO (Weak (TVar ())) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (TVar () -> IO () -> IO (Weak (TVar ()))
forall a. TVar a -> IO () -> IO (Weak (TVar a))
STM.mkWeakTVar TVar ()
rRecv (STM () -> IO ()
forall a. STM a -> IO a
STM.atomically STM ()
seal))
let sendOrEnd :: p -> a -> STM Bool
sendOrEnd p
p a
a = do
Bool
isSealed <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
sealed
if Bool
isSealed
then Bool -> STM Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
else TPQueue p a -> p -> a -> STM ()
forall k v. Ord k => TPQueue k v -> k -> v -> STM ()
TPQueue.writeTPQueue TPQueue p a
q p
p a
a STM () -> STM Bool -> STM Bool
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> STM Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
readOrEnd :: STM (Maybe a)
readOrEnd = (a -> Maybe a) -> STM a -> STM (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Maybe a
forall a. a -> Maybe a
Just (TPQueue p a -> STM a
forall k v. Ord k => TPQueue k v -> STM v
TPQueue.readTPQueue TPQueue p a
q)
STM (Maybe a) -> STM (Maybe a) -> STM (Maybe a)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
sealed STM Bool -> (Bool -> STM ()) -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM ()
check STM () -> STM (Maybe a) -> STM (Maybe a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe a -> STM (Maybe a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe a
forall a. Maybe a
Nothing)
_send :: (p, a) -> STM Bool
_send (p
p, a
a) = p -> a -> STM Bool
sendOrEnd p
p a
a STM Bool -> STM () -> STM Bool
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* TVar () -> STM ()
forall a. TVar a -> STM a
readTVar TVar ()
rSend
_recv :: STM (Maybe a)
_recv = STM (Maybe a)
readOrEnd STM (Maybe a) -> STM () -> STM (Maybe a)
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* TVar () -> STM ()
forall a. TVar a -> STM a
readTVar TVar ()
rRecv
(Output (p, a), Input a, STM ())
-> IO (Output (p, a), Input a, STM ())
forall (m :: * -> *) a. Monad m => a -> m a
return (((p, a) -> STM Bool) -> Output (p, a)
forall a. (a -> STM Bool) -> Output a
Output (p, a) -> STM Bool
_send, STM (Maybe a) -> Input a
forall a. STM (Maybe a) -> Input a
Input STM (Maybe a)
_recv, STM ()
seal)
{-# INLINABLE spawn #-}
withSpawn :: Ord p => ((Output (p, a), Input a) -> IO r) -> IO r
withSpawn :: ((Output (p, a), Input a) -> IO r) -> IO r
withSpawn (Output (p, a), Input a) -> IO r
action = IO (Output (p, a), Input a, STM ())
-> ((Output (p, a), Input a, STM ()) -> IO ())
-> ((Output (p, a), Input a, STM ()) -> IO r)
-> IO r
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket IO (Output (p, a), Input a, STM ())
forall p a. Ord p => IO (Output (p, a), Input a, STM ())
spawn
(\(Output (p, a)
_, Input a
_, STM ()
seal) -> STM () -> IO ()
forall a. STM a -> IO a
atomically STM ()
seal)
(\(Output (p, a)
output, Input a
input, STM ()
_) -> (Output (p, a), Input a) -> IO r
action (Output (p, a)
output, Input a
input))