{-# LANGUAGE CPP                       #-}

#include "inline.hs"

-- |
-- Module      : Streamly.Internal.Data.Stream.Combinators
-- Copyright   : (c) 2017 Harendra Kumar
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Stream.Combinators
    ( maxThreads
    , maxBuffer
    , maxYields
    , rate
    , avgRate
    , minRate
    , maxRate
    , constRate
    , inspectMode
    , printState
    )
where

import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Int (Int64)

import Streamly.Internal.Data.SVar
import Streamly.Internal.Data.Stream.StreamK
import Streamly.Internal.Data.Stream.Serial (SerialT)

-------------------------------------------------------------------------------
-- Concurrency control
-------------------------------------------------------------------------------
--
-- XXX need to write these in direct style otherwise they will break fusion.
--
-- | Specify the maximum number of threads that can be spawned concurrently for
-- any concurrent combinator in a stream.
-- A value of 0 resets the thread limit to default, a negative value means
-- there is no limit. The default value is 1500. 'maxThreads' does not affect
-- 'ParallelT' streams as they can use unbounded number of threads.
--
-- When the actions in a stream are IO bound, having blocking IO calls, this
-- option can be used to control the maximum number of in-flight IO requests.
-- When the actions are CPU bound this option can be used to
-- control the amount of CPU used by the stream.
--
-- @since 0.4.0
{-# INLINE_NORMAL maxThreads #-}
maxThreads :: IsStream t => Int -> t m a -> t m a
maxThreads :: Int -> t m a -> t m a
maxThreads Int
n t m a
m = (forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
  State Stream m a
  -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
 -> t m a)
-> (forall r.
    State Stream m a
    -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
stp a -> m r
sng m r
yld ->
    State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared (Int -> State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> State t m a -> State t m a
setMaxThreads Int
n State Stream m a
st) a -> t m a -> m r
stp a -> m r
sng m r
yld t m a
m

{-
{-# RULES "maxThreadsSerial serial" maxThreads = maxThreadsSerial #-}
maxThreadsSerial :: Int -> SerialT m a -> SerialT m a
maxThreadsSerial _ = id
-}

-- | Specify the maximum size of the buffer for storing the results from
-- concurrent computations. If the buffer becomes full we stop spawning more
-- concurrent tasks until there is space in the buffer.
-- A value of 0 resets the buffer size to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- CAUTION! using an unbounded 'maxBuffer' value (i.e. a negative value)
-- coupled with an unbounded 'maxThreads' value is a recipe for disaster in
-- presence of infinite streams, or very large streams.  Especially, it must
-- not be used when 'pure' is used in 'ZipAsyncM' streams as 'pure' in
-- applicative zip streams generates an infinite stream causing unbounded
-- concurrent generation with no limit on the buffer or threads.
--
-- @since 0.4.0
{-# INLINE_NORMAL maxBuffer #-}
maxBuffer :: IsStream t => Int -> t m a -> t m a
maxBuffer :: Int -> t m a -> t m a
maxBuffer Int
n t m a
m = (forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
  State Stream m a
  -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
 -> t m a)
-> (forall r.
    State Stream m a
    -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
stp a -> m r
sng m r
yld ->
    State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared (Int -> State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> State t m a -> State t m a
setMaxBuffer Int
n State Stream m a
st) a -> t m a -> m r
stp a -> m r
sng m r
yld t m a
m

{-
{-# RULES "maxBuffer serial" maxBuffer = maxBufferSerial #-}
maxBufferSerial :: Int -> SerialT m a -> SerialT m a
maxBufferSerial _ = id
-}

-- | Specify the pull rate of a stream.
-- A 'Nothing' value resets the rate to default which is unlimited.  When the
-- rate is specified, concurrent production may be ramped up or down
-- automatically to achieve the specified yield rate. The specific behavior for
-- different styles of 'Rate' specifications is documented under 'Rate'.  The
-- effective maximum production rate achieved by a stream is governed by:
--
-- * The 'maxThreads' limit
-- * The 'maxBuffer' limit
-- * The maximum rate that the stream producer can achieve
-- * The maximum rate that the stream consumer can achieve
--
-- @since 0.5.0
{-# INLINE_NORMAL rate #-}
rate :: IsStream t => Maybe Rate -> t m a -> t m a
rate :: Maybe Rate -> t m a -> t m a
rate Maybe Rate
r t m a
m = (forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
  State Stream m a
  -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
 -> t m a)
-> (forall r.
    State Stream m a
    -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
stp a -> m r
sng m r
yld ->
    case Maybe Rate
r of
        Just (Rate Double
low Double
goal Double
_ Int
_) | Double
goal Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
< Double
low ->
            [Char] -> m r
forall a. HasCallStack => [Char] -> a
error [Char]
"rate: Target rate cannot be lower than minimum rate."
        Just (Rate Double
_ Double
goal Double
high Int
_) | Double
goal Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
> Double
high ->
            [Char] -> m r
forall a. HasCallStack => [Char] -> a
error [Char]
"rate: Target rate cannot be greater than maximum rate."
        Just (Rate Double
low Double
_ Double
high Int
_) | Double
low Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
> Double
high ->
            [Char] -> m r
forall a. HasCallStack => [Char] -> a
error [Char]
"rate: Minimum rate cannot be greater than maximum rate."
        Maybe Rate
_ -> State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared (Maybe Rate -> State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Maybe Rate -> State t m a -> State t m a
setStreamRate Maybe Rate
r State Stream m a
st) a -> t m a -> m r
stp a -> m r
sng m r
yld t m a
m

-- XXX implement for serial streams as well, as a simple delay

{-
{-# RULES "rate serial" rate = yieldRateSerial #-}
yieldRateSerial :: Double -> SerialT m a -> SerialT m a
yieldRateSerial _ = id
-}

-- | Same as @rate (Just $ Rate (r/2) r (2*r) maxBound)@
--
-- Specifies the average production rate of a stream in number of yields
-- per second (i.e.  @Hertz@).  Concurrent production is ramped up or down
-- automatically to achieve the specified average yield rate. The rate can
-- go down to half of the specified rate on the lower side and double of
-- the specified rate on the higher side.
--
-- @since 0.5.0
avgRate :: IsStream t => Double -> t m a -> t m a
avgRate :: Double -> t m a -> t m a
avgRate Double
r = Maybe Rate -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Rate -> t m a -> t m a
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just (Rate -> Maybe Rate) -> Rate -> Maybe Rate
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate (Double
rDouble -> Double -> Double
forall a. Fractional a => a -> a -> a
/Double
2) Double
r (Double
2Double -> Double -> Double
forall a. Num a => a -> a -> a
*Double
r) Int
forall a. Bounded a => a
maxBound)

-- | Same as @rate (Just $ Rate r r (2*r) maxBound)@
--
-- Specifies the minimum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go below the
-- specified rate, even though it may possibly go above it at times, the
-- upper limit is double of the specified rate.
--
-- @since 0.5.0
minRate :: IsStream t => Double -> t m a -> t m a
minRate :: Double -> t m a -> t m a
minRate Double
r = Maybe Rate -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Rate -> t m a -> t m a
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just (Rate -> Maybe Rate) -> Rate -> Maybe Rate
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate Double
r Double
r (Double
2Double -> Double -> Double
forall a. Num a => a -> a -> a
*Double
r) Int
forall a. Bounded a => a
maxBound)

-- | Same as @rate (Just $ Rate (r/2) r r maxBound)@
--
-- Specifies the maximum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go above the
-- specified rate, even though it may possibly go below it at times, the
-- lower limit is half of the specified rate. This can be useful in
-- applications where certain resource usage must not be allowed to go
-- beyond certain limits.
--
-- @since 0.5.0
maxRate :: IsStream t => Double -> t m a -> t m a
maxRate :: Double -> t m a -> t m a
maxRate Double
r = Maybe Rate -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Rate -> t m a -> t m a
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just (Rate -> Maybe Rate) -> Rate -> Maybe Rate
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate (Double
rDouble -> Double -> Double
forall a. Fractional a => a -> a -> a
/Double
2) Double
r Double
r Int
forall a. Bounded a => a
maxBound)

-- | Same as @rate (Just $ Rate r r r 0)@
--
-- Specifies a constant yield rate. If for some reason the actual rate
-- goes above or below the specified rate we do not try to recover it by
-- increasing or decreasing the rate in future.  This can be useful in
-- applications like graphics frame refresh where we need to maintain a
-- constant refresh rate.
--
-- @since 0.5.0
constRate :: IsStream t => Double -> t m a -> t m a
constRate :: Double -> t m a -> t m a
constRate Double
r = Maybe Rate -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Rate -> t m a -> t m a
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just (Rate -> Maybe Rate) -> Rate -> Maybe Rate
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate Double
r Double
r Double
r Int
0)

-- | Specify the average latency, in nanoseconds, of a single threaded action
-- in a concurrent composition. Streamly can measure the latencies, but that is
-- possible only after at least one task has completed. This combinator can be
-- used to provide a latency hint so that rate control using 'rate' can take
-- that into account right from the beginning. When not specified then a
-- default behavior is chosen which could be too slow or too fast, and would be
-- restricted by any other control parameters configured.
-- A value of 0 indicates default behavior, a negative value means there is no
-- limit i.e. zero latency.
-- This would normally be useful only in high latency and high throughput
-- cases.
--
{-# INLINE_NORMAL _serialLatency #-}
_serialLatency :: IsStream t => Int -> t m a -> t m a
_serialLatency :: Int -> t m a -> t m a
_serialLatency Int
n t m a
m = (forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
  State Stream m a
  -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
 -> t m a)
-> (forall r.
    State Stream m a
    -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
stp a -> m r
sng m r
yld ->
    State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared (Int -> State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> State t m a -> State t m a
setStreamLatency Int
n State Stream m a
st) a -> t m a -> m r
stp a -> m r
sng m r
yld t m a
m

{-
{-# RULES "serialLatency serial" _serialLatency = serialLatencySerial #-}
serialLatencySerial :: Int -> SerialT m a -> SerialT m a
serialLatencySerial _ = id
-}

-- Stop concurrent dispatches after this limit. This is useful in API's like
-- "take" where we want to dispatch only upto the number of elements "take"
-- needs.  This value applies only to the immediate next level and is not
-- inherited by everything in enclosed scope.
{-# INLINE_NORMAL maxYields #-}
maxYields :: IsStream t => Maybe Int64 -> t m a -> t m a
maxYields :: Maybe Int64 -> t m a -> t m a
maxYields Maybe Int64
n t m a
m = (forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
  State Stream m a
  -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
 -> t m a)
-> (forall r.
    State Stream m a
    -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
stp a -> m r
sng m r
yld ->
    State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared (Maybe Int64 -> State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Maybe Int64 -> State t m a -> State t m a
setYieldLimit Maybe Int64
n State Stream m a
st) a -> t m a -> m r
stp a -> m r
sng m r
yld t m a
m

{-# RULES "maxYields serial" maxYields = maxYieldsSerial #-}
maxYieldsSerial :: Maybe Int64 -> SerialT m a -> SerialT m a
maxYieldsSerial :: Maybe Int64 -> SerialT m a -> SerialT m a
maxYieldsSerial Maybe Int64
_ = SerialT m a -> SerialT m a
forall a. a -> a
id

printState :: MonadIO m => State Stream m a -> m ()
printState :: State Stream m a -> m ()
printState State Stream m a
st = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    let msv :: Maybe (SVar Stream m a)
msv = State Stream m a -> Maybe (SVar Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st
    case Maybe (SVar Stream m a)
msv of
        Just SVar Stream m a
sv -> SVar Stream m a -> IO [Char]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO [Char]
dumpSVar SVar Stream m a
sv IO [Char] -> ([Char] -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= [Char] -> IO ()
putStrLn
        Maybe (SVar Stream m a)
Nothing -> [Char] -> IO ()
putStrLn [Char]
"No SVar"

-- | Print debug information about an SVar when the stream ends
--
-- /Internal/
--
inspectMode :: IsStream t => t m a -> t m a
inspectMode :: t m a -> t m a
inspectMode t m a
m = (forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
  State Stream m a
  -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
 -> t m a)
-> (forall r.
    State Stream m a
    -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
stp a -> m r
sng m r
yld ->
     State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> State t m a
setInspectMode State Stream m a
st) a -> t m a -> m r
stp a -> m r
sng m r
yld t m a
m