{-# OPTIONS_GHC -Wno-orphans #-}
module Streamly.Internal.Data.Stream.IsStream.Common
(
fromPure
, fromEffect
, repeatM
, timesWith
, absTimesWith
, relTimesWith
, foldOn
, fold
, fold_
, map
, scanlMAfter'
, postscanlM'
, smapM
, take
, takeWhile
, takeEndBy
, drop
, findIndices
, intersperseM
, interjectSuffix
, reverse
, reverse'
, mkAsync
, mkParallel
, parallelFst
, concatM
, concatMapM
, concatMap
, splitOnSeq
, zipWithM
, zipWith
, yield
, yieldM
)
where
#include "inline.hs"
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO(..))
import Foreign.Storable (Storable)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Array.Foreign.Type (Array)
import Streamly.Internal.Data.Fold.Type (Fold (..))
import Streamly.Internal.Data.Stream.IsStream.Combinators (maxYields)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), fromStreamD, toStreamD, fromStreamS, toStreamS)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64, addToAbsTime64)
import qualified Streamly.Internal.Data.Array.Foreign.Type as A
import qualified Streamly.Internal.Data.Stream.Async as Async
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif
import Streamly.Internal.System.IO (defaultChunkSize)
import Prelude hiding (take, takeWhile, drop, reverse, concatMap, map, zipWith)
{-# INLINE fromPure #-}
fromPure :: IsStream t => a -> t m a
fromPure :: a -> t m a
fromPure = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> t m a) -> (a -> Stream m a) -> a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall a (m :: * -> *). a -> Stream m a
K.fromPure
{-# DEPRECATED yield "Please use fromPure instead." #-}
{-# INLINE yield #-}
yield :: IsStream t => a -> t m a
yield :: a -> t m a
yield = a -> t m a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
fromPure
{-# INLINE fromEffect #-}
fromEffect :: (Monad m, IsStream t) => m a -> t m a
fromEffect :: m a -> t m a
fromEffect = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> t m a) -> (m a -> Stream m a) -> m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a
K.fromEffect
{-# DEPRECATED yieldM "Please use fromEffect instead." #-}
{-# INLINE yieldM #-}
yieldM :: (Monad m, IsStream t) => m a -> t m a
yieldM :: m a -> t m a
yieldM = m a -> t m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
fromEffect
{-# INLINE_EARLY repeatM #-}
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
repeatM :: m a -> t m a
repeatM = (m a -> t m a -> t m a) -> m a -> t m a
forall (m :: * -> *) a (t :: (* -> *) -> * -> *).
(m a -> t m a -> t m a) -> m a -> t m a
K.repeatMWith m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
IsStream.consM
{-# RULES "repeatM serial" repeatM = repeatMSerial #-}
{-# INLINE repeatMSerial #-}
repeatMSerial :: MonadAsync m => m a -> SerialT m a
repeatMSerial :: m a -> SerialT m a
repeatMSerial = Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> SerialT m a)
-> (m a -> Stream m a) -> m a -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a
S.repeatM
{-# INLINE timesWith #-}
timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64)
timesWith :: Double -> t m (AbsTime, RelTime64)
timesWith Double
g = Stream m (AbsTime, RelTime64) -> t m (AbsTime, RelTime64)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m (AbsTime, RelTime64) -> t m (AbsTime, RelTime64))
-> Stream m (AbsTime, RelTime64) -> t m (AbsTime, RelTime64)
forall a b. (a -> b) -> a -> b
$ Double -> Stream m (AbsTime, RelTime64)
forall (m :: * -> *).
MonadIO m =>
Double -> Stream m (AbsTime, RelTime64)
D.times Double
g
{-# INLINE absTimesWith #-}
absTimesWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m AbsTime
absTimesWith :: Double -> t m AbsTime
absTimesWith = ((AbsTime, RelTime64) -> AbsTime)
-> t m (AbsTime, RelTime64) -> t m AbsTime
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((AbsTime -> RelTime64 -> AbsTime)
-> (AbsTime, RelTime64) -> AbsTime
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry AbsTime -> RelTime64 -> AbsTime
addToAbsTime64) (t m (AbsTime, RelTime64) -> t m AbsTime)
-> (Double -> t m (AbsTime, RelTime64)) -> Double -> t m AbsTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double -> t m (AbsTime, RelTime64)
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m) =>
Double -> t m (AbsTime, RelTime64)
timesWith
{-# INLINE relTimesWith #-}
relTimesWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m RelTime64
relTimesWith :: Double -> t m RelTime64
relTimesWith = ((AbsTime, RelTime64) -> RelTime64)
-> t m (AbsTime, RelTime64) -> t m RelTime64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (AbsTime, RelTime64) -> RelTime64
forall a b. (a, b) -> b
snd (t m (AbsTime, RelTime64) -> t m RelTime64)
-> (Double -> t m (AbsTime, RelTime64)) -> Double -> t m RelTime64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double -> t m (AbsTime, RelTime64)
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m) =>
Double -> t m (AbsTime, RelTime64)
timesWith
{-# INLINE foldOn #-}
foldOn :: Monad m => Fold m a b -> SerialT m a -> Fold m a b
foldOn :: Fold m a b -> SerialT m a -> Fold m a b
foldOn Fold m a b
f SerialT m a
s = Fold m a b -> Stream m a -> Fold m a b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Fold m a b
D.foldOn Fold m a b
f (Stream m a -> Fold m a b) -> Stream m a -> Fold m a b
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD SerialT m a
s
{-# INLINE fold #-}
fold :: Monad m => Fold m a b -> SerialT m a -> m b
fold :: Fold m a b -> SerialT m a -> m b
fold Fold m a b
fl SerialT m a
strm = do
(b
b, SerialT m a
_) <- Fold m a b -> SerialT m a -> m (b, SerialT m a)
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> SerialT m a -> m (b, SerialT m a)
fold_ Fold m a b
fl SerialT m a
strm
b -> m b
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> m b) -> b -> m b
forall a b. (a -> b) -> a -> b
$! b
b
{-# INLINE fold_ #-}
fold_ :: Monad m => Fold m a b -> SerialT m a -> m (b, SerialT m a)
fold_ :: Fold m a b -> SerialT m a -> m (b, SerialT m a)
fold_ Fold m a b
fl SerialT m a
strm = do
(b
b, Stream m a
str) <- Fold m a b -> Stream m a -> m (b, Stream m a)
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m (b, Stream m a)
D.fold_ Fold m a b
fl (Stream m a -> m (b, Stream m a))
-> Stream m a -> m (b, Stream m a)
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD SerialT m a
strm
(b, SerialT m a) -> m (b, SerialT m a)
forall (m :: * -> *) a. Monad m => a -> m a
return ((b, SerialT m a) -> m (b, SerialT m a))
-> (b, SerialT m a) -> m (b, SerialT m a)
forall a b. (a -> b) -> a -> b
$! (b
b, Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
IsStream.fromStreamD Stream m a
str)
{-# INLINE map #-}
map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
map :: (a -> b) -> t m a -> t m b
map a -> b
f = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> (t m a -> Stream m b) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> b) -> Stream m a -> Stream m b
D.map a -> b
f (Stream m a -> Stream m b)
-> (t m a -> Stream m a) -> t m a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE scanlMAfter' #-}
scanlMAfter' :: (IsStream t, Monad m)
=> (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
scanlMAfter' :: (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
scanlMAfter' b -> a -> m b
step m b
initial b -> m b
done t m a
stream =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
D.scanlMAfter' b -> a -> m b
step m b
initial b -> m b
done (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
stream
{-# INLINE postscanlM' #-}
postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
postscanlM' :: (b -> a -> m b) -> m b -> t m a -> t m b
postscanlM' b -> a -> m b
step m b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> m b) -> m b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
D.postscanlM' b -> a -> m b
step m b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE smapM #-}
smapM :: (IsStream t, Monad m) =>
(s -> a -> m (s, b))
-> m s
-> t m a
-> t m b
smapM :: (s -> a -> m (s, b)) -> m s -> t m a -> t m b
smapM s -> a -> m (s, b)
step m s
initial t m a
stream =
let r :: t m (s, b)
r = ((s, b) -> a -> m (s, b)) -> m (s, b) -> t m a -> t m (s, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> t m a -> t m b
postscanlM'
(\(s
s, b
_) a
a -> s -> a -> m (s, b)
step s
s a
a)
((s -> (s, b)) -> m s -> m (s, b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (,b
forall a. HasCallStack => a
undefined) m s
initial)
t m a
stream
in ((s, b) -> b) -> t m (s, b) -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
map (s, b) -> b
forall a b. (a, b) -> b
snd t m (s, b)
r
{-# INLINE take #-}
take :: (IsStream t, Monad m) => Int -> t m a -> t m a
take :: Int -> t m a -> t m a
take Int
n t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Int -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Applicative m =>
Int -> Stream m a -> Stream m a
S.take Int
n (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS
(Maybe Int64 -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Int64 -> t m a -> t m a
maxYields (Int64 -> Maybe Int64
forall a. a -> Maybe a
Just (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n)) t m a
m)
{-# INLINE takeWhile #-}
takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
takeWhile :: (a -> Bool) -> t m a -> t m a
takeWhile a -> Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
S.takeWhile a -> Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE takeEndBy #-}
takeEndBy :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
takeEndBy :: (a -> Bool) -> t m a -> t m a
takeEndBy a -> Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
D.takeEndBy a -> Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE drop #-}
drop :: (IsStream t, Monad m) => Int -> t m a -> t m a
drop :: Int -> t m a -> t m a
drop Int
n t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Int -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => Int -> Stream m a -> Stream m a
S.drop Int
n (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE findIndices #-}
findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
findIndices :: (a -> Bool) -> t m a -> t m Int
findIndices a -> Bool
p t m a
m = Stream m Int -> t m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m Int -> t m Int) -> Stream m Int -> t m Int
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m Int
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m Int
S.findIndices a -> Bool
p (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m)
{-# INLINE intersperseM #-}
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
intersperseM :: m a -> t m a -> t m a
intersperseM m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a -> Stream m a
S.intersperseM m a
m (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS
{-# INLINE interjectSuffix #-}
interjectSuffix
:: (IsStream t, MonadAsync m)
=> Double -> m a -> t m a -> t m a
interjectSuffix :: Double -> m a -> t m a -> t m a
interjectSuffix Double
n m a
f t m a
xs = t m a
xs t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
`parallelFst` m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a
repeatM m a
timed
where timed :: m a
timed = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Int -> IO ()
threadDelay (Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
n Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000)) m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m a
f
{-# INLINE reverse #-}
reverse :: (IsStream t, Monad m) => t m a -> t m a
reverse :: t m a -> t m a
reverse t m a
s = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
S.reverse (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
s
{-# INLINE reverse' #-}
reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a
reverse' :: t m a -> t m a
reverse' =
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD
(Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m (Array a) -> Stream m a
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Stream m (Array a) -> Stream m a
A.flattenArraysRev
(Stream m (Array a) -> Stream m a)
-> (t m a -> Stream m (Array a)) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m (Array a) -> Stream m (Array a)
forall (m :: * -> *) a. Applicative m => Stream m a -> Stream m a
D.fromStreamK
(Stream m (Array a) -> Stream m (Array a))
-> (t m a -> Stream m (Array a)) -> t m a -> Stream m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m (Array a) -> Stream m (Array a)
forall (m :: * -> *) a. Stream m a -> Stream m a
K.reverse
(Stream m (Array a) -> Stream m (Array a))
-> (t m a -> Stream m (Array a)) -> t m a -> Stream m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m (Array a) -> Stream m (Array a)
forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
D.toStreamK
(Stream m (Array a) -> Stream m (Array a))
-> (t m a -> Stream m (Array a)) -> t m a -> Stream m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Stream m a -> Stream m (Array a)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m a -> Stream m (Array a)
A.arraysOf Int
defaultChunkSize
(Stream m a -> Stream m (Array a))
-> (t m a -> Stream m a) -> t m a -> Stream m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE_NORMAL mkAsync #-}
mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a
mkAsync :: t m a -> t m a
mkAsync = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m a
forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
Async.mkAsyncD (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE_NORMAL mkParallel #-}
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
mkParallel :: t m a -> t m a
mkParallel = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m a
forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
Par.mkParallelD (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE parallelFst #-}
parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
parallelFst :: t m a -> t m a -> t m a
parallelFst t m a
m1 t m a
m2 = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
Par.parallelFstK (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m1) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m2)
{-# INLINE concatMapM #-}
concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
concatMapM :: (a -> m (t m b)) -> t m a -> t m b
concatMapM a -> m (t m b)
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> m (Stream m b)) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> m (Stream m b)) -> Stream m a -> Stream m b
D.concatMapM ((t m b -> Stream m b) -> m (t m b) -> m (Stream m b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD (m (t m b) -> m (Stream m b))
-> (a -> m (t m b)) -> a -> m (Stream m b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m (t m b)
f) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE concatMap #-}
concatMap ::(IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b
concatMap :: (a -> t m b) -> t m a -> t m b
concatMap a -> t m b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> Stream m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Stream m b) -> Stream m a -> Stream m b
D.concatMap (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD (t m b -> Stream m b) -> (a -> t m b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> t m b
f) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE concatM #-}
concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
concatM :: m (t m a) -> t m a
concatM m (t m a)
generator = (() -> m (t m a)) -> t m () -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m (t m b)) -> t m a -> t m b
concatMapM (\() -> m (t m a)
generator) (() -> t m ()
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
fromPure ())
{-# INLINE splitOnSeq #-}
splitOnSeq
:: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitOnSeq :: Array a -> Fold m a b -> t m a -> t m b
splitOnSeq Array a
patt Fold m a b
f t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
IsStream.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Array a -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
(MonadIO m, Storable a, Enum a, Eq a) =>
Array a -> Fold m a b -> Stream m a -> Stream m b
D.splitOnSeq Array a
patt Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD t m a
m)
{-# INLINE zipWithM #-}
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM :: (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM a -> b -> m c
f t m a
m1 t m b
m2 =
Stream m c -> t m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
IsStream.fromStreamS
(Stream m c -> t m c) -> Stream m c -> t m c
forall a b. (a -> b) -> a -> b
$ (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
S.zipWithM a -> b -> m c
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamS t m a
m1) (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamS t m b
m2)
{-# INLINE zipWith #-}
zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith :: (a -> b -> c) -> t m a -> t m b -> t m c
zipWith a -> b -> c
f t m a
m1 t m b
m2 =
Stream m c -> t m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
IsStream.fromStreamS
(Stream m c -> t m c) -> Stream m c -> t m c
forall a b. (a -> b) -> a -> b
$ (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
S.zipWith a -> b -> c
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamS t m a
m1) (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamS t m b
m2)