{-# OPTIONS_HADDOCK not-home #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE Safe #-}
module BroadcastChan.Extra
( Action(..)
, BracketOnError(..)
, Handler(..)
, ThreadBracket(..)
, mapHandler
, runParallel
, runParallelWith
, runParallel_
, runParallelWith_
) where
import Control.Concurrent (ThreadId, forkFinally, mkWeakThreadId, myThreadId)
import Control.Concurrent.MVar
import Control.Concurrent.QSem
import Control.Concurrent.QSemN
import Control.Exception (Exception(..), SomeException(..), bracketOnError)
import qualified Control.Exception as Exc
import Control.Monad ((>=>), join, replicateM, void)
import Control.Monad.Trans.Cont (ContT(..))
import Control.Monad.IO.Unlift (MonadIO(..))
import Data.Typeable (Typeable)
import System.Mem.Weak (Weak, deRefWeak)
import BroadcastChan.Internal
evalContT :: Monad m => ContT r m r -> m r
evalContT :: ContT r m r -> m r
evalContT ContT r m r
m = ContT r m r -> (r -> m r) -> m r
forall k (r :: k) (m :: k -> *) a. ContT r m a -> (a -> m r) -> m r
runContT ContT r m r
m r -> m r
forall (m :: * -> *) a. Monad m => a -> m a
return
unsafeWriteBChan :: MonadIO m => BroadcastChan In a -> a -> m ()
unsafeWriteBChan :: BroadcastChan In a -> a -> m ()
unsafeWriteBChan (BChan MVar (Stream a)
writeVar) a
val = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Stream a
new_hole <- IO (Stream a)
forall a. IO (MVar a)
newEmptyMVar
IO () -> IO ()
forall a. IO a -> IO a
Exc.mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Stream a
old_hole <- MVar (Stream a) -> IO (Stream a)
forall a. MVar a -> IO a
takeMVar MVar (Stream a)
writeVar
Maybe (ChItem a)
item <- Stream a -> IO (Maybe (ChItem a))
forall a. MVar a -> IO (Maybe a)
tryTakeMVar Stream a
old_hole
case Maybe (ChItem a)
item of
Maybe (ChItem a)
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just ChItem a
Closed -> Stream a -> ChItem a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar Stream a
new_hole ChItem a
forall a. ChItem a
Closed
Just ChItem a
_ -> [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"unsafeWriteBChan hit an impossible condition!"
Stream a -> ChItem a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar Stream a
old_hole (a -> Stream a -> ChItem a
forall a. a -> Stream a -> ChItem a
ChItem a
val Stream a
new_hole)
MVar (Stream a) -> Stream a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Stream a)
writeVar Stream a
new_hole
{-# INLINE unsafeWriteBChan #-}
data Shutdown = Shutdown deriving (Int -> Shutdown -> ShowS
[Shutdown] -> ShowS
Shutdown -> [Char]
(Int -> Shutdown -> ShowS)
-> (Shutdown -> [Char]) -> ([Shutdown] -> ShowS) -> Show Shutdown
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [Shutdown] -> ShowS
$cshowList :: [Shutdown] -> ShowS
show :: Shutdown -> [Char]
$cshow :: Shutdown -> [Char]
showsPrec :: Int -> Shutdown -> ShowS
$cshowsPrec :: Int -> Shutdown -> ShowS
Show, Typeable)
instance Exception Shutdown
data Action
= Drop
| Retry
| Terminate
deriving (Action -> Action -> Bool
(Action -> Action -> Bool)
-> (Action -> Action -> Bool) -> Eq Action
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Action -> Action -> Bool
$c/= :: Action -> Action -> Bool
== :: Action -> Action -> Bool
$c== :: Action -> Action -> Bool
Eq, Int -> Action -> ShowS
[Action] -> ShowS
Action -> [Char]
(Int -> Action -> ShowS)
-> (Action -> [Char]) -> ([Action] -> ShowS) -> Show Action
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [Action] -> ShowS
$cshowList :: [Action] -> ShowS
show :: Action -> [Char]
$cshow :: Action -> [Char]
showsPrec :: Int -> Action -> ShowS
$cshowsPrec :: Int -> Action -> ShowS
Show)
data Handler m a
= Simple Action
| Handle (a -> SomeException -> m Action)
data BracketOnError m r
= Bracket
{ BracketOnError m r -> IO [Weak ThreadId]
allocate :: IO [Weak ThreadId]
, BracketOnError m r -> [Weak ThreadId] -> IO ()
cleanup :: [Weak ThreadId] -> IO ()
, BracketOnError m r -> m r
action :: m r
}
data ThreadBracket
= ThreadBracket
{ ThreadBracket -> IO ()
setupFork :: IO ()
, ThreadBracket -> IO ()
cleanupFork :: IO ()
, ThreadBracket -> IO ()
cleanupForkError :: IO ()
}
noopBracket :: ThreadBracket
noopBracket :: ThreadBracket
noopBracket = ThreadBracket :: IO () -> IO () -> IO () -> ThreadBracket
ThreadBracket
{ setupFork :: IO ()
setupFork = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
, cleanupFork :: IO ()
cleanupFork = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
, cleanupForkError :: IO ()
cleanupForkError = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
}
mapHandler :: (m Action -> n Action) -> Handler m a -> Handler n a
mapHandler :: (m Action -> n Action) -> Handler m a -> Handler n a
mapHandler m Action -> n Action
_ (Simple Action
act) = Action -> Handler n a
forall (m :: * -> *) a. Action -> Handler m a
Simple Action
act
mapHandler m Action -> n Action
mmorph (Handle a -> SomeException -> m Action
f) = (a -> SomeException -> n Action) -> Handler n a
forall (m :: * -> *) a.
(a -> SomeException -> m Action) -> Handler m a
Handle ((a -> SomeException -> n Action) -> Handler n a)
-> (a -> SomeException -> n Action) -> Handler n a
forall a b. (a -> b) -> a -> b
$ \a
a SomeException
exc -> m Action -> n Action
mmorph (a -> SomeException -> m Action
f a
a SomeException
exc)
parallelCore
:: forall a m
. MonadIO m
=> Handler IO a
-> Int
-> IO ()
-> ThreadBracket
-> (a -> IO ())
-> m (IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (), m ())
parallelCore :: Handler IO a
-> Int
-> IO ()
-> ThreadBracket
-> (a -> IO ())
-> m (IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (),
m ())
parallelCore Handler IO a
hndl Int
threads IO ()
onDrop ThreadBracket
threadBracket a -> IO ()
f = IO (IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (), m ())
-> m (IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (),
m ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
(IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (), m ())
-> m (IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (),
m ()))
-> IO
(IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (), m ())
-> m (IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (),
m ())
forall a b. (a -> b) -> a -> b
$ do
ThreadId
originTid <- IO ThreadId
myThreadId
BroadcastChan In a
inChanIn <- IO (BroadcastChan In a)
forall (m :: * -> *) a. MonadIO m => m (BroadcastChan In a)
newBroadcastChan
BroadcastChan Out a
inChanOut <- BroadcastChan In a -> IO (BroadcastChan Out a)
forall (m :: * -> *) (dir :: Direction) a.
MonadIO m =>
BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener BroadcastChan In a
inChanIn
QSemN
shutdownSem <- Int -> IO QSemN
newQSemN Int
0
QSemN
endSem <- Int -> IO QSemN
newQSemN Int
0
MVar (SomeException -> IO ())
excMVar <- (SomeException -> IO ()) -> IO (MVar (SomeException -> IO ()))
forall a. a -> IO (MVar a)
newMVar (ThreadId -> SomeException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
Exc.throwTo ThreadId
originTid)
let bufferValue :: a -> IO ()
bufferValue :: a -> IO ()
bufferValue = IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> (a -> IO Bool) -> a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BroadcastChan In a -> a -> IO Bool
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan In a -> a -> m Bool
writeBChan BroadcastChan In a
inChanIn
simpleHandler :: a -> SomeException -> Action -> IO ()
simpleHandler :: a -> SomeException -> Action -> IO ()
simpleHandler a
val SomeException
exc Action
act = case Action
act of
Action
Drop -> IO ()
onDrop
Action
Retry -> BroadcastChan In a -> a -> IO ()
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan In a -> a -> m ()
unsafeWriteBChan BroadcastChan In a
inChanIn a
val
Action
Terminate -> SomeException -> IO ()
forall e a. Exception e => e -> IO a
Exc.throwIO SomeException
exc
handler :: a -> SomeException -> IO ()
handler :: a -> SomeException -> IO ()
handler a
_ SomeException
exc | Just Shutdown
Shutdown <- SomeException -> Maybe Shutdown
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exc = SomeException -> IO ()
forall e a. Exception e => e -> IO a
Exc.throwIO SomeException
exc
handler a
val SomeException
exc = case Handler IO a
hndl of
Simple Action
a -> a -> SomeException -> Action -> IO ()
simpleHandler a
val SomeException
exc Action
a
Handle a -> SomeException -> IO Action
h -> a -> SomeException -> IO Action
h a
val SomeException
exc IO Action -> (Action -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= a -> SomeException -> Action -> IO ()
simpleHandler a
val SomeException
exc
processInput :: IO ()
processInput :: IO ()
processInput = do
Maybe a
x <- BroadcastChan Out a -> IO (Maybe a)
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan Out a -> m (Maybe a)
readBChan BroadcastChan Out a
inChanOut
case Maybe a
x of
Maybe a
Nothing -> QSemN -> Int -> IO ()
signalQSemN QSemN
endSem Int
1
Just a
a -> do
a -> IO ()
f a
a IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Exc.catch` a -> SomeException -> IO ()
handler a
a
IO ()
processInput
unsafeAllocThread :: IO (Weak ThreadId)
unsafeAllocThread :: IO (Weak ThreadId)
unsafeAllocThread = do
IO ()
setupFork
ThreadId
tid <- IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally IO ()
processInput ((Either SomeException () -> IO ()) -> IO ThreadId)
-> (Either SomeException () -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \Either SomeException ()
exit -> do
QSemN -> Int -> IO ()
signalQSemN QSemN
shutdownSem Int
1
case Either SomeException ()
exit of
Left SomeException
exc
| Just Shutdown
Shutdown <- SomeException -> Maybe Shutdown
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exc -> IO ()
cleanupForkError
| Bool
otherwise -> do
IO ()
cleanupForkError
Maybe (SomeException -> IO ())
reportErr <- MVar (SomeException -> IO ())
-> IO (Maybe (SomeException -> IO ()))
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar (SomeException -> IO ())
excMVar
case Maybe (SomeException -> IO ())
reportErr of
Maybe (SomeException -> IO ())
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just SomeException -> IO ()
throw -> SomeException -> IO ()
throw SomeException
exc IO () -> (Shutdown -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Exc.catch` Shutdown -> IO ()
forall (m :: * -> *). Monad m => Shutdown -> m ()
shutdownHandler
Right () -> IO ()
cleanupFork
ThreadId -> IO (Weak ThreadId)
mkWeakThreadId ThreadId
tid
where
shutdownHandler :: Shutdown -> m ()
shutdownHandler Shutdown
Shutdown = () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
allocThread :: ContT r IO (Weak ThreadId)
allocThread :: ContT r IO (Weak ThreadId)
allocThread = ((Weak ThreadId -> IO r) -> IO r) -> ContT r IO (Weak ThreadId)
forall k (r :: k) (m :: k -> *) a.
((a -> m r) -> m r) -> ContT r m a
ContT (((Weak ThreadId -> IO r) -> IO r) -> ContT r IO (Weak ThreadId))
-> ((Weak ThreadId -> IO r) -> IO r) -> ContT r IO (Weak ThreadId)
forall a b. (a -> b) -> a -> b
$ IO (Weak ThreadId)
-> (Weak ThreadId -> IO ()) -> (Weak ThreadId -> IO r) -> IO r
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracketOnError IO (Weak ThreadId)
unsafeAllocThread Weak ThreadId -> IO ()
killWeakThread
allocateThreads :: IO [Weak ThreadId]
allocateThreads :: IO [Weak ThreadId]
allocateThreads = ContT [Weak ThreadId] IO [Weak ThreadId] -> IO [Weak ThreadId]
forall (m :: * -> *) r. Monad m => ContT r m r -> m r
evalContT (ContT [Weak ThreadId] IO [Weak ThreadId] -> IO [Weak ThreadId])
-> ContT [Weak ThreadId] IO [Weak ThreadId] -> IO [Weak ThreadId]
forall a b. (a -> b) -> a -> b
$ Int
-> ContT [Weak ThreadId] IO (Weak ThreadId)
-> ContT [Weak ThreadId] IO [Weak ThreadId]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
threads ContT [Weak ThreadId] IO (Weak ThreadId)
forall r. ContT r IO (Weak ThreadId)
allocThread
cleanup :: [Weak ThreadId] -> IO ()
cleanup :: [Weak ThreadId] -> IO ()
cleanup [Weak ThreadId]
threadIds = IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ()
forall a. IO a -> IO a
Exc.uninterruptibleMask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
(Weak ThreadId -> IO ()) -> [Weak ThreadId] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Weak ThreadId -> IO ()
killWeakThread [Weak ThreadId]
threadIds
QSemN -> Int -> IO ()
waitQSemN QSemN
shutdownSem Int
threads
wait :: m ()
wait :: m ()
wait = do
BroadcastChan In a -> m Bool
forall (m :: * -> *) a. MonadIO m => BroadcastChan In a -> m Bool
closeBChan BroadcastChan In a
inChanIn
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ QSemN -> Int -> IO ()
waitQSemN QSemN
endSem Int
threads
(IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (), m ())
-> IO
(IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (), m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (IO [Weak ThreadId]
allocateThreads, [Weak ThreadId] -> IO ()
cleanup, a -> IO ()
bufferValue, m ()
wait)
where
ThreadBracket{IO ()
setupFork :: IO ()
setupFork :: ThreadBracket -> IO ()
setupFork,IO ()
cleanupFork :: IO ()
cleanupFork :: ThreadBracket -> IO ()
cleanupFork,IO ()
cleanupForkError :: IO ()
cleanupForkError :: ThreadBracket -> IO ()
cleanupForkError} = ThreadBracket
threadBracket
killWeakThread :: Weak ThreadId -> IO ()
killWeakThread :: Weak ThreadId -> IO ()
killWeakThread Weak ThreadId
wTid = do
Maybe ThreadId
tid <- Weak ThreadId -> IO (Maybe ThreadId)
forall v. Weak v -> IO (Maybe v)
deRefWeak Weak ThreadId
wTid
case Maybe ThreadId
tid of
Maybe ThreadId
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just ThreadId
t -> ThreadId -> Shutdown -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
Exc.throwTo ThreadId
t Shutdown
Shutdown
runParallel
:: forall a b m n r
. (MonadIO m, MonadIO n)
=> Either (b -> n r) (r -> b -> n r)
-> Handler IO a
-> Int
-> (a -> IO b)
-> ((a -> m ()) -> (a -> m (Maybe b)) -> n r)
-> n (BracketOnError n r)
runParallel :: Either (b -> n r) (r -> b -> n r)
-> Handler IO a
-> Int
-> (a -> IO b)
-> ((a -> m ()) -> (a -> m (Maybe b)) -> n r)
-> n (BracketOnError n r)
runParallel = ThreadBracket
-> Either (b -> n r) (r -> b -> n r)
-> Handler IO a
-> Int
-> (a -> IO b)
-> ((a -> m ()) -> (a -> m (Maybe b)) -> n r)
-> n (BracketOnError n r)
forall a b (m :: * -> *) (n :: * -> *) r.
(MonadIO m, MonadIO n) =>
ThreadBracket
-> Either (b -> n r) (r -> b -> n r)
-> Handler IO a
-> Int
-> (a -> IO b)
-> ((a -> m ()) -> (a -> m (Maybe b)) -> n r)
-> n (BracketOnError n r)
runParallelWith ThreadBracket
noopBracket
runParallelWith
:: forall a b m n r
. (MonadIO m, MonadIO n)
=> ThreadBracket
-> Either (b -> n r) (r -> b -> n r)
-> Handler IO a
-> Int
-> (a -> IO b)
-> ((a -> m ()) -> (a -> m (Maybe b)) -> n r)
-> n (BracketOnError n r)
runParallelWith :: ThreadBracket
-> Either (b -> n r) (r -> b -> n r)
-> Handler IO a
-> Int
-> (a -> IO b)
-> ((a -> m ()) -> (a -> m (Maybe b)) -> n r)
-> n (BracketOnError n r)
runParallelWith ThreadBracket
threadBracket Either (b -> n r) (r -> b -> n r)
yielder Handler IO a
hndl Int
threads a -> IO b
work (a -> m ()) -> (a -> m (Maybe b)) -> n r
pipe = do
BroadcastChan In (Maybe b)
outChanIn <- n (BroadcastChan In (Maybe b))
forall (m :: * -> *) a. MonadIO m => m (BroadcastChan In a)
newBroadcastChan
BroadcastChan Out (Maybe b)
outChanOut <- BroadcastChan In (Maybe b) -> n (BroadcastChan Out (Maybe b))
forall (m :: * -> *) (dir :: Direction) a.
MonadIO m =>
BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener BroadcastChan In (Maybe b)
outChanIn
let process :: MonadIO f => a -> f ()
process :: a -> f ()
process = IO () -> f ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> f ()) -> (a -> IO ()) -> a -> f ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> IO b
work (a -> IO b) -> (b -> IO ()) -> a -> IO ()
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> (b -> IO Bool) -> b -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BroadcastChan In (Maybe b) -> Maybe b -> IO Bool
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan In a -> a -> m Bool
writeBChan BroadcastChan In (Maybe b)
outChanIn (Maybe b -> IO Bool) -> (b -> Maybe b) -> b -> IO Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. b -> Maybe b
forall a. a -> Maybe a
Just)
notifyDrop :: IO ()
notifyDrop :: IO ()
notifyDrop = IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ BroadcastChan In (Maybe b) -> Maybe b -> IO Bool
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan In a -> a -> m Bool
writeBChan BroadcastChan In (Maybe b)
outChanIn Maybe b
forall a. Maybe a
Nothing
(IO [Weak ThreadId]
allocate, [Weak ThreadId] -> IO ()
cleanup, a -> IO ()
bufferValue, n ()
wait) <-
Handler IO a
-> Int
-> IO ()
-> ThreadBracket
-> (a -> IO ())
-> n (IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (),
n ())
forall a (m :: * -> *).
MonadIO m =>
Handler IO a
-> Int
-> IO ()
-> ThreadBracket
-> (a -> IO ())
-> m (IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (),
m ())
parallelCore Handler IO a
hndl Int
threads IO ()
notifyDrop ThreadBracket
threadBracket a -> IO ()
forall (f :: * -> *). MonadIO f => a -> f ()
process
let queueAndYield :: a -> m (Maybe b)
queueAndYield :: a -> m (Maybe b)
queueAndYield a
x = do
Maybe b
v <- Maybe (Maybe b) -> Maybe b
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (Maybe (Maybe b) -> Maybe b) -> m (Maybe (Maybe b)) -> m (Maybe b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO (Maybe (Maybe b)) -> m (Maybe (Maybe b))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (BroadcastChan Out (Maybe b) -> IO (Maybe (Maybe b))
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan Out a -> m (Maybe a)
readBChan BroadcastChan Out (Maybe b)
outChanOut IO (Maybe (Maybe b)) -> IO () -> IO (Maybe (Maybe b))
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* a -> IO ()
bufferValue a
x)
Maybe b -> m (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
v
finish :: r -> n r
finish :: r -> n r
finish r
r = do
Maybe (Maybe b)
next <- BroadcastChan Out (Maybe b) -> n (Maybe (Maybe b))
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan Out a -> m (Maybe a)
readBChan BroadcastChan Out (Maybe b)
outChanOut
case Maybe (Maybe b)
next of
Maybe (Maybe b)
Nothing -> r -> n r
forall (m :: * -> *) a. Monad m => a -> m a
return r
r
Just Maybe b
Nothing -> r -> n r
finish r
r
Just (Just b
v) -> r -> b -> n r
foldFun r
r b
v n r -> (r -> n r) -> n r
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= r -> n r
finish
action :: n r
action :: n r
action = do
r
result <- (a -> m ()) -> (a -> m (Maybe b)) -> n r
pipe (IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (a -> IO ()) -> a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> IO ()
bufferValue) a -> m (Maybe b)
queueAndYield
n ()
wait
BroadcastChan In (Maybe b) -> n Bool
forall (m :: * -> *) a. MonadIO m => BroadcastChan In a -> m Bool
closeBChan BroadcastChan In (Maybe b)
outChanIn
r -> n r
finish r
result
BracketOnError n r -> n (BracketOnError n r)
forall (m :: * -> *) a. Monad m => a -> m a
return Bracket :: forall (m :: * -> *) r.
IO [Weak ThreadId]
-> ([Weak ThreadId] -> IO ()) -> m r -> BracketOnError m r
Bracket{IO [Weak ThreadId]
allocate :: IO [Weak ThreadId]
allocate :: IO [Weak ThreadId]
allocate,[Weak ThreadId] -> IO ()
cleanup :: [Weak ThreadId] -> IO ()
cleanup :: [Weak ThreadId] -> IO ()
cleanup,n r
action :: n r
action :: n r
action}
where
foldFun :: r -> b -> n r
foldFun = case Either (b -> n r) (r -> b -> n r)
yielder of
Left b -> n r
g -> (b -> n r) -> r -> b -> n r
forall a b. a -> b -> a
const b -> n r
g
Right r -> b -> n r
g -> r -> b -> n r
g
runParallel_
:: (MonadIO m, MonadIO n)
=> Handler IO a
-> Int
-> (a -> IO ())
-> ((a -> m ()) -> n r)
-> n (BracketOnError n r)
runParallel_ :: Handler IO a
-> Int
-> (a -> IO ())
-> ((a -> m ()) -> n r)
-> n (BracketOnError n r)
runParallel_ = ThreadBracket
-> Handler IO a
-> Int
-> (a -> IO ())
-> ((a -> m ()) -> n r)
-> n (BracketOnError n r)
forall (m :: * -> *) (n :: * -> *) a r.
(MonadIO m, MonadIO n) =>
ThreadBracket
-> Handler IO a
-> Int
-> (a -> IO ())
-> ((a -> m ()) -> n r)
-> n (BracketOnError n r)
runParallelWith_ ThreadBracket
noopBracket
runParallelWith_
:: (MonadIO m, MonadIO n)
=> ThreadBracket
-> Handler IO a
-> Int
-> (a -> IO ())
-> ((a -> m ()) -> n r)
-> n (BracketOnError n r)
runParallelWith_ :: ThreadBracket
-> Handler IO a
-> Int
-> (a -> IO ())
-> ((a -> m ()) -> n r)
-> n (BracketOnError n r)
runParallelWith_ ThreadBracket
threadBracket Handler IO a
hndl Int
threads a -> IO ()
workFun (a -> m ()) -> n r
processElems = do
QSem
sem <- IO QSem -> n QSem
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO QSem -> n QSem) -> IO QSem -> n QSem
forall a b. (a -> b) -> a -> b
$ Int -> IO QSem
newQSem Int
threads
let process :: a -> IO ()
process a
x = QSem -> IO ()
signalQSem QSem
sem IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> IO ()
workFun a
x
(IO [Weak ThreadId]
allocate, [Weak ThreadId] -> IO ()
cleanup, a -> IO ()
bufferValue, n ()
wait) <-
Handler IO a
-> Int
-> IO ()
-> ThreadBracket
-> (a -> IO ())
-> n (IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (),
n ())
forall a (m :: * -> *).
MonadIO m =>
Handler IO a
-> Int
-> IO ()
-> ThreadBracket
-> (a -> IO ())
-> m (IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (),
m ())
parallelCore Handler IO a
hndl Int
threads (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()) ThreadBracket
threadBracket a -> IO ()
process
let action :: n r
action = do
r
result <- (a -> m ()) -> n r
processElems ((a -> m ()) -> n r) -> (a -> m ()) -> n r
forall a b. (a -> b) -> a -> b
$ \a
v -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
QSem -> IO ()
waitQSem QSem
sem
a -> IO ()
bufferValue a
v
n ()
wait
r -> n r
forall (m :: * -> *) a. Monad m => a -> m a
return r
result
BracketOnError n r -> n (BracketOnError n r)
forall (m :: * -> *) a. Monad m => a -> m a
return Bracket :: forall (m :: * -> *) r.
IO [Weak ThreadId]
-> ([Weak ThreadId] -> IO ()) -> m r -> BracketOnError m r
Bracket{IO [Weak ThreadId]
allocate :: IO [Weak ThreadId]
allocate :: IO [Weak ThreadId]
allocate,[Weak ThreadId] -> IO ()
cleanup :: [Weak ThreadId] -> IO ()
cleanup :: [Weak ThreadId] -> IO ()
cleanup,n r
action :: n r
action :: n r
action}