{-# LANGUAGE CPP #-}
#include "inline.hs"
module Streamly.Internal.Data.Stream.Combinators
( maxThreads
, maxBuffer
, maxYields
, rate
, avgRate
, minRate
, maxRate
, constRate
, inspectMode
, printState
)
where
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Int (Int64)
import Streamly.Internal.Data.SVar
import Streamly.Internal.Data.Stream.StreamK
import Streamly.Internal.Data.Stream.Serial (SerialT)
-- | Run the stream @m@ with the max-threads setting of its 'State' replaced
-- by @n@.
--
-- The incoming state is passed through 'setMaxThreads' and the stream is then
-- folded with 'foldStreamShared' using the caller's continuations unchanged.
--
-- NOTE(review): how particular values of @n@ (zero, negative) are interpreted
-- is decided by 'setMaxThreads' in "Streamly.Internal.Data.SVar" and is not
-- visible here - confirm there before documenting it for users.
{-# INLINE_NORMAL maxThreads #-}
maxThreads :: IsStream t => Int -> t m a -> t m a
maxThreads n m = mkStream $ \st yld sng stp ->
    -- Continuations are named after their roles: yield (element plus rest of
    -- stream), single (last element) and stop (end of stream).
    foldStreamShared (setMaxThreads n st) yld sng stp m
-- | Run the stream @m@ with the max-buffer setting of its 'State' replaced
-- by @n@.
--
-- The incoming state is passed through 'setMaxBuffer' and the stream is then
-- folded with 'foldStreamShared' using the caller's continuations unchanged.
--
-- NOTE(review): how particular values of @n@ (zero, negative) are interpreted
-- is decided by 'setMaxBuffer' in "Streamly.Internal.Data.SVar" and is not
-- visible here - confirm there before documenting it for users.
{-# INLINE_NORMAL maxBuffer #-}
maxBuffer :: IsStream t => Int -> t m a -> t m a
maxBuffer n m = mkStream $ \st yld sng stp ->
    -- Continuations are named after their roles: yield (element plus rest of
    -- stream), single (last element) and stop (end of stream).
    foldStreamShared (setMaxBuffer n st) yld sng stp m
-- | Run the stream @m@ with the rate setting of its 'State' replaced by @r@.
--
-- When @r@ is @Just (Rate low goal high _)@ the three 'Double' fields are
-- sanity checked, in this order, and 'error' is called on the first violation:
--
-- 1. @goal < low@  - target rate below the minimum rate
-- 2. @goal > high@ - target rate above the maximum rate
-- 3. @low > high@  - minimum rate above the maximum rate
--
-- The fourth ('Int') field of 'Rate' is not inspected here. A 'Nothing', or a
-- 'Rate' that passes all checks, is handed to 'setStreamRate' as is.
--
-- The checks live inside the function given to 'mkStream', so they run when
-- that function is invoked rather than when 'rate' is applied.
{-# INLINE_NORMAL rate #-}
rate :: IsStream t => Maybe Rate -> t m a -> t m a
rate r m = mkStream $ \st yld sng stp ->
    -- Continuations are named after their roles: yield, single and stop.
    case r of
        Just (Rate low goal _ _) | goal < low ->
            error "rate: Target rate cannot be lower than minimum rate."
        Just (Rate _ goal high _) | goal > high ->
            error "rate: Target rate cannot be greater than maximum rate."
        Just (Rate low _ high _) | low > high ->
            error "rate: Minimum rate cannot be greater than maximum rate."
        _ -> foldStreamShared (setStreamRate r st) yld sng stp m
-- | @avgRate r@ is 'rate' applied to a 'Rate' whose minimum is @r / 2@, whose
-- target is @r@ and whose maximum is @2 * r@; the 'Int' field is 'maxBound'.
--
-- Equivalent to @rate (Just (Rate (r / 2) r (2 * r) maxBound))@.
avgRate :: IsStream t => Double -> t m a -> t m a
avgRate r = rate (Just $ Rate (r / 2) r (2 * r) maxBound)
-- | @minRate r@ is 'rate' applied to a 'Rate' whose minimum and target are
-- both @r@ and whose maximum is @2 * r@; the 'Int' field is 'maxBound'.
--
-- Equivalent to @rate (Just (Rate r r (2 * r) maxBound))@.
minRate :: IsStream t => Double -> t m a -> t m a
minRate r = rate (Just $ Rate r r (2 * r) maxBound)
-- | @maxRate r@ is 'rate' applied to a 'Rate' whose minimum is @r / 2@ and
-- whose target and maximum are both @r@; the 'Int' field is 'maxBound'.
--
-- Equivalent to @rate (Just (Rate (r / 2) r r maxBound))@.
maxRate :: IsStream t => Double -> t m a -> t m a
maxRate r = rate (Just $ Rate (r / 2) r r maxBound)
-- | @constRate r@ is 'rate' applied to a 'Rate' whose minimum, target and
-- maximum are all @r@; the 'Int' field is @0@.
--
-- Equivalent to @rate (Just (Rate r r r 0))@.
constRate :: IsStream t => Double -> t m a -> t m a
constRate r = rate (Just $ Rate r r r 0)
-- | Run the stream @m@ with the latency setting of its 'State' replaced
-- by @n@, using 'setStreamLatency'.
--
-- Not part of this module's export list.
--
-- NOTE(review): the unit of @n@ and the treatment of non-positive values are
-- decided by 'setStreamLatency' and are not visible here - confirm there.
{-# INLINE_NORMAL _serialLatency #-}
_serialLatency :: IsStream t => Int -> t m a -> t m a
_serialLatency n m = mkStream $ \st yld sng stp ->
    -- Continuations are named after their roles: yield, single and stop.
    foldStreamShared (setStreamLatency n st) yld sng stp m
-- | Run the stream @m@ with the yield-limit setting of its 'State' replaced
-- by @n@, using 'setYieldLimit'.
--
-- A rewrite rule elsewhere in this module replaces this function with a
-- no-op when the stream type is 'SerialT'.
--
-- NOTE(review): what 'Nothing' and particular 'Just' values mean is decided
-- by 'setYieldLimit' and is not visible here - confirm there.
{-# INLINE_NORMAL maxYields #-}
maxYields :: IsStream t => Maybe Int64 -> t m a -> t m a
maxYields n m = mkStream $ \st yld sng stp ->
    -- Continuations are named after their roles: yield, single and stop.
    foldStreamShared (setYieldLimit n st) yld sng stp m
-- Ask GHC to replace 'maxYields' with 'maxYieldsSerial' wherever the types
-- match, i.e. when the stream type is 'SerialT'.
{-# RULES "maxYields serial" maxYields = maxYieldsSerial #-}

-- | 'SerialT' counterpart of 'maxYields': the limit argument is ignored and
-- the stream is returned unchanged.
maxYieldsSerial :: Maybe Int64 -> SerialT m a -> SerialT m a
maxYieldsSerial _ = id
-- | Print a textual dump of the 'SVar' attached to the given stream state.
--
-- If 'streamVar' finds an 'SVar' in the state, the string produced by
-- 'dumpSVar' is written to stdout with 'putStrLn'; otherwise the literal
-- line @No SVar@ is printed.
printState :: MonadIO m => State Stream m a -> m ()
printState st = liftIO $
    case streamVar st of
        Just sv -> dumpSVar sv >>= putStrLn
        Nothing -> putStrLn "No SVar"
-- | Run the stream @m@ with inspect mode switched on in its 'State'.
--
-- The incoming state is passed through 'setInspectMode' and the stream is
-- then folded with 'foldStreamShared' using the caller's continuations
-- unchanged.
--
-- NOTE(review): what inspect mode actually reports, and when, is implemented
-- in "Streamly.Internal.Data.SVar" and is not visible here - confirm there.
inspectMode :: IsStream t => t m a -> t m a
inspectMode m = mkStream $ \st yld sng stp ->
    -- Continuations are named after their roles: yield, single and stop.
    foldStreamShared (setInspectMode st) yld sng stp m