{-# LANGUAGE UndecidableInstances #-}
module Streamly.Internal.Data.Stream.Parallel
(
ParallelT(..)
, Parallel
, consM
, parallelK
, parallelFstK
, parallelMinK
, mkParallelD
, mkParallelK
, tapAsyncK
, tapAsyncF
, newCallbackStream
)
where
import Control.Concurrent (myThreadId, takeMVar)
import Control.Monad (when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Functor (void)
import Data.IORef (readIORef, writeIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Prelude hiding (map)
import qualified Data.Set as Set
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold.Type (Fold)
import Streamly.Internal.Data.Stream.Serial (SerialT(..))
import Streamly.Internal.Data.Stream.StreamD.Type (Step(..))
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import Streamly.Internal.Data.SVar
#include "inline.hs"
#include "Instances.hs"
{-# NOINLINE runOne #-}
runOne
:: MonadIO m
=> State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne :: State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne State Stream m a
st Stream m a
m0 Maybe WorkerInfo
winfo =
case State Stream m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
Maybe Count
Nothing -> Stream m a -> m ()
go Stream m a
m0
Just Count
_ -> State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOneLimited State Stream m a
st Stream m a
m0 Maybe WorkerInfo
winfo
where
go :: Stream m a -> m ()
go Stream m a
m = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar Stream m a
sv
State Stream m a
-> (a -> Stream m a -> m ())
-> (a -> m ())
-> m ()
-> Stream m a
-> m ()
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m ()
yieldk a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
single m ()
stop Stream m a
m
sv :: SVar Stream m a
sv = Maybe (SVar Stream m a) -> SVar Stream m a
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe (SVar Stream m a) -> SVar Stream m a)
-> Maybe (SVar Stream m a) -> SVar Stream m a
forall a b. (a -> b) -> a -> b
$ State Stream m a -> Maybe (SVar Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st
stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar Stream m a
sv
SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
sendit :: a -> m ()
sendit a
a = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Stream m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
single :: a -> m ()
single a
a = a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo)
yieldk :: a -> Stream m a -> m ()
yieldk a
a Stream m a
r = a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Stream m a -> m ()
go Stream m a
r
runOneLimited
:: MonadIO m
=> State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOneLimited :: State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOneLimited State Stream m a
st Stream m a
m0 Maybe WorkerInfo
winfo = Stream m a -> m ()
go Stream m a
m0
where
go :: Stream m a -> m ()
go Stream m a
m = do
Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
if Bool
yieldLimitOk
then do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar Stream m a
sv
State Stream m a
-> (a -> Stream m a -> m ())
-> (a -> m ())
-> m ()
-> Stream m a
-> m ()
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m ()
yieldk a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
single m ()
stop Stream m a
m
else do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVarFromWorker SVar Stream m a
sv
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
sv :: SVar Stream m a
sv = Maybe (SVar Stream m a) -> SVar Stream m a
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe (SVar Stream m a) -> SVar Stream m a)
-> Maybe (SVar Stream m a) -> SVar Stream m a
forall a b. (a -> b) -> a -> b
$ State Stream m a -> Maybe (SVar Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st
stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar Stream m a
sv
SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
sendit :: a -> m ()
sendit a
a = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Stream m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
single :: a -> m ()
single a
a = a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo)
yieldk :: a -> Stream m a -> m ()
yieldk a
a Stream m a
r = a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Stream m a -> m ()
go Stream m a
r
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m
=> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarPar :: SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarPar SVarStopStyle
ss Stream m a
m Stream m a
r = (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream ((forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
SVar Stream m a
sv <- SVarStopStyle -> State Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
ss State Stream m a
st
SVar Stream m a -> (Maybe WorkerInfo -> m ()) -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar Stream m a
sv (State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne State Stream m a
st{streamVar :: Maybe (SVar Stream m a)
streamVar = SVar Stream m a -> Maybe (SVar Stream m a)
forall a. a -> Maybe a
Just SVar Stream m a
sv} Stream m a
m)
case SVarStopStyle
ss of
SVarStopStyle
StopBy -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Set ThreadId
set <- IORef (Set ThreadId) -> IO (Set ThreadId)
forall a. IORef a -> IO a
readIORef (SVar Stream m a -> IORef (Set ThreadId)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads SVar Stream m a
sv)
IORef ThreadId -> ThreadId -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVar Stream m a -> IORef ThreadId
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy SVar Stream m a
sv) (ThreadId -> IO ()) -> ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Set ThreadId -> ThreadId
forall a. Int -> Set a -> a
Set.elemAt Int
0 Set ThreadId
set
SVarStopStyle
_ -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
SVar Stream m a -> (Maybe WorkerInfo -> m ()) -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar Stream m a
sv (State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne State Stream m a
st{streamVar :: Maybe (SVar Stream m a)
streamVar = SVar Stream m a -> Maybe (SVar Stream m a)
forall a. a -> Maybe a
Just SVar Stream m a
sv} Stream m a
r)
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Stream m a -> m r) -> Stream m a -> m r
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
getSerialT (SVar Stream m a -> SerialT m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> SerialT m a
SVar.fromSVar SVar Stream m a
sv)
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar :: MonadAsync m
=> SVarStyle -> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar :: SVarStyle
-> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar SVarStyle
style SVarStopStyle
ss Stream m a
m1 Stream m a
m2 = (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream ((forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp ->
case State Stream m a -> Maybe (SVar Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st of
Just SVar Stream m a
sv | SVar Stream m a -> SVarStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar Stream m a
sv SVarStyle -> SVarStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStyle
style Bool -> Bool -> Bool
&& SVar Stream m a -> SVarStopStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle SVar Stream m a
sv SVarStopStyle -> SVarStopStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStopStyle
ss -> do
SVar Stream m a -> (Maybe WorkerInfo -> m ()) -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar Stream m a
sv (State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne State Stream m a
st Stream m a
m1)
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp Stream m a
m2
Maybe (SVar Stream m a)
_ ->
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarPar SVarStopStyle
ss Stream m a
m1 Stream m a
m2)
{-# INLINE parallelK #-}
parallelK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelK :: Stream m a -> Stream m a -> Stream m a
parallelK = SVarStyle
-> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopNone
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> ParallelT IO a -> ParallelT IO a #-}
consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
consM :: m a -> ParallelT m a -> ParallelT m a
consM m a
m (ParallelT Stream m a
r) = Stream m a -> ParallelT m a
forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT (Stream m a -> ParallelT m a) -> Stream m a -> ParallelT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelK (m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a
K.fromEffect m a
m) Stream m a
r
{-# INLINE parallelFstK #-}
parallelFstK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelFstK :: Stream m a -> Stream m a -> Stream m a
parallelFstK = SVarStyle
-> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopBy
{-# INLINE parallelMinK #-}
parallelMinK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelMinK :: Stream m a -> Stream m a -> Stream m a
parallelMinK = SVarStyle
-> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopAny
mkParallelK :: MonadAsync m => Stream m a -> Stream m a
mkParallelK :: Stream m a -> Stream m a
mkParallelK Stream m a
m = (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream ((forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
SVar Stream m a
sv <- SVarStopStyle -> State Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
st)
State Stream m a -> SVar Stream m a -> Stream m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel State Stream m a
st SVar Stream m a
sv (Stream m a -> m ()) -> Stream m a -> m ()
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a
forall (m :: * -> *) a. Applicative m => Stream m a -> Stream m a
D.fromStreamK Stream m a
m
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Stream m a -> m r) -> Stream m a -> m r
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
getSerialT (SerialT m a -> Stream m a) -> SerialT m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> SerialT m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> SerialT m a
SVar.fromSVar SVar Stream m a
sv
{-# INLINE_NORMAL mkParallelD #-}
mkParallelD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkParallelD :: Stream m a -> Stream m a
mkParallelD Stream m a
m = (State Stream m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a))
-> Maybe (Stream m a) -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step Maybe (Stream m a)
forall a. Maybe a
Nothing
where
step :: State Stream m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step State Stream m a
gst Maybe (Stream m a)
Nothing = do
SVar Stream m a
sv <- SVarStopStyle -> State Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone State Stream m a
gst
State Stream m a -> SVar Stream m a -> Stream m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel State Stream m a
gst SVar Stream m a
sv Stream m a
m
Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a))
-> Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. s -> Step s a
Skip (Maybe (Stream m a) -> Step (Maybe (Stream m a)) a)
-> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
SVar.fromSVarD SVar Stream m a
sv
step State Stream m a
gst (Just (D.UnStream State Stream m a -> s -> m (Step s a)
step1 s
st)) = do
Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
st
Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a))
-> Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
Yield a
a s
s -> a -> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. a -> s -> Step s a
Yield a
a (Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a -> s -> m (Step s a)
step1 s
s)
Skip s
s -> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. s -> Step s a
Skip (Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a -> s -> m (Step s a)
step1 s
s)
Step s a
Stop -> Step (Maybe (Stream m a)) a
forall s a. Step s a
Stop
{-# INLINE tapAsyncK #-}
tapAsyncK :: MonadAsync m => (Stream m a -> m b) -> Stream m a -> Stream m a
tapAsyncK :: (Stream m a -> m b) -> Stream m a -> Stream m a
tapAsyncK Stream m a -> m b
f Stream m a
m = (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream ((forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
SVar Stream m a
sv <- State Stream m a -> (SerialT m a -> m b) -> m (SVar Stream m a)
forall (m :: * -> *) a b.
MonadAsync m =>
State Stream m a -> (SerialT m a -> m b) -> m (SVar Stream m a)
SVar.newFoldSVar State Stream m a
st (Stream m a -> m b
f (Stream m a -> m b)
-> (SerialT m a -> Stream m a) -> SerialT m a -> m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT m a -> Stream m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
getSerialT)
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp
(Stream m a -> m r) -> Stream m a -> m r
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
getSerialT (SVar Stream m a -> SerialT m a -> SerialT m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> SerialT m a -> SerialT m a
SVar.teeToSVar SVar Stream m a
sv (SerialT m a -> SerialT m a) -> SerialT m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT Stream m a
m)
data TapState fs st a = TapInit | Tapping !fs st | TapDone st
{-# INLINE_NORMAL tapAsyncF #-}
tapAsyncF :: MonadAsync m => Fold m a b -> D.Stream m a -> D.Stream m a
tapAsyncF :: Fold m a b -> Stream m a -> Stream m a
tapAsyncF Fold m a b
f (D.Stream State Stream m a -> s -> m (Step s a)
step1 s
state1) = (State Stream m a
-> TapState (SVar Stream m a) s Any
-> m (Step (TapState (SVar Stream m a) s Any) a))
-> TapState (SVar Stream m a) s Any -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a
-> TapState (SVar Stream m a) s Any
-> m (Step (TapState (SVar Stream m a) s Any) a)
forall a a.
State Stream m a
-> TapState (SVar Stream m a) s a
-> m (Step (TapState (SVar Stream m a) s a) a)
step TapState (SVar Stream m a) s Any
forall fs st a. TapState fs st a
TapInit
where
drainFold :: SVar Stream m a -> m ()
drainFold SVar Stream m a
svr = do
Bool
done <- SVar Stream m a -> m Bool
forall (m :: * -> *) a. MonadAsync m => SVar Stream m a -> m Bool
SVar.fromConsumer SVar Stream m a
svr
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
done) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> String -> IO () -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO () -> IO ()
withDiagMVar SVar Stream m a
svr String
"teeToSVar: waiting to drain"
(IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (SVar Stream m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBellFromConsumer SVar Stream m a
svr)
SVar Stream m a -> m ()
drainFold SVar Stream m a
svr
stopFold :: SVar Stream m a -> m ()
stopFold SVar Stream m a
svr = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
svr Maybe WorkerInfo
forall a. Maybe a
Nothing
SVar Stream m a -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar Stream m a -> m ()
drainFold SVar Stream m a
svr
{-# INLINE_LATE step #-}
step :: State Stream m a
-> TapState (SVar Stream m a) s a
-> m (Step (TapState (SVar Stream m a) s a) a)
step State Stream m a
gst TapState (SVar Stream m a) s a
TapInit = do
SVar Stream m a
sv <- State Stream m a -> Fold m a b -> m (SVar Stream m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
MonadAsync m =>
State t m a -> Fold m a b -> m (SVar t m a)
SVar.newFoldSVarF State Stream m a
gst Fold m a b
f
Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a))
-> Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall a b. (a -> b) -> a -> b
$ TapState (SVar Stream m a) s a
-> Step (TapState (SVar Stream m a) s a) a
forall s a. s -> Step s a
Skip (SVar Stream m a -> s -> TapState (SVar Stream m a) s a
forall fs st a. fs -> st -> TapState fs st a
Tapping SVar Stream m a
sv s
state1)
step State Stream m a
gst (Tapping SVar Stream m a
sv s
st) = do
Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
st
case Step s a
r of
Yield a
a s
s -> do
Bool
done <- SVar Stream m a -> a -> m Bool
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> a -> m Bool
SVar.pushToFold SVar Stream m a
sv a
a
if Bool
done
then do
SVar Stream m a -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar Stream m a -> m ()
stopFold SVar Stream m a
sv
Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a))
-> Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall a b. (a -> b) -> a -> b
$ a
-> TapState (SVar Stream m a) s a
-> Step (TapState (SVar Stream m a) s a) a
forall s a. a -> s -> Step s a
Yield a
a (s -> TapState (SVar Stream m a) s a
forall fs st a. st -> TapState fs st a
TapDone s
s)
else Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a))
-> Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall a b. (a -> b) -> a -> b
$ a
-> TapState (SVar Stream m a) s a
-> Step (TapState (SVar Stream m a) s a) a
forall s a. a -> s -> Step s a
Yield a
a (SVar Stream m a -> s -> TapState (SVar Stream m a) s a
forall fs st a. fs -> st -> TapState fs st a
Tapping SVar Stream m a
sv s
s)
Skip s
s -> Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a))
-> Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall a b. (a -> b) -> a -> b
$ TapState (SVar Stream m a) s a
-> Step (TapState (SVar Stream m a) s a) a
forall s a. s -> Step s a
Skip (SVar Stream m a -> s -> TapState (SVar Stream m a) s a
forall fs st a. fs -> st -> TapState fs st a
Tapping SVar Stream m a
sv s
s)
Step s a
Stop -> do
SVar Stream m a -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar Stream m a -> m ()
stopFold SVar Stream m a
sv
Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (TapState (SVar Stream m a) s a) a
forall s a. Step s a
Stop
step State Stream m a
gst (TapDone s
st) = do
Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
st
Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a))
-> Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
Yield a
a s
s -> a
-> TapState (SVar Stream m a) s a
-> Step (TapState (SVar Stream m a) s a) a
forall s a. a -> s -> Step s a
Yield a
a (s -> TapState (SVar Stream m a) s a
forall fs st a. st -> TapState fs st a
TapDone s
s)
Skip s
s -> TapState (SVar Stream m a) s a
-> Step (TapState (SVar Stream m a) s a) a
forall s a. s -> Step s a
Skip (s -> TapState (SVar Stream m a) s a
forall fs st a. st -> TapState fs st a
TapDone s
s)
Step s a
Stop -> Step (TapState (SVar Stream m a) s a) a
forall s a. Step s a
Stop
newtype ParallelT m a = ParallelT {ParallelT m a -> Stream m a
getParallelT :: Stream m a}
deriving (m a -> ParallelT m a
(forall (m :: * -> *) a. Monad m => m a -> ParallelT m a)
-> MonadTrans ParallelT
forall (m :: * -> *) a. Monad m => m a -> ParallelT m a
forall (t :: (* -> *) -> * -> *).
(forall (m :: * -> *) a. Monad m => m a -> t m a) -> MonadTrans t
lift :: m a -> ParallelT m a
$clift :: forall (m :: * -> *) a. Monad m => m a -> ParallelT m a
MonadTrans)
type Parallel = ParallelT IO
{-# INLINE append #-}
{-# SPECIALIZE append :: ParallelT IO a -> ParallelT IO a -> ParallelT IO a #-}
append :: MonadAsync m => ParallelT m a -> ParallelT m a -> ParallelT m a
append :: ParallelT m a -> ParallelT m a -> ParallelT m a
append (ParallelT Stream m a
m1) (ParallelT Stream m a
m2) = Stream m a -> ParallelT m a
forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT (Stream m a -> ParallelT m a) -> Stream m a -> ParallelT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelK Stream m a
m1 Stream m a
m2
instance MonadAsync m => Semigroup (ParallelT m a) where
<> :: ParallelT m a -> ParallelT m a -> ParallelT m a
(<>) = ParallelT m a -> ParallelT m a -> ParallelT m a
forall (m :: * -> *) a.
MonadAsync m =>
ParallelT m a -> ParallelT m a -> ParallelT m a
append
instance MonadAsync m => Monoid (ParallelT m a) where
mempty :: ParallelT m a
mempty = Stream m a -> ParallelT m a
forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT Stream m a
forall (m :: * -> *) a. Stream m a
K.nil
mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a
mappend = ParallelT m a -> ParallelT m a -> ParallelT m a
forall a. Semigroup a => a -> a -> a
(<>)
{-# INLINE apParallel #-}
{-# SPECIALIZE apParallel ::
ParallelT IO (a -> b) -> ParallelT IO a -> ParallelT IO b #-}
apParallel :: MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel (ParallelT Stream m (a -> b)
m1) (ParallelT Stream m a
m2) =
let f :: (a -> b) -> Stream m b
f a -> b
x1 = (Stream m b -> Stream m b -> Stream m b)
-> (a -> Stream m b) -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
(Stream m b -> Stream m b -> Stream m b)
-> (a -> Stream m b) -> Stream m a -> Stream m b
K.concatMapWith Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelK (b -> Stream m b
forall (f :: * -> *) a. Applicative f => a -> f a
pure (b -> Stream m b) -> (a -> b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
in Stream m b -> ParallelT m b
forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT (Stream m b -> ParallelT m b) -> Stream m b -> ParallelT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> ((a -> b) -> Stream m b) -> Stream m (a -> b) -> Stream m b
forall (m :: * -> *) b a.
(Stream m b -> Stream m b -> Stream m b)
-> (a -> Stream m b) -> Stream m a -> Stream m b
K.concatMapWith Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelK (a -> b) -> Stream m b
forall b. (a -> b) -> Stream m b
f Stream m (a -> b)
m1
instance (Monad m, MonadAsync m) => Applicative (ParallelT m) where
{-# INLINE pure #-}
pure :: a -> ParallelT m a
pure = Stream m a -> ParallelT m a
forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT (Stream m a -> ParallelT m a)
-> (a -> Stream m a) -> a -> ParallelT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall a (m :: * -> *). a -> Stream m a
K.fromPure
{-# INLINE (<*>) #-}
<*> :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
(<*>) = ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel
{-# INLINE bind #-}
{-# SPECIALIZE bind ::
ParallelT IO a -> (a -> ParallelT IO b) -> ParallelT IO b #-}
bind :: MonadAsync m => ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind (ParallelT Stream m a
m) a -> ParallelT m b
f = Stream m b -> ParallelT m b
forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT (Stream m b -> ParallelT m b) -> Stream m b -> ParallelT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> Stream m a -> (a -> Stream m b) -> Stream m b
forall (m :: * -> *) b a.
(Stream m b -> Stream m b -> Stream m b)
-> Stream m a -> (a -> Stream m b) -> Stream m b
K.bindWith Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelK Stream m a
m (ParallelT m b -> Stream m b
forall (m :: * -> *) a. ParallelT m a -> Stream m a
getParallelT (ParallelT m b -> Stream m b)
-> (a -> ParallelT m b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> ParallelT m b
f)
instance MonadAsync m => Monad (ParallelT m) where
return :: a -> ParallelT m a
return = a -> ParallelT m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
{-# INLINE (>>=) #-}
>>= :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
(>>=) = ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind
MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: MonadAsync m => m (a -> m (), Stream m a)
newCallbackStream :: m (a -> m (), Stream m a)
newCallbackStream = do
SVar Any m a
sv <- SVarStopStyle -> State Any m a -> m (SVar Any m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone State Any m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState
IO ThreadId -> m ThreadId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId m ThreadId -> (ThreadId -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SVar Any m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar Any m a
sv
let callback :: a -> m ()
callback a
a = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar Any m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Any m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
(a -> m (), Stream m a) -> m (a -> m (), Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
callback, Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
D.toStreamK (SVar Any m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
SVar.fromSVarD SVar Any m a
sv))