{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.Type
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Stream.IsStream.Type
    (
    -- * IsStream Type Class
      IsStream (..)
    , K.Stream (..)
    , Streaming

    -- * Type Conversion
    , fromStreamS
    , toStreamS
    , fromStreamD
    , toStreamD
    , adapt
    , toConsK

    -- * Building a stream
    , mkStream
    , foldStreamShared
    , foldStream

    -- * Stream Types
    , SerialT
    , Serial
    , fromSerial

    , WSerialT
    , WSerial
    , fromWSerial

    , AsyncT
    , Async
    , fromAsync

    , WAsyncT
    , WAsync
    , fromWAsync

    , AheadT
    , Ahead
    , fromAhead

    , ParallelT
    , Parallel
    , fromParallel

    , ZipSerialM
    , ZipSerial
    , fromZipSerial

    , ZipAsyncM
    , ZipAsync
    , fromZipAsync

    -- * Construction
    , cons
    , (.:)
    , nil
    , nilM
    , fromPure
    , fromEffect
    , repeat

    -- * Bind/Concat
    , bindWith
    , concatMapWith

    -- * Fold Utilities
    , concatFoldableWith
    , concatMapFoldableWith
    , concatForFoldableWith

    -- * Running Effects
    , drain

    -- * Conversion operations
    , fromList
    , toList

    -- * Fold operations
    , foldrM
    , foldrMx
    , foldr

    , foldlx'
    , foldlMx'
    , foldl'
    , fold

    -- * Zip style operations
    , eqBy
    , cmpBy

    -- * Deprecated
    , interleaving
    , zipping
    , zippingAsync
    )
where

import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold.Type (Fold (..))
import Streamly.Internal.Data.Stream.Serial
    (SerialT(..), Serial, WSerialT(..), WSerial)
import Streamly.Internal.Data.Stream.Async
    (AsyncT(..), Async, WAsyncT(..), WAsync)
import Streamly.Internal.Data.Stream.Ahead (AheadT(..), Ahead)
import Streamly.Internal.Data.Stream.Parallel (ParallelT(..), Parallel)
import Streamly.Internal.Data.Stream.Zip (ZipSerialM(..), ZipSerial)
import Streamly.Internal.Data.Stream.ZipAsync (ZipAsyncM(..), ZipAsync)
import Streamly.Internal.Data.SVar.Type (State, adaptState)

import qualified Prelude
import qualified Streamly.Internal.Data.Stream.Ahead as Ahead
import qualified Streamly.Internal.Data.Stream.Async as Async
import qualified Streamly.Internal.Data.Stream.Parallel as Parallel
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
import qualified Streamly.Internal.Data.Stream.StreamK.Type as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD.Type as S
#endif
import qualified Streamly.Internal.Data.Stream.Zip as Zip
import qualified Streamly.Internal.Data.Stream.ZipAsync as ZipAsync

import Prelude hiding (foldr, repeat)

#include "inline.hs"
#define USE_IS_STREAM
#include "PreludeCommon.hs"

------------------------------------------------------------------------------
-- Types that can behave as a Stream
------------------------------------------------------------------------------

infixr 5 `consM`
infixr 5 |:

-- XXX Use a different SVar based on the stream type. But we need to make sure
-- that we do not lose performance due to polymorphism.
--
-- | Class of types that can represent a stream of elements of some type 'a' in
-- some monad 'm'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
class
#if __GLASGOW_HASKELL__ >= 806
    ( forall m a. MonadAsync m => Semigroup (t m a)
    , forall m a. MonadAsync m => Monoid (t m a)
    , forall m. Monad m => Functor (t m)
    , forall m. MonadAsync m => Applicative (t m)
    ) =>
#endif
      IsStream t where
    toStream :: t m a -> K.Stream m a
    fromStream :: K.Stream m a -> t m a
    -- | Constructs a stream by adding a monadic action at the head of an
    -- existing stream. For example:
    --
    -- @
    -- > toList $ getLine \`consM` getLine \`consM` nil
    -- hello
    -- world
    -- ["hello","world"]
    -- @
    --
    -- /Concurrent (do not use 'fromParallel' to construct infinite streams)/
    --
    -- @since 0.2.0
    consM :: MonadAsync m => m a -> t m a -> t m a
    -- | Operator equivalent of 'consM'. We can read it as "@parallel colon@"
    -- to remember that @|@ comes before ':'.
    --
    -- @
    -- > toList $ getLine |: getLine |: nil
    -- hello
    -- world
    -- ["hello","world"]
    -- @
    --
    -- @
    -- let delay = threadDelay 1000000 >> print 1
    -- drain $ fromSerial  $ delay |: delay |: delay |: nil
    -- drain $ fromParallel $ delay |: delay |: delay |: nil
    -- @
    --
    -- /Concurrent (do not use 'fromParallel' to construct infinite streams)/
    --
    -- @since 0.2.0
    (|:) :: MonadAsync m => m a -> t m a -> t m a
    -- We can define (|:) just as 'consM' but it is defined explicitly for each
    -- type because we want to use SPECIALIZE pragma on the definition.

-- | Same as 'IsStream'.
--
-- @since 0.1.0
{-# DEPRECATED Streaming "Please use IsStream instead." #-}
type Streaming = IsStream

-------------------------------------------------------------------------------
-- Type adapting combinators
-------------------------------------------------------------------------------

-- XXX Move/reset the State here by reconstructing the stream with cleared
-- state. Can we make sure we do not do that when t1 = t2? If we do this then
-- we do not need to do that explicitly using svarStyle.  It would act as
-- unShare when the stream type is the same.
--
-- | Adapt any specific stream type to any other specific stream type.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- @since 0.8.0
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
adapt :: t1 m a -> t2 m a
adapt = Stream m a -> t2 m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> t2 m a)
-> (t1 m a -> Stream m a) -> t1 m a -> t2 m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t1 m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream

{-# INLINE fromStreamD #-}
fromStreamD :: (IS_STREAM Monad m) => D.Stream m a -> STREAM m a
fromStreamD :: Stream m a -> t m a
fromStreamD = FROM_STREAM . D.toStreamK

-- | Adapt a polymorphic consM operation to a StreamK cons operation
{-# INLINE toConsK #-}
toConsK :: IsStream t =>
    (m a -> t m a -> t m a) -> m a -> K.Stream m a -> K.Stream m a
toConsK :: (m a -> t m a -> t m a) -> m a -> Stream m a -> Stream m a
toConsK m a -> t m a -> t m a
cns m a
x Stream m a
xs = t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream (t m a -> Stream m a) -> t m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ m a
x m a -> t m a -> t m a
`cns` Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream Stream m a
xs

------------------------------------------------------------------------------
-- Building a stream
------------------------------------------------------------------------------

-- XXX The State is always parameterized by "Stream" which means State is not
-- different for different stream types. So we have to manually make sure that
-- when converting from one stream to another we migrate the state correctly.
-- This can be fixed if we use a different SVar type for different streams.
-- Currently we always use "SVar Stream" and therefore a different State type
-- parameterized by that stream.
--
-- XXX Since t is coercible we should be able to coerce k
-- mkStream k = fromStream $ MkStream $ coerce k
--
-- | Build a stream from an 'SVar', a stop continuation, a singleton stream
-- continuation and a yield continuation.
{-# INLINE_EARLY mkStream #-}
mkStream :: IsStream t
    => (forall r. State K.Stream m a
        -> (a -> t m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r)
    -> t m a
mkStream :: (forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream forall r.
State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r
k = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.MkStream ((forall r.
  State Stream m a
  -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
 -> Stream m a)
-> (forall r.
    State Stream m a
    -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp ->
    let yieldk :: a -> t m a -> m r
yieldk a
a t m a
r = a -> Stream m a -> m r
yld a
a (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
r)
     in State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r
forall r.
State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r
k State Stream m a
st a -> t m a -> m r
forall (t :: (* -> *) -> * -> *). IsStream t => a -> t m a -> m r
yieldk a -> m r
sng m r
stp

{-# RULES "mkStream from stream" mkStream = mkStreamFromStream #-}
mkStreamFromStream :: IsStream t
    => (forall r. State K.Stream m a
        -> (a -> K.Stream m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r)
    -> t m a
mkStreamFromStream :: (forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStreamFromStream forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r
k = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.MkStream forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r
k

{-# RULES "mkStream stream" mkStream = mkStreamStream #-}
mkStreamStream
    :: (forall r. State K.Stream m a
        -> (a -> K.Stream m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r)
    -> K.Stream m a
mkStreamStream :: (forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
mkStreamStream = (forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.MkStream

------------------------------------------------------------------------------
-- Folding a stream
------------------------------------------------------------------------------

-- | Fold a stream by providing an SVar, a stop continuation, a singleton
-- continuation and a yield continuation. The stream would share the current
-- SVar passed via the State.
{-# INLINE_EARLY foldStreamShared #-}
foldStreamShared
    :: IsStream t
    => State K.Stream m a
    -> (a -> t m a -> m r)
    -> (a -> m r)
    -> m r
    -> t m a
    -> m r
foldStreamShared :: State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp t m a
m =
    let yieldk :: a -> Stream m a -> m r
yieldk a
a Stream m a
x = a -> t m a -> m r
yld a
a (Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream Stream m a
x)
        K.MkStream forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r
k = t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m
     in State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r
forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r
k State Stream m a
st a -> Stream m a -> m r
yieldk a -> m r
sng m r
stp

-- XXX write a similar rule for foldStream as well?
{-# RULES "foldStreamShared from stream"
   foldStreamShared = foldStreamSharedStream #-}
foldStreamSharedStream
    :: State K.Stream m a
    -> (a -> K.Stream m a -> m r)
    -> (a -> m r)
    -> m r
    -> K.Stream m a
    -> m r
foldStreamSharedStream :: State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
foldStreamSharedStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp Stream m a
m =
    let K.MkStream forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r
k = Stream m a
m
     in State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r
forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r
k State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp

-- | Fold a stream by providing a State, stop continuation, a singleton
-- continuation and a yield continuation. The stream will not use the SVar
-- passed via State.
{-# INLINE foldStream #-}
foldStream
    :: IsStream t
    => State K.Stream m a
    -> (a -> t m a -> m r)
    -> (a -> m r)
    -> m r
    -> t m a
    -> m r
foldStream :: State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStream State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp t m a
m =
    let yieldk :: a -> Stream m a -> m r
yieldk a
a Stream m a
x = a -> t m a -> m r
yld a
a (Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream Stream m a
x)
        K.MkStream forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r
k = t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m
     in State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r
forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r
k (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
st) a -> Stream m a -> m r
yieldk a -> m r
sng m r
stp

-------------------------------------------------------------------------------
-- Serial
-------------------------------------------------------------------------------

-- | Fix the type of a polymorphic stream as 'SerialT'.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- @since 0.8.0
fromSerial :: IsStream t => SerialT m a -> t m a
fromSerial :: SerialT m a -> t m a
fromSerial = SerialT m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt

instance IsStream SerialT where
    toStream :: SerialT m a -> Stream m a
toStream = SerialT m a -> Stream m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
getSerialT
    fromStream :: Stream m a -> SerialT m a
fromStream = Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> SerialT IO a -> SerialT IO a #-}
    consM :: m a -> SerialT m a -> SerialT m a
consM = m a -> SerialT m a -> SerialT m a
forall (m :: * -> *) a.
Monad m =>
m a -> SerialT m a -> SerialT m a
Serial.consM

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> SerialT IO a -> SerialT IO a #-}
    |: :: m a -> SerialT m a -> SerialT m a
(|:) = m a -> SerialT m a -> SerialT m a
forall (m :: * -> *) a.
Monad m =>
m a -> SerialT m a -> SerialT m a
Serial.consM

-- | Fix the type of a polymorphic stream as 'WSerialT'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
fromWSerial :: IsStream t => WSerialT m a -> t m a
fromWSerial :: WSerialT m a -> t m a
fromWSerial = WSerialT m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt

-- | Same as 'fromWSerial'.
--
-- @since 0.1.0
{-# DEPRECATED interleaving "Please use fromWSerial instead." #-}
interleaving :: IsStream t => WSerialT m a -> t m a
interleaving :: WSerialT m a -> t m a
interleaving = WSerialT m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
WSerialT m a -> t m a
fromWSerial

instance IsStream WSerialT where
    toStream :: WSerialT m a -> Stream m a
toStream = WSerialT m a -> Stream m a
forall (m :: * -> *) a. WSerialT m a -> Stream m a
getWSerialT
    fromStream :: Stream m a -> WSerialT m a
fromStream = Stream m a -> WSerialT m a
forall (m :: * -> *) a. Stream m a -> WSerialT m a
WSerialT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> WSerialT IO a -> WSerialT IO a #-}
    consM :: Monad m => m a -> WSerialT m a -> WSerialT m a
    consM :: m a -> WSerialT m a -> WSerialT m a
consM = m a -> WSerialT m a -> WSerialT m a
forall (m :: * -> *) a.
Monad m =>
m a -> WSerialT m a -> WSerialT m a
Serial.consMWSerial

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> WSerialT IO a -> WSerialT IO a #-}
    (|:) :: Monad m => m a -> WSerialT m a -> WSerialT m a
    |: :: m a -> WSerialT m a -> WSerialT m a
(|:) = m a -> WSerialT m a -> WSerialT m a
forall (m :: * -> *) a.
Monad m =>
m a -> WSerialT m a -> WSerialT m a
Serial.consMWSerial

-------------------------------------------------------------------------------
-- Async
-------------------------------------------------------------------------------

-- | Fix the type of a polymorphic stream as 'AsyncT'.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- @since 0.8.0
fromAsync :: IsStream t => AsyncT m a -> t m a
fromAsync :: AsyncT m a -> t m a
fromAsync = AsyncT m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt

instance IsStream AsyncT where
    toStream :: AsyncT m a -> Stream m a
toStream = AsyncT m a -> Stream m a
forall (m :: * -> *) a. AsyncT m a -> Stream m a
getAsyncT
    fromStream :: Stream m a -> AsyncT m a
fromStream = Stream m a -> AsyncT m a
forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> AsyncT IO a -> AsyncT IO a #-}
    consM :: m a -> AsyncT m a -> AsyncT m a
consM = m a -> AsyncT m a -> AsyncT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> AsyncT m a -> AsyncT m a
Async.consMAsync

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> AsyncT IO a -> AsyncT IO a #-}
    |: :: m a -> AsyncT m a -> AsyncT m a
(|:) = m a -> AsyncT m a -> AsyncT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> AsyncT m a -> AsyncT m a
Async.consMAsync

-- | Fix the type of a polymorphic stream as 'WAsyncT'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
fromWAsync :: IsStream t => WAsyncT m a -> t m a
fromWAsync :: WAsyncT m a -> t m a
fromWAsync = WAsyncT m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt

instance IsStream WAsyncT where
    toStream :: WAsyncT m a -> Stream m a
toStream = WAsyncT m a -> Stream m a
forall (m :: * -> *) a. WAsyncT m a -> Stream m a
getWAsyncT
    fromStream :: Stream m a -> WAsyncT m a
fromStream = Stream m a -> WAsyncT m a
forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
    consM :: m a -> WAsyncT m a -> WAsyncT m a
consM = m a -> WAsyncT m a -> WAsyncT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> WAsyncT m a -> WAsyncT m a
Async.consMWAsync

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
    |: :: m a -> WAsyncT m a -> WAsyncT m a
(|:) = m a -> WAsyncT m a -> WAsyncT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> WAsyncT m a -> WAsyncT m a
Async.consMWAsync

-------------------------------------------------------------------------------
-- Ahead
-------------------------------------------------------------------------------

-- | Fix the type of a polymorphic stream as 'AheadT'.
--
-- /Since: 0.3.0 ("Streamly")/
--
-- @since 0.8.0
fromAhead :: IsStream t => AheadT m a -> t m a
fromAhead :: AheadT m a -> t m a
fromAhead = AheadT m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt

instance IsStream AheadT where
    toStream :: AheadT m a -> Stream m a
toStream = AheadT m a -> Stream m a
forall (m :: * -> *) a. AheadT m a -> Stream m a
getAheadT
    fromStream :: Stream m a -> AheadT m a
fromStream = Stream m a -> AheadT m a
forall (m :: * -> *) a. Stream m a -> AheadT m a
AheadT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> AheadT IO a -> AheadT IO a #-}
    consM :: m a -> AheadT m a -> AheadT m a
consM = m a -> AheadT m a -> AheadT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> AheadT m a -> AheadT m a
Ahead.consM

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> AheadT IO a -> AheadT IO a #-}
    |: :: m a -> AheadT m a -> AheadT m a
(|:) = m a -> AheadT m a -> AheadT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> AheadT m a -> AheadT m a
Ahead.consM

-------------------------------------------------------------------------------
-- Parallel
-------------------------------------------------------------------------------

-- | Fix the type of a polymorphic stream as 'ParallelT'.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- @since 0.8.0
fromParallel :: IsStream t => ParallelT m a -> t m a
fromParallel :: ParallelT m a -> t m a
fromParallel = ParallelT m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt

instance IsStream ParallelT where
    toStream :: ParallelT m a -> Stream m a
toStream = ParallelT m a -> Stream m a
forall (m :: * -> *) a. ParallelT m a -> Stream m a
getParallelT
    fromStream :: Stream m a -> ParallelT m a
fromStream = Stream m a -> ParallelT m a
forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> ParallelT IO a -> ParallelT IO a #-}
    consM :: m a -> ParallelT m a -> ParallelT m a
consM = m a -> ParallelT m a -> ParallelT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> ParallelT m a -> ParallelT m a
Parallel.consM

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> ParallelT IO a -> ParallelT IO a #-}
    |: :: m a -> ParallelT m a -> ParallelT m a
(|:) = m a -> ParallelT m a -> ParallelT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> ParallelT m a -> ParallelT m a
Parallel.consM

-------------------------------------------------------------------------------
-- Zip
-------------------------------------------------------------------------------

-- | Fix the type of a polymorphic stream as 'ZipSerialM'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
fromZipSerial :: IsStream t => ZipSerialM m a -> t m a
fromZipSerial :: ZipSerialM m a -> t m a
fromZipSerial = ZipSerialM m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt

-- | Same as 'fromZipSerial'.
--
-- @since 0.1.0
{-# DEPRECATED zipping "Please use fromZipSerial instead." #-}
zipping :: IsStream t => ZipSerialM m a -> t m a
zipping :: ZipSerialM m a -> t m a
zipping = ZipSerialM m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
ZipSerialM m a -> t m a
fromZipSerial

instance IsStream ZipSerialM where
    toStream :: ZipSerialM m a -> Stream m a
toStream = ZipSerialM m a -> Stream m a
forall (m :: * -> *) a. ZipSerialM m a -> Stream m a
getZipSerialM
    fromStream :: Stream m a -> ZipSerialM m a
fromStream = Stream m a -> ZipSerialM m a
forall (m :: * -> *) a. Stream m a -> ZipSerialM m a
ZipSerialM

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> ZipSerialM IO a -> ZipSerialM IO a #-}
    consM :: Monad m => m a -> ZipSerialM m a -> ZipSerialM m a
    consM :: m a -> ZipSerialM m a -> ZipSerialM m a
consM = m a -> ZipSerialM m a -> ZipSerialM m a
forall (m :: * -> *) a.
Monad m =>
m a -> ZipSerialM m a -> ZipSerialM m a
Zip.consMZip

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> ZipSerialM IO a -> ZipSerialM IO a #-}
    (|:) :: Monad m => m a -> ZipSerialM m a -> ZipSerialM m a
    |: :: m a -> ZipSerialM m a -> ZipSerialM m a
(|:) = m a -> ZipSerialM m a -> ZipSerialM m a
forall (m :: * -> *) a.
Monad m =>
m a -> ZipSerialM m a -> ZipSerialM m a
Zip.consMZip

-- | Fix the type of a polymorphic stream as 'ZipAsyncM'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a
fromZipAsync :: ZipAsyncM m a -> t m a
fromZipAsync = ZipAsyncM m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt

-- | Same as 'fromZipAsync'.
--
-- @since 0.1.0
{-# DEPRECATED zippingAsync "Please use fromZipAsync instead." #-}
zippingAsync :: IsStream t => ZipAsyncM m a -> t m a
zippingAsync :: ZipAsyncM m a -> t m a
zippingAsync = ZipAsyncM m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
ZipAsyncM m a -> t m a
fromZipAsync

instance IsStream ZipAsyncM where
    toStream :: ZipAsyncM m a -> Stream m a
toStream = ZipAsyncM m a -> Stream m a
forall (m :: * -> *) a. ZipAsyncM m a -> Stream m a
getZipAsyncM
    fromStream :: Stream m a -> ZipAsyncM m a
fromStream = Stream m a -> ZipAsyncM m a
forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> ZipAsyncM IO a -> ZipAsyncM IO a #-}
    consM :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
    consM :: m a -> ZipAsyncM m a -> ZipAsyncM m a
consM = m a -> ZipAsyncM m a -> ZipAsyncM m a
forall (m :: * -> *) a.
Monad m =>
m a -> ZipAsyncM m a -> ZipAsyncM m a
ZipAsync.consMZipAsync

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> ZipAsyncM IO a -> ZipAsyncM IO a #-}
    (|:) :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
    |: :: m a -> ZipAsyncM m a -> ZipAsyncM m a
(|:) = m a -> ZipAsyncM m a -> ZipAsyncM m a
forall (m :: * -> *) a.
Monad m =>
m a -> ZipAsyncM m a -> ZipAsyncM m a
ZipAsync.consMZipAsync

-------------------------------------------------------------------------------
-- Construction
-------------------------------------------------------------------------------

infixr 5 `cons`

-- | Construct a stream by adding a pure value at the head of an existing
-- stream. For serial streams this is the same as @(return a) \`consM` r@ but
-- more efficient. For concurrent streams this is not concurrent whereas
-- 'consM' is concurrent. For example:
--
-- @
-- > toList $ 1 \`cons` 2 \`cons` 3 \`cons` nil
-- [1,2,3]
-- @
--
-- @since 0.1.0
{-# INLINE_NORMAL cons #-}
cons :: IsStream t => a -> t m a -> t m a
cons :: a -> t m a -> t m a
cons a
a t m a
r = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ a -> Stream m a -> Stream m a
forall a (m :: * -> *). a -> Stream m a -> Stream m a
K.cons a
a (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
r)

infixr 5 .:

-- | Operator equivalent of 'cons'.
--
-- @
-- > toList $ 1 .: 2 .: 3 .: nil
-- [1,2,3]
-- @
--
-- @since 0.1.1
{-# INLINE (.:) #-}
(.:) :: IsStream t => a -> t m a -> t m a
.: :: a -> t m a -> t m a
(.:) = a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
cons

{-# INLINE_NORMAL nil #-}
nil :: IsStream t => t m a
nil :: t m a
nil = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream Stream m a
forall (m :: * -> *) a. Stream m a
K.nil

{-# INLINE_NORMAL nilM #-}
nilM :: (IsStream t, Monad m) => m b -> t m a
nilM :: m b -> t m a
nilM = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> t m a) -> (m b -> Stream m a) -> m b -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m b -> Stream m a
forall (m :: * -> *) b a. Applicative m => m b -> Stream m a
K.nilM

{-# INLINE_NORMAL fromPure #-}
fromPure :: IsStream t => a -> t m a
fromPure :: a -> t m a
fromPure = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> t m a) -> (a -> Stream m a) -> a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall a (m :: * -> *). a -> Stream m a
K.fromPure

{-# INLINE_NORMAL fromEffect #-}
fromEffect :: (Monad m, IsStream t) => m a -> t m a
fromEffect :: m a -> t m a
fromEffect = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> t m a) -> (m a -> Stream m a) -> m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a
K.fromEffect

{-# INLINE repeat #-}
repeat :: IsStream t => a -> t m a
repeat :: a -> t m a
repeat = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> t m a) -> (a -> Stream m a) -> a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall a (m :: * -> *). a -> Stream m a
K.repeat

-------------------------------------------------------------------------------
-- Bind/Concat
-------------------------------------------------------------------------------

{-# INLINE bindWith #-}
bindWith
    :: IsStream t
    => (t m b -> t m b -> t m b)
    -> t m a
    -> (a -> t m b)
    -> t m b
bindWith :: (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b
bindWith t m b -> t m b -> t m b
par t m a
m1 a -> t m b
f =
    Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream
        (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> Stream m a -> (a -> Stream m b) -> Stream m b
forall (m :: * -> *) b a.
(Stream m b -> Stream m b -> Stream m b)
-> Stream m a -> (a -> Stream m b) -> Stream m b
K.bindWith
            (\Stream m b
s1 Stream m b
s2 -> t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream (t m b -> Stream m b) -> t m b -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m b -> t m b -> t m b
par (Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream Stream m b
s1) (Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream Stream m b
s2))
            (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m1)
            (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream (t m b -> Stream m b) -> (a -> t m b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> t m b
f)

-- | @concatMapWith mixer generator stream@ is a two dimensional looping
-- combinator.  The @generator@ function is used to generate streams from the
-- elements in the input @stream@ and the @mixer@ function is used to merge
-- those streams.
--
-- Note we can merge streams concurrently by using a concurrent merge function.
--
-- /Since: 0.7.0/
--
-- /Since: 0.8.0 (signature change)/
{-# INLINE concatMapWith #-}
concatMapWith
    :: IsStream t
    => (t m b -> t m b -> t m b)
    -> (a -> t m b)
    -> t m a
    -> t m b
concatMapWith :: (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
concatMapWith t m b -> t m b -> t m b
par a -> t m b
f t m a
xs = (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
IsStream t =>
(t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b
bindWith t m b -> t m b -> t m b
par t m a
xs a -> t m b
f

-- | A variant of 'foldMap' that allows you to map a monadic streaming action
-- on a 'Foldable' container and then fold it using the specified stream merge
-- operation.
--
-- @concatMapFoldableWith 'async' return [1..3]@
--
-- Equivalent to:
--
-- @
-- concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
-- concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)
-- @
--
-- /Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)/
--
-- /Since: 0.1.0 ("Streamly")/
{-# INLINE concatMapFoldableWith #-}
concatMapFoldableWith :: (IsStream t, Foldable f)
    => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
concatMapFoldableWith :: (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
concatMapFoldableWith t m b -> t m b -> t m b
f a -> t m b
g = (a -> t m b -> t m b) -> t m b -> f a -> t m b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
Prelude.foldr (t m b -> t m b -> t m b
f (t m b -> t m b -> t m b) -> (a -> t m b) -> a -> t m b -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> t m b
g) t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
nil

-- | Like 'concatMapFoldableWith' but with the last two arguments reversed i.e. the
-- monadic streaming function is the last argument.
--
-- Equivalent to:
--
-- @
-- concatForFoldableWith f xs g = Prelude.foldr (f . g) S.nil xs
-- concatForFoldableWith f = flip (S.concatMapFoldableWith f)
-- @
--
-- /Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)/
--
-- /Since: 0.1.0 ("Streamly")/
{-# INLINE concatForFoldableWith #-}
concatForFoldableWith :: (IsStream t, Foldable f)
    => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
concatForFoldableWith :: (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
concatForFoldableWith t m b -> t m b -> t m b
f = ((a -> t m b) -> f a -> t m b) -> f a -> (a -> t m b) -> t m b
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
forall (t :: (* -> *) -> * -> *) (f :: * -> *) (m :: * -> *) b a.
(IsStream t, Foldable f) =>
(t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
concatMapFoldableWith t m b -> t m b -> t m b
f)

-- | A variant of 'Data.Foldable.fold' that allows you to fold a 'Foldable'
-- container of streams using the specified stream sum operation.
--
-- @concatFoldableWith 'async' $ map return [1..3]@
--
-- Equivalent to:
--
-- @
-- concatFoldableWith f = Prelude.foldr f S.nil
-- concatFoldableWith f = S.concatMapFoldableWith f id
-- @
--
-- /Since: 0.8.0 (Renamed foldWith to concatFoldableWith)/
--
-- /Since: 0.1.0 ("Streamly")/
{-# INLINE concatFoldableWith #-}
concatFoldableWith :: (IsStream t, Foldable f)
    => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
concatFoldableWith :: (t m a -> t m a -> t m a) -> f (t m a) -> t m a
concatFoldableWith t m a -> t m a -> t m a
f = (t m a -> t m a -> t m a) -> (t m a -> t m a) -> f (t m a) -> t m a
forall (t :: (* -> *) -> * -> *) (f :: * -> *) (m :: * -> *) b a.
(IsStream t, Foldable f) =>
(t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
concatMapFoldableWith t m a -> t m a -> t m a
f t m a -> t m a
forall a. a -> a
id