{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UndecidableInstances #-}
module Streamly.Streams.Async
(
AsyncT
, Async
, asyncly
, async
, (<|)
, mkAsync
, mkAsync'
, WAsyncT
, WAsync
, wAsyncly
, wAsync
)
where
import Control.Concurrent (myThreadId)
import Control.Monad (ap)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR)
import Data.IORef (IORef, newIORef, readIORef)
import Data.Maybe (fromJust)
import Data.Semigroup (Semigroup(..))
import Prelude hiding (map)
import qualified Data.Set as S
import Streamly.Atomics (atomicModifyIORefCAS)
import Streamly.Streams.SVar (fromSVar)
import Streamly.Streams.Serial (map)
import Streamly.SVar
import Streamly.Streams.StreamK
(IsStream(..), Stream, mkStream, foldStream, adapt, foldStreamShared,
foldStreamSVar)
import qualified Streamly.Streams.StreamK as K
#include "Instances.hs"
{-# INLINE workLoopLIFO #-}
workLoopLIFO
:: MonadIO m
=> IORef [Stream m a]
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFO q st sv winfo = run
where
run = do
work <- dequeue
case work of
Nothing -> liftIO $ sendStop sv winfo
Just m -> foldStreamSVar sv st yieldk single run m
single a = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
if res then run else liftIO $ sendStop sv winfo
yieldk a r = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
if res
then foldStreamSVar sv st yieldk single run r
else liftIO $ do
enqueueLIFO sv q r
sendStop sv winfo
dequeue = liftIO $ atomicModifyIORefCAS q $ \case
[] -> ([], Nothing)
x : xs -> (xs, Just x)
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
:: MonadIO m
=> IORef [Stream m a]
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFOLimited q st sv winfo = run
where
run = do
work <- dequeue
case work of
Nothing -> liftIO $ sendStop sv winfo
Just m -> do
yieldLimitOk <- liftIO $ decrementYieldLimit sv
if yieldLimitOk
then do
let stop = liftIO (incrementYieldLimit sv) >> run
foldStreamSVar sv st yieldk single stop m
else liftIO $ do
enqueueLIFO sv q m
incrementYieldLimit sv
sendStop sv winfo
single a = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
if res then run else liftIO $ sendStop sv winfo
yieldk a r = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
yieldLimitOk <- liftIO $ decrementYieldLimit sv
let stop = liftIO (incrementYieldLimit sv) >> run
if res && yieldLimitOk
then foldStreamSVar sv st yieldk single stop r
else liftIO $ do
incrementYieldLimit sv
enqueueLIFO sv q r
sendStop sv winfo
dequeue = liftIO $ atomicModifyIORefCAS q $ \case
[] -> ([], Nothing)
x : xs -> (xs, Just x)
{-# INLINE workLoopFIFO #-}
workLoopFIFO
:: MonadIO m
=> LinkedQueue (Stream m a)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFO q st sv winfo = run
where
run = do
work <- liftIO $ tryPopR q
case work of
Nothing -> liftIO $ sendStop sv winfo
Just m -> foldStreamSVar sv st yieldk single run m
single a = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
if res then run else liftIO $ sendStop sv winfo
yieldk a r = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
if res
then foldStreamSVar sv st yieldk single run r
else liftIO $ do
enqueueFIFO sv q r
sendStop sv winfo
{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
:: MonadIO m
=> LinkedQueue (Stream m a)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFOLimited q st sv winfo = run
where
run = do
work <- liftIO $ tryPopR q
case work of
Nothing -> liftIO $ sendStop sv winfo
Just m -> do
yieldLimitOk <- liftIO $ decrementYieldLimit sv
if yieldLimitOk
then do
let stop = liftIO (incrementYieldLimit sv) >> run
foldStreamSVar sv st yieldk single stop m
else liftIO $ do
enqueueFIFO sv q m
incrementYieldLimit sv
sendStop sv winfo
single a = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
if res then run else liftIO $ sendStop sv winfo
yieldk a r = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
yieldLimitOk <- liftIO $ decrementYieldLimit sv
let stop = liftIO (incrementYieldLimit sv) >> run
if res && yieldLimitOk
then foldStreamSVar sv st yieldk single stop r
else liftIO $ do
incrementYieldLimit sv
enqueueFIFO sv q r
sendStop sv winfo
getLifoSVar :: forall m a. MonadAsync m
=> State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar st mrun = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
active <- newIORef 0
wfw <- newIORef False
running <- newIORef S.empty
q <- newIORef []
yl <- case getYieldLimit st of
Nothing -> return Nothing
Just x -> Just <$> newIORef x
rateInfo <- getYieldRateInfo st
stats <- newSVarStats
tid <- myThreadId
let isWorkFinished _ = null <$> readIORef q
let isWorkFinishedLimited sv = do
yieldsDone <-
case remainingWork sv of
Just ref -> do
n <- readIORef ref
return (n <= 0)
Nothing -> return False
qEmpty <- null <$> readIORef q
return $ qEmpty || yieldsDone
let getSVar :: SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [Stream m a]
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m())
-> SVar Stream m a
getSVar sv readOutput postProc workDone wloop = SVar
{ outputQueue = outQ
, remainingWork = yl
, maxBufferLimit = getMaxBuffer st
, maxWorkerLimit = getMaxThreads st
, yieldRateInfo = rateInfo
, outputDoorBell = outQMv
, readOutputQ = readOutput sv
, postProcess = postProc sv
, workerThreads = running
, workLoop = wloop q st{streamVar = Just sv} sv
, enqueue = enqueueLIFO sv q
, isWorkDone = workDone sv
, isQueueDone = workDone sv
, needDoorBell = wfw
, svarStyle = AsyncVar
, svarMrun = mrun
, workerCount = active
, accountThread = delThread sv
, workerStopMVar = undefined
, svarRef = Nothing
, svarInspectMode = getInspectMode st
, svarCreator = tid
, aheadWorkQueue = undefined
, outputHeap = undefined
, svarStats = stats
}
let sv =
case getStreamRate st of
Nothing ->
case getYieldLimit st of
Nothing -> getSVar sv readOutputQBounded
postProcessBounded
isWorkFinished
workLoopLIFO
Just _ -> getSVar sv readOutputQBounded
postProcessBounded
isWorkFinishedLimited
workLoopLIFOLimited
Just _ ->
case getYieldLimit st of
Nothing -> getSVar sv readOutputQPaced
postProcessPaced
isWorkFinished
workLoopLIFO
Just _ -> getSVar sv readOutputQPaced
postProcessPaced
isWorkFinishedLimited
workLoopLIFOLimited
in return sv
getFifoSVar :: forall m a. MonadAsync m
=> State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getFifoSVar st mrun = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
active <- newIORef 0
wfw <- newIORef False
running <- newIORef S.empty
q <- newQ
yl <- case getYieldLimit st of
Nothing -> return Nothing
Just x -> Just <$> newIORef x
rateInfo <- getYieldRateInfo st
stats <- newSVarStats
tid <- myThreadId
let isWorkFinished _ = nullQ q
let isWorkFinishedLimited sv = do
yieldsDone <-
case remainingWork sv of
Just ref -> do
n <- readIORef ref
return (n <= 0)
Nothing -> return False
qEmpty <- nullQ q
return $ qEmpty || yieldsDone
let getSVar :: SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (Stream m a)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m())
-> SVar Stream m a
getSVar sv readOutput postProc workDone wloop = SVar
{ outputQueue = outQ
, remainingWork = yl
, maxBufferLimit = getMaxBuffer st
, maxWorkerLimit = getMaxThreads st
, yieldRateInfo = rateInfo
, outputDoorBell = outQMv
, readOutputQ = readOutput sv
, postProcess = postProc sv
, workerThreads = running
, workLoop = wloop q st{streamVar = Just sv} sv
, enqueue = enqueueFIFO sv q
, isWorkDone = workDone sv
, isQueueDone = workDone sv
, needDoorBell = wfw
, svarStyle = WAsyncVar
, svarMrun = mrun
, workerCount = active
, accountThread = delThread sv
, workerStopMVar = undefined
, svarRef = Nothing
, svarInspectMode = getInspectMode st
, svarCreator = tid
, aheadWorkQueue = undefined
, outputHeap = undefined
, svarStats = stats
}
let sv =
case getStreamRate st of
Nothing ->
case getYieldLimit st of
Nothing -> getSVar sv readOutputQBounded
postProcessBounded
isWorkFinished
workLoopFIFO
Just _ -> getSVar sv readOutputQBounded
postProcessBounded
isWorkFinishedLimited
workLoopFIFOLimited
Just _ ->
case getYieldLimit st of
Nothing -> getSVar sv readOutputQPaced
postProcessPaced
isWorkFinished
workLoopFIFO
Just _ -> getSVar sv readOutputQPaced
postProcessPaced
isWorkFinishedLimited
workLoopFIFOLimited
in return sv
{-# INLINABLE newAsyncVar #-}
newAsyncVar :: MonadAsync m
=> State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar st m = do
mrun <- captureMonadState
sv <- liftIO $ getLifoSVar st mrun
sendFirstWorker sv m
{-# INLINABLE mkAsync #-}
mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
mkAsync m = fmap fromSVar (newAsyncVar defState (toStream m))
{-# INLINABLE mkAsync' #-}
mkAsync' :: (IsStream t, MonadAsync m) => State Stream m a -> t m a -> m (t m a)
mkAsync' st m = fmap fromSVar (newAsyncVar st (toStream m))
{-# INLINABLE newWAsyncVar #-}
newWAsyncVar :: MonadAsync m
=> State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar st m = do
mrun <- captureMonadState
sv <- liftIO $ getFifoSVar st mrun
sendFirstWorker sv m
forkSVarAsync :: (IsStream t, MonadAsync m)
=> SVarStyle -> t m a -> t m a -> t m a
forkSVarAsync style m1 m2 = mkStream $ \st stp sng yld -> do
sv <- case style of
AsyncVar -> newAsyncVar st (concurrently (toStream m1) (toStream m2))
WAsyncVar -> newWAsyncVar st (concurrently (toStream m1) (toStream m2))
_ -> error "illegal svar type"
foldStream st stp sng yld $ fromSVar sv
where
concurrently ma mb = mkStream $ \st stp sng yld -> do
liftIO $ enqueue (fromJust $ streamVar st) mb
foldStreamShared st stp sng yld ma
{-# INLINE joinStreamVarAsync #-}
joinStreamVarAsync :: (IsStream t, MonadAsync m)
=> SVarStyle -> t m a -> t m a -> t m a
joinStreamVarAsync style m1 m2 = mkStream $ \st stp sng yld ->
case streamVar st of
Just sv | svarStyle sv == style -> do
liftIO $ enqueue sv (toStream m2)
foldStreamShared st stp sng yld m1
_ -> foldStreamShared st stp sng yld (forkSVarAsync style m1 m2)
{-# INLINE async #-}
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
async = joinStreamVarAsync AsyncVar
{-# DEPRECATED (<|) "Please use 'async' instead." #-}
{-# INLINE (<|) #-}
(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
(<|) = async
{-# INLINE consMAsync #-}
{-# SPECIALIZE consMAsync :: IO a -> AsyncT IO a -> AsyncT IO a #-}
consMAsync :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a
consMAsync m r = fromStream $ K.yieldM m `async` (toStream r)
newtype AsyncT m a = AsyncT {getAsyncT :: Stream m a}
deriving (MonadTrans)
type Async = AsyncT IO
asyncly :: IsStream t => AsyncT m a -> t m a
asyncly = adapt
instance IsStream AsyncT where
toStream = getAsyncT
fromStream = AsyncT
consM = consMAsync
(|:) = consMAsync
{-# INLINE mappendAsync #-}
{-# SPECIALIZE mappendAsync :: AsyncT IO a -> AsyncT IO a -> AsyncT IO a #-}
mappendAsync :: MonadAsync m => AsyncT m a -> AsyncT m a -> AsyncT m a
mappendAsync m1 m2 = fromStream $ async (toStream m1) (toStream m2)
instance MonadAsync m => Semigroup (AsyncT m a) where
(<>) = mappendAsync
instance MonadAsync m => Monoid (AsyncT m a) where
mempty = K.nil
mappend = (<>)
{-# INLINE bindAsync #-}
{-# SPECIALIZE bindAsync :: AsyncT IO a -> (a -> AsyncT IO b) -> AsyncT IO b #-}
bindAsync :: MonadAsync m => AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync m f = fromStream $ K.bindWith async (adapt m) (\a -> adapt $ f a)
instance MonadAsync m => Monad (AsyncT m) where
return = pure
(>>=) = bindAsync
{-# INLINE apAsync #-}
{-# SPECIALIZE apAsync :: AsyncT IO (a -> b) -> AsyncT IO a -> AsyncT IO b #-}
apAsync :: MonadAsync m => AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync mf m = ap (adapt mf) (adapt m)
instance (Monad m, MonadAsync m) => Applicative (AsyncT m) where
pure = AsyncT . K.yield
(<*>) = apAsync
MONAD_COMMON_INSTANCES(AsyncT, MONADPARALLEL)
{-# INLINE consMWAsync #-}
{-# SPECIALIZE consMWAsync :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
consMWAsync :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a
consMWAsync m r = fromStream $ K.yieldM m `wAsync` (toStream r)
{-# INLINE wAsync #-}
wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
wAsync = joinStreamVarAsync WAsyncVar
newtype WAsyncT m a = WAsyncT {getWAsyncT :: Stream m a}
deriving (MonadTrans)
type WAsync = WAsyncT IO
wAsyncly :: IsStream t => WAsyncT m a -> t m a
wAsyncly = adapt
instance IsStream WAsyncT where
toStream = getWAsyncT
fromStream = WAsyncT
consM = consMWAsync
(|:) = consMWAsync
{-# INLINE mappendWAsync #-}
{-# SPECIALIZE mappendWAsync :: WAsyncT IO a -> WAsyncT IO a -> WAsyncT IO a #-}
mappendWAsync :: MonadAsync m => WAsyncT m a -> WAsyncT m a -> WAsyncT m a
mappendWAsync m1 m2 = fromStream $ wAsync (toStream m1) (toStream m2)
instance MonadAsync m => Semigroup (WAsyncT m a) where
(<>) = mappendWAsync
instance MonadAsync m => Monoid (WAsyncT m a) where
mempty = K.nil
mappend = (<>)
{-# INLINE bindWAsync #-}
{-# SPECIALIZE bindWAsync :: WAsyncT IO a -> (a -> WAsyncT IO b) -> WAsyncT IO b #-}
bindWAsync :: MonadAsync m => WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync m f = fromStream $ K.bindWith wAsync (adapt m) (\a -> adapt $ f a)
instance MonadAsync m => Monad (WAsyncT m) where
return = pure
(>>=) = bindWAsync
{-# INLINE apWAsync #-}
{-# SPECIALIZE apWAsync :: WAsyncT IO (a -> b) -> WAsyncT IO a -> WAsyncT IO b #-}
apWAsync :: MonadAsync m => WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync mf m = ap (adapt mf) (adapt m)
instance (Monad m, MonadAsync m) => Applicative (WAsyncT m) where
pure = WAsyncT . K.yield
(<*>) = apWAsync
MONAD_COMMON_INSTANCES(WAsyncT, MONADPARALLEL)