{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE FlexibleContexts #-}
#if __GLASGOW_HASKELL__ >= 800
{-# OPTIONS_GHC -Wno-orphans #-}
#endif
#include "inline.hs"
module Streamly.Internal.Prelude
(
K.nil
, K.nilM
, K.cons
, (K..:)
, consM
, (|:)
, yield
, yieldM
, repeat
, repeatM
, replicate
, replicateM
, Enumerable (..)
, enumerate
, enumerateTo
, unfoldr
, unfoldrM
, unfold
, iterate
, iterateM
, fromIndices
, fromIndicesM
, P.fromList
, fromListM
, K.fromFoldable
, fromFoldableM
, fromPrimVar
, fromCallback
, currentTime
, uncons
, tail
, init
, foldrM
, foldrS
, foldrT
, foldr
, foldl'
, foldl1'
, foldlM'
, fold
, parse
, foldAsync
, (|$.)
, (|&.)
, drain
, last
, length
, sum
, product
, mconcat
, maximumBy
, maximum
, minimumBy
, minimum
, the
, toList
, toListRev
, toPure
, toPureRev
, toStream
, toStreamRev
, drainN
, drainWhile
, (!!)
, head
, headElse
, findM
, find
, lookup
, findIndex
, elemIndex
, null
, elem
, notElem
, all
, any
, and
, or
, eqBy
, cmpBy
, isPrefixOf
, isSuffixOf
, isInfixOf
, isSubsequenceOf
, stripPrefix
, stripSuffix
, dropPrefix
, dropInfix
, dropSuffix
, transform
, Serial.map
, sequence
, mapM
, mapM_
, trace
, tap
, tapOffsetEvery
, tapAsync
, tapRate
, pollCounts
, scanl'
, scanlM'
, postscanl'
, postscanlM'
, prescanl'
, prescanlM'
, scanl1'
, scanl1M'
, scan
, postscan
, D.mkParallel
, applyAsync
, (|$)
, (|&)
, filter
, filterM
, mapMaybe
, mapMaybeM
, deleteBy
, uniq
, insertBy
, intersperseM
, intersperse
, intersperseSuffix
, intersperseSuffixBySpan
, interjectSuffix
, delayPost
, indexed
, indexedR
, reverse
, reverse'
, splitParse
, take
, takeByTime
, takeWhile
, takeWhileM
, drop
, dropByTime
, dropWhile
, dropWhileM
, chunksOf
, chunksOf2
, arraysOf
, intervalsOf
, findIndices
, elemIndices
, splitOn
, splitOnSuffix
, splitWithSuffix
, wordsBy
, splitOnSeq
, splitOnSuffixSeq
, splitBySeq
, splitWithSuffixSeq
, splitInnerBy
, splitInnerBySuffix
, groups
, groupsBy
, groupsByRolling
, rollingMapM
, rollingMap
, classifySessionsBy
, classifySessionsOf
, classifyKeepAliveSessions
, append
, interleave
, interleaveMin
, interleaveSuffix
, interleaveInfix
, Serial.wSerialFst
, Serial.wSerialMin
, roundrobin
, Par.parallelFst
, Par.parallelMin
, mergeBy
, mergeByM
, mergeAsyncBy
, mergeAsyncByM
, Z.zipWith
, Z.zipWithM
, Z.zipAsyncWith
, Z.zipAsyncWithM
, foldWith
, foldMapWith
, forEachWith
, concat
, concatM
, concatMap
, concatMapM
, concatMapWith
, concatUnfold
, concatUnfoldInterleave
, concatUnfoldRoundrobin
, concatMapIterateWith
, concatMapTreeWith
, concatMapLoopWith
, concatMapTreeYieldLeavesWith
, K.mfix
, gintercalate
, gintercalateSuffix
, intercalate
, intercalateSuffix
, interpose
, interposeSuffix
, before
, after
, afterIO
, bracket
, bracketIO
, onException
, finally
, finallyIO
, handle
, hoist
, generally
, liftInner
, usingReaderT
, runReaderT
, evalStateT
, usingStateT
, runStateT
, inspectMode
, K.once
, each
, scanx
, foldx
, foldxM
, foldr1
, runStream
, runN
, runWhile
, fromHandle
, toHandle
)
where
import Control.Concurrent (threadDelay)
import Control.Exception (Exception, assert)
import Control.Monad (void)
import Control.Monad.Catch (MonadCatch, MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader (ReaderT)
import Control.Monad.State.Strict (StateT)
import Control.Monad.Trans (MonadTrans(..))
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Functor.Identity (Identity (..))
#if __GLASGOW_HASKELL__ >= 800
import Data.Kind (Type)
#endif
import Data.Heap (Entry(..))
import Data.Maybe (isJust, fromJust, isNothing)
import Foreign.Storable (Storable)
import Prelude
hiding (filter, drop, dropWhile, take, takeWhile, zipWith, foldr,
foldl, map, mapM, mapM_, sequence, all, any, sum, product, elem,
notElem, maximum, minimum, head, last, tail, length, null,
reverse, iterate, init, and, or, lookup, foldr1, (!!),
scanl, scanl1, replicate, concatMap, span, splitAt, break,
repeat, concat, mconcat)
import qualified Data.Heap as H
import qualified Data.Map.Strict as Map
import qualified Prelude
import qualified System.IO as IO
import Streamly.Internal.Data.Stream.Enumeration (Enumerable(..), enumerate, enumerateTo)
import Streamly.Internal.Data.Fold.Types (Fold (..), Fold2 (..))
import Streamly.Internal.Data.Parser.Types (Parser (..))
import Streamly.Internal.Data.Unfold.Types (Unfold)
import Streamly.Internal.Memory.Array.Types (Array, writeNUnsafe)
import Streamly.Internal.Data.SVar (MonadAsync, defState)
import Streamly.Internal.Data.Stream.Combinators (inspectMode, maxYields)
import Streamly.Internal.Data.Stream.Prelude
(fromStreamS, toStreamS, foldWith, foldMapWith, forEachWith)
import Streamly.Internal.Data.Stream.StreamD (fromStreamD, toStreamD)
import Streamly.Internal.Data.Stream.StreamK (IsStream((|:), consM))
import Streamly.Internal.Data.Stream.Serial (SerialT, WSerialT)
import Streamly.Internal.Data.Stream.Zip (ZipSerialM)
import Streamly.Internal.Data.Pipe.Types (Pipe (..))
import Streamly.Internal.Data.Time.Units
(AbsTime, MilliSecond64(..), addToAbsTime, toRelTime,
toAbsTime, TimeUnit64)
import Streamly.Internal.Mutable.Prim.Var (Prim, Var)
import Streamly.Internal.Data.Strict
import qualified Streamly.Internal.Memory.Array as A
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Fold.Types as FL
import qualified Streamly.Internal.Data.Stream.Prelude as P
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Zip as Z
{-# INLINE uncons #-}
uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a))
uncons :: SerialT m a -> m (Maybe (a, t m a))
uncons SerialT m a
m = t m a -> m (Maybe (a, t m a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> m (Maybe (a, t m a))
K.uncons (SerialT m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
K.adapt SerialT m a
m)
{-# INLINE_EARLY unfoldr #-}
unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a
unfoldr :: (b -> Maybe (a, b)) -> b -> t m a
unfoldr b -> Maybe (a, b)
step b
seed = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS ((b -> Maybe (a, b)) -> b -> Stream m a
forall (m :: * -> *) s a.
Monad m =>
(s -> Maybe (a, s)) -> s -> Stream m a
S.unfoldr b -> Maybe (a, b)
step b
seed)
{-# RULES "unfoldr fallback to StreamK" [1]
forall a b. S.toStreamK (S.unfoldr a b) = K.unfoldr a b #-}
{-# INLINE_EARLY unfoldrM #-}
unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
unfoldrM :: (b -> m (Maybe (a, b))) -> b -> t m a
unfoldrM = (b -> m (Maybe (a, b))) -> b -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, MonadAsync m) =>
(b -> m (Maybe (a, b))) -> b -> t m a
K.unfoldrM
{-# RULES "unfoldrM serial" unfoldrM = unfoldrMSerial #-}
{-# INLINE_EARLY unfoldrMSerial #-}
unfoldrMSerial :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> SerialT m a
unfoldrMSerial :: (b -> m (Maybe (a, b))) -> b -> SerialT m a
unfoldrMSerial = (b -> m (Maybe (a, b))) -> b -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> m (Maybe (a, b))) -> b -> t m a
Serial.unfoldrM
{-# RULES "unfoldrM wSerial" unfoldrM = unfoldrMWSerial #-}
{-# INLINE_EARLY unfoldrMWSerial #-}
unfoldrMWSerial :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> WSerialT m a
unfoldrMWSerial :: (b -> m (Maybe (a, b))) -> b -> WSerialT m a
unfoldrMWSerial = (b -> m (Maybe (a, b))) -> b -> WSerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> m (Maybe (a, b))) -> b -> t m a
Serial.unfoldrM
{-# RULES "unfoldrM zipSerial" unfoldrM = unfoldrMZipSerial #-}
{-# INLINE_EARLY unfoldrMZipSerial #-}
unfoldrMZipSerial :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> ZipSerialM m a
unfoldrMZipSerial :: (b -> m (Maybe (a, b))) -> b -> ZipSerialM m a
unfoldrMZipSerial = (b -> m (Maybe (a, b))) -> b -> ZipSerialM m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> m (Maybe (a, b))) -> b -> t m a
Serial.unfoldrM
{-# INLINE unfold #-}
unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
unfold :: Unfold m a b -> a -> t m b
unfold Unfold m a b
unf a
x = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Unfold m a b -> a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Unfold m a b -> a -> Stream m b
D.unfold Unfold m a b
unf a
x
{-# INLINE yield #-}
yield :: IsStream t => a -> t m a
yield :: a -> t m a
yield = a -> t m a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
K.yield
{-# INLINE yieldM #-}
yieldM :: (Monad m, IsStream t) => m a -> t m a
yieldM :: m a -> t m a
yieldM = m a -> t m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
K.yieldM
{-# INLINE fromIndices #-}
fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a
fromIndices :: (Int -> a) -> t m a
fromIndices = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a)
-> ((Int -> a) -> Stream m a) -> (Int -> a) -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int -> a) -> Stream m a
forall (m :: * -> *) a. Monad m => (Int -> a) -> Stream m a
S.fromIndices
{-# INLINE_EARLY fromIndicesM #-}
fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a
fromIndicesM :: (Int -> m a) -> t m a
fromIndicesM = (Int -> m a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
(Int -> m a) -> t m a
K.fromIndicesM
{-# RULES "fromIndicesM serial" fromIndicesM = fromIndicesMSerial #-}
{-# INLINE fromIndicesMSerial #-}
fromIndicesMSerial :: MonadAsync m => (Int -> m a) -> SerialT m a
fromIndicesMSerial :: (Int -> m a) -> SerialT m a
fromIndicesMSerial = Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> SerialT m a)
-> ((Int -> m a) -> Stream m a) -> (Int -> m a) -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int -> m a) -> Stream m a
forall (m :: * -> *) a. Monad m => (Int -> m a) -> Stream m a
S.fromIndicesM
{-# INLINE_EARLY replicateM #-}
replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a
replicateM :: Int -> m a -> t m a
replicateM = Int -> m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
Int -> m a -> t m a
K.replicateM
{-# RULES "replicateM serial" replicateM = replicateMSerial #-}
{-# INLINE replicateMSerial #-}
replicateMSerial :: MonadAsync m => Int -> m a -> SerialT m a
replicateMSerial :: Int -> m a -> SerialT m a
replicateMSerial Int
n = Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> SerialT m a)
-> (m a -> Stream m a) -> m a -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> m a -> Stream m a
forall (m :: * -> *) a. Monad m => Int -> m a -> Stream m a
S.replicateM Int
n
{-# INLINE_NORMAL replicate #-}
replicate :: (IsStream t, Monad m) => Int -> a -> t m a
replicate :: Int -> a -> t m a
replicate Int
n = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> (a -> Stream m a) -> a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> a -> Stream m a
forall (m :: * -> *) a. Monad m => Int -> a -> Stream m a
S.replicate Int
n
{-# INLINE_NORMAL repeat #-}
repeat :: (IsStream t, Monad m) => a -> t m a
repeat :: a -> t m a
repeat = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> (a -> Stream m a) -> a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall (m :: * -> *) a. Monad m => a -> Stream m a
S.repeat
{-# INLINE_EARLY repeatM #-}
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
repeatM :: m a -> t m a
repeatM = m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a
K.repeatM
{-# RULES "repeatM serial" repeatM = repeatMSerial #-}
{-# INLINE repeatMSerial #-}
repeatMSerial :: MonadAsync m => m a -> SerialT m a
repeatMSerial :: m a -> SerialT m a
repeatMSerial = Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> SerialT m a)
-> (m a -> Stream m a) -> m a -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a
S.repeatM
{-# INLINE_NORMAL iterate #-}
iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a
iterate :: (a -> a) -> a -> t m a
iterate a -> a
step = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> (a -> Stream m a) -> a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> a) -> a -> Stream m a
forall (m :: * -> *) a. Monad m => (a -> a) -> a -> Stream m a
S.iterate a -> a
step
{-# INLINE_EARLY iterateM #-}
iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a
iterateM :: (a -> m a) -> m a -> t m a
iterateM = (a -> m a) -> m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
(a -> m a) -> m a -> t m a
K.iterateM
{-# RULES "iterateM serial" iterateM = iterateMSerial #-}
{-# INLINE iterateMSerial #-}
iterateMSerial :: MonadAsync m => (a -> m a) -> m a -> SerialT m a
iterateMSerial :: (a -> m a) -> m a -> SerialT m a
iterateMSerial a -> m a
step = Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> SerialT m a)
-> (m a -> Stream m a) -> m a -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> m a) -> m a -> Stream m a
forall (m :: * -> *) a. Monad m => (a -> m a) -> m a -> Stream m a
S.iterateM a -> m a
step
{-# INLINE_EARLY fromListM #-}
fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a
fromListM :: [m a] -> t m a
fromListM = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> ([m a] -> Stream m a) -> [m a] -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [m a] -> Stream m a
forall (m :: * -> *) a. MonadAsync m => [m a] -> Stream m a
D.fromListM
{-# RULES "fromListM fallback to StreamK" [1]
forall a. D.toStreamK (D.fromListM a) = fromFoldableM a #-}
{-# INLINE fromFoldableM #-}
fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a
fromFoldableM :: f (m a) -> t m a
fromFoldableM = (m a -> t m a -> t m a) -> t m a -> f (m a) -> t m a
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
Prelude.foldr m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
consM t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
{-# DEPRECATED each "Please use fromFoldable instead." #-}
{-# INLINE each #-}
each :: (IsStream t, Foldable f) => f a -> t m a
each :: f a -> t m a
each = f a -> t m a
forall (t :: (* -> *) -> * -> *) (f :: * -> *) a (m :: * -> *).
(IsStream t, Foldable f) =>
f a -> t m a
K.fromFoldable
{-# DEPRECATED fromHandle
"Please use Streamly.FileSystem.Handle module (see the changelog)" #-}
fromHandle :: (IsStream t, MonadIO m) => IO.Handle -> t m String
fromHandle :: Handle -> t m String
fromHandle Handle
h = t m String
go
where
go :: t m String
go = (forall r.
State Stream m String
-> (String -> t m String -> m r) -> (String -> m r) -> m r -> m r)
-> t m String
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
K.mkStream ((forall r.
State Stream m String
-> (String -> t m String -> m r) -> (String -> m r) -> m r -> m r)
-> t m String)
-> (forall r.
State Stream m String
-> (String -> t m String -> m r) -> (String -> m r) -> m r -> m r)
-> t m String
forall a b. (a -> b) -> a -> b
$ \State Stream m String
_ String -> t m String -> m r
yld String -> m r
_ m r
stp -> do
Bool
eof <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Handle -> IO Bool
IO.hIsEOF Handle
h
if Bool
eof
then m r
stp
else do
String
str <- IO String -> m String
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO String -> m String) -> IO String -> m String
forall a b. (a -> b) -> a -> b
$ Handle -> IO String
IO.hGetLine Handle
h
String -> t m String -> m r
yld String
str t m String
go
{-# INLINE fromPrimVar #-}
fromPrimVar :: (IsStream t, MonadIO m, Prim a) => Var IO a -> t m a
fromPrimVar :: Var IO a -> t m a
fromPrimVar = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a)
-> (Var IO a -> Stream m a) -> Var IO a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Var IO a -> Stream m a
forall (m :: * -> *) a.
(MonadIO m, Prim a) =>
Var IO a -> Stream m a
D.fromPrimVar
{-# INLINE fromCallback #-}
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a
fromCallback :: ((a -> m ()) -> m ()) -> SerialT m a
fromCallback (a -> m ()) -> m ()
setCallback = m (SerialT m a) -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
concatM (m (SerialT m a) -> SerialT m a) -> m (SerialT m a) -> SerialT m a
forall a b. (a -> b) -> a -> b
$ do
(a -> m ()
callback, SerialT m a
stream) <- m (a -> m (), SerialT m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m (a -> m (), t m a)
D.newCallbackStream
(a -> m ()) -> m ()
setCallback a -> m ()
callback
SerialT m a -> m (SerialT m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SerialT m a
stream
{-# INLINE currentTime #-}
currentTime :: (IsStream t, MonadAsync m) => Double -> t m AbsTime
currentTime :: Double -> t m AbsTime
currentTime Double
g = Stream m AbsTime -> t m AbsTime
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m AbsTime -> t m AbsTime)
-> Stream m AbsTime -> t m AbsTime
forall a b. (a -> b) -> a -> b
$ Double -> Stream m AbsTime
forall (m :: * -> *). MonadAsync m => Double -> Stream m AbsTime
D.currentTime Double
g
{-# INLINE foldrM #-}
foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b
foldrM :: (a -> m b -> m b) -> m b -> SerialT m a -> m b
foldrM = (a -> m b -> m b) -> m b -> SerialT m a -> m b
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
(Monad m, IsStream t) =>
(a -> m b -> m b) -> m b -> t m a -> m b
P.foldrM
{-# INLINE foldrS #-}
foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrS :: (a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrS = (a -> t m b -> t m b) -> t m b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) a (m :: * -> *) b.
IsStream t =>
(a -> t m b -> t m b) -> t m b -> t m a -> t m b
K.foldrS
{-# INLINE foldrT #-}
foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s)
=> (a -> s m b -> s m b) -> s m b -> t m a -> s m b
foldrT :: (a -> s m b -> s m b) -> s m b -> t m a -> s m b
foldrT a -> s m b -> s m b
f s m b
z t m a
s = (a -> s m b -> s m b) -> s m b -> Stream m a -> s m b
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
(Monad m, Monad (t m), MonadTrans t) =>
(a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
S.foldrT a -> s m b -> s m b
f s m b
z (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
s)
{-# INLINE foldr #-}
foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b
foldr :: (a -> b -> b) -> b -> SerialT m a -> m b
foldr = (a -> b -> b) -> b -> SerialT m a -> m b
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
(Monad m, IsStream t) =>
(a -> b -> b) -> b -> t m a -> m b
P.foldr
{-# INLINE foldr1 #-}
{-# DEPRECATED foldr1 "Use foldrM instead." #-}
foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
foldr1 :: (a -> a -> a) -> SerialT m a -> m (Maybe a)
foldr1 a -> a -> a
f SerialT m a
m = (a -> a -> a) -> Stream m a -> m (Maybe a)
forall (m :: * -> *) a.
Monad m =>
(a -> a -> a) -> Stream m a -> m (Maybe a)
S.foldr1 a -> a -> a
f (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m)
{-# DEPRECATED foldx "Please use foldl' followed by fmap instead." #-}
{-# INLINE foldx #-}
foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b
foldx :: (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b
foldx = (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) x a b.
(IsStream t, Monad m) =>
(x -> a -> x) -> x -> (x -> b) -> t m a -> m b
P.foldlx'
{-# INLINE foldl' #-}
foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b
foldl' :: (b -> a -> b) -> b -> SerialT m a -> m b
foldl' = (b -> a -> b) -> b -> SerialT m a -> m b
forall (m :: * -> *) (t :: (* -> *) -> * -> *) b a.
(Monad m, IsStream t) =>
(b -> a -> b) -> b -> t m a -> m b
P.foldl'
{-# INLINE foldl1' #-}
foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
foldl1' :: (a -> a -> a) -> SerialT m a -> m (Maybe a)
foldl1' a -> a -> a
step SerialT m a
m = do
Maybe (a, SerialT m a)
r <- SerialT m a -> m (Maybe (a, SerialT m a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
SerialT m a -> m (Maybe (a, t m a))
uncons SerialT m a
m
case Maybe (a, SerialT m a)
r of
Maybe (a, SerialT m a)
Nothing -> Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
Just (a
h, SerialT m a
t) -> do
a
res <- (a -> a -> a) -> a -> SerialT m a -> m a
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> SerialT m a -> m b
foldl' a -> a -> a
step a
h SerialT m a
t
Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe a -> m (Maybe a)) -> Maybe a -> m (Maybe a)
forall a b. (a -> b) -> a -> b
$ a -> Maybe a
forall a. a -> Maybe a
Just a
res
{-# DEPRECATED foldxM "Please use foldlM' followed by fmap instead." #-}
{-# INLINE foldxM #-}
foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b
foldxM :: (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b
foldxM = (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) x a b.
(IsStream t, Monad m) =>
(x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
P.foldlMx'
{-# INLINE foldlM' #-}
foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b
foldlM' :: (b -> a -> m b) -> b -> SerialT m a -> m b
foldlM' b -> a -> m b
step b
begin SerialT m a
m = (b -> a -> m b) -> b -> Stream m a -> m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> b -> Stream m a -> m b
S.foldlM' b -> a -> m b
step b
begin (Stream m a -> m b) -> Stream m a -> m b
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m
{-# INLINE fold #-}
fold :: Monad m => Fold m a b -> SerialT m a -> m b
fold :: Fold m a b -> SerialT m a -> m b
fold = Fold m a b -> SerialT m a -> m b
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
(Monad m, IsStream t) =>
Fold m a b -> t m a -> m b
P.runFold
{-# INLINE parse #-}
parse :: MonadThrow m => Parser m a b -> SerialT m a -> m b
parse :: Parser m a b -> SerialT m a -> m b
parse (Parser s -> a -> m (Step s b)
step m s
initial s -> m b
extract) = (s -> a -> m (Step s b)) -> m s -> (s -> m b) -> SerialT m a -> m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) s a b.
(IsStream t, MonadThrow m) =>
(s -> a -> m (Step s b)) -> m s -> (s -> m b) -> t m a -> m b
P.parselMx' s -> a -> m (Step s b)
step m s
initial s -> m b
extract
{-# INLINE drain #-}
drain :: Monad m => SerialT m a -> m ()
drain :: SerialT m a -> m ()
drain = SerialT m a -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> m ()
P.drain
{-# DEPRECATED runStream "Please use \"drain\" instead" #-}
{-# INLINE runStream #-}
runStream :: Monad m => SerialT m a -> m ()
runStream :: SerialT m a -> m ()
runStream = SerialT m a -> m ()
forall (m :: * -> *) a. Monad m => SerialT m a -> m ()
drain
{-# INLINE drainN #-}
drainN :: Monad m => Int -> SerialT m a -> m ()
drainN :: Int -> SerialT m a -> m ()
drainN Int
n = SerialT m a -> m ()
forall (m :: * -> *) a. Monad m => SerialT m a -> m ()
drain (SerialT m a -> m ())
-> (SerialT m a -> SerialT m a) -> SerialT m a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> SerialT m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Int -> t m a -> t m a
take Int
n
{-# DEPRECATED runN "Please use \"drainN\" instead" #-}
{-# INLINE runN #-}
runN :: Monad m => Int -> SerialT m a -> m ()
runN :: Int -> SerialT m a -> m ()
runN = Int -> SerialT m a -> m ()
forall (m :: * -> *) a. Monad m => Int -> SerialT m a -> m ()
drainN
{-# INLINE drainWhile #-}
drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
drainWhile :: (a -> Bool) -> SerialT m a -> m ()
drainWhile a -> Bool
p = SerialT m a -> m ()
forall (m :: * -> *) a. Monad m => SerialT m a -> m ()
drain (SerialT m a -> m ())
-> (SerialT m a -> SerialT m a) -> SerialT m a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> Bool) -> SerialT m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
takeWhile a -> Bool
p
{-# DEPRECATED runWhile "Please use \"drainWhile\" instead" #-}
{-# INLINE runWhile #-}
runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
runWhile :: (a -> Bool) -> SerialT m a -> m ()
runWhile = (a -> Bool) -> SerialT m a -> m ()
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> SerialT m a -> m ()
drainWhile
{-# INLINE null #-}
null :: Monad m => SerialT m a -> m Bool
null :: SerialT m a -> m Bool
null = Stream m a -> m Bool
forall (m :: * -> *) a. Monad m => Stream m a -> m Bool
S.null (Stream m a -> m Bool)
-> (SerialT m a -> Stream m a) -> SerialT m a -> m Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS
{-# INLINE head #-}
head :: Monad m => SerialT m a -> m (Maybe a)
head :: SerialT m a -> m (Maybe a)
head = Stream m a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => Stream m a -> m (Maybe a)
S.head (Stream m a -> m (Maybe a))
-> (SerialT m a -> Stream m a) -> SerialT m a -> m (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS
{-# INLINE headElse #-}
headElse :: Monad m => a -> SerialT m a -> m a
headElse :: a -> SerialT m a -> m a
headElse a
x = a -> Stream m a -> m a
forall (m :: * -> *) a. Monad m => a -> Stream m a -> m a
D.headElse a
x (Stream m a -> m a)
-> (SerialT m a -> Stream m a) -> SerialT m a -> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE tail #-}
tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
tail :: SerialT m a -> m (Maybe (t m a))
tail SerialT m a
m = t m a -> m (Maybe (t m a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> m (Maybe (t m a))
K.tail (SerialT m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
K.adapt SerialT m a
m)
{-# INLINE init #-}
init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
init :: SerialT m a -> m (Maybe (t m a))
init SerialT m a
m = t m a -> m (Maybe (t m a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> m (Maybe (t m a))
K.init (SerialT m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
K.adapt SerialT m a
m)
{-# INLINE last #-}
last :: Monad m => SerialT m a -> m (Maybe a)
last :: SerialT m a -> m (Maybe a)
last SerialT m a
m = Stream m a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => Stream m a -> m (Maybe a)
S.last (Stream m a -> m (Maybe a)) -> Stream m a -> m (Maybe a)
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m
{-# INLINE elem #-}
elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
elem :: a -> SerialT m a -> m Bool
elem a
e SerialT m a
m = a -> Stream m a -> m Bool
forall (m :: * -> *) a.
(Monad m, Eq a) =>
a -> Stream m a -> m Bool
S.elem a
e (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m)
{-# INLINE notElem #-}
notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
notElem :: a -> SerialT m a -> m Bool
notElem a
e SerialT m a
m = a -> Stream m a -> m Bool
forall (m :: * -> *) a.
(Monad m, Eq a) =>
a -> Stream m a -> m Bool
S.notElem a
e (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m)
{-# INLINE length #-}
length :: Monad m => SerialT m a -> m Int
length :: SerialT m a -> m Int
length = (Int -> a -> Int) -> Int -> SerialT m a -> m Int
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> SerialT m a -> m b
foldl' (\Int
n a
_ -> Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) Int
0
{-# INLINE all #-}
all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
all :: (a -> Bool) -> SerialT m a -> m Bool
all a -> Bool
p SerialT m a
m = (a -> Bool) -> Stream m a -> m Bool
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> m Bool
S.all a -> Bool
p (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m)
{-# INLINE any #-}
any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
any :: (a -> Bool) -> SerialT m a -> m Bool
any a -> Bool
p SerialT m a
m = (a -> Bool) -> Stream m a -> m Bool
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> m Bool
S.any a -> Bool
p (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m)
{-# INLINE and #-}
and :: Monad m => SerialT m Bool -> m Bool
and :: SerialT m Bool -> m Bool
and = (Bool -> Bool) -> SerialT m Bool -> m Bool
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> SerialT m a -> m Bool
all (Bool -> Bool -> Bool
forall a. Eq a => a -> a -> Bool
==Bool
True)
{-# INLINE or #-}
or :: Monad m => SerialT m Bool -> m Bool
or :: SerialT m Bool -> m Bool
or = (Bool -> Bool) -> SerialT m Bool -> m Bool
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> SerialT m a -> m Bool
any (Bool -> Bool -> Bool
forall a. Eq a => a -> a -> Bool
==Bool
True)
{-# INLINE sum #-}
sum :: (Monad m, Num a) => SerialT m a -> m a
sum :: SerialT m a -> m a
sum = (a -> a -> a) -> a -> SerialT m a -> m a
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> SerialT m a -> m b
foldl' a -> a -> a
forall a. Num a => a -> a -> a
(+) a
0
{-# INLINE product #-}
product :: (Monad m, Num a) => SerialT m a -> m a
product :: SerialT m a -> m a
product = (a -> a -> a) -> a -> SerialT m a -> m a
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> SerialT m a -> m b
foldl' a -> a -> a
forall a. Num a => a -> a -> a
(*) a
1
{-# INLINE mconcat #-}
mconcat :: (Monad m, Monoid a) => SerialT m a -> m a
mconcat :: SerialT m a -> m a
mconcat = (a -> a -> a) -> a -> SerialT m a -> m a
forall (m :: * -> *) a b.
Monad m =>
(a -> b -> b) -> b -> SerialT m a -> m b
foldr a -> a -> a
forall a. Monoid a => a -> a -> a
mappend a
forall a. Monoid a => a
mempty
{-# INLINE minimum #-}
minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
minimum :: SerialT m a -> m (Maybe a)
minimum SerialT m a
m = Stream m a -> m (Maybe a)
forall (m :: * -> *) a.
(Monad m, Ord a) =>
Stream m a -> m (Maybe a)
S.minimum (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m)
{-# INLINE minimumBy #-}
minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
minimumBy :: (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
minimumBy a -> a -> Ordering
cmp SerialT m a
m = (a -> a -> Ordering) -> Stream m a -> m (Maybe a)
forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> Stream m a -> m (Maybe a)
S.minimumBy a -> a -> Ordering
cmp (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m)
{-# INLINE maximum #-}
maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
maximum :: SerialT m a -> m (Maybe a)
maximum = SerialT m a -> m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Ord a) =>
t m a -> m (Maybe a)
P.maximum
{-# INLINE maximumBy #-}
maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
maximumBy :: (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
maximumBy a -> a -> Ordering
cmp SerialT m a
m = (a -> a -> Ordering) -> Stream m a -> m (Maybe a)
forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> Stream m a -> m (Maybe a)
S.maximumBy a -> a -> Ordering
cmp (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m)
{-# INLINE (!!) #-}
(!!) :: Monad m => SerialT m a -> Int -> m (Maybe a)
SerialT m a
m !! :: SerialT m a -> Int -> m (Maybe a)
!! Int
i = SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m Stream m a -> Int -> m (Maybe a)
forall (m :: * -> *) a. Monad m => Stream m a -> Int -> m (Maybe a)
S.!! Int
i
{-# INLINE lookup #-}
lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b)
lookup :: a -> SerialT m (a, b) -> m (Maybe b)
lookup a
a SerialT m (a, b)
m = a -> Stream m (a, b) -> m (Maybe b)
forall (m :: * -> *) a b.
(Monad m, Eq a) =>
a -> Stream m (a, b) -> m (Maybe b)
S.lookup a
a (SerialT m (a, b) -> Stream m (a, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m (a, b)
m)
{-# INLINE find #-}
find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a)
find :: (a -> Bool) -> SerialT m a -> m (Maybe a)
find a -> Bool
p SerialT m a
m = (a -> Bool) -> Stream m a -> m (Maybe a)
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> m (Maybe a)
S.find a -> Bool
p (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m)
{-# INLINE findM #-}
findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a)
findM :: (a -> m Bool) -> SerialT m a -> m (Maybe a)
findM a -> m Bool
p SerialT m a
m = (a -> m Bool) -> Stream m a -> m (Maybe a)
forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> m (Maybe a)
S.findM a -> m Bool
p (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m)
{-# INLINE findIndices #-}
findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
findIndices :: (a -> Bool) -> t m a -> t m Int
findIndices a -> Bool
p t m a
m = Stream m Int -> t m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m Int -> t m Int) -> Stream m Int -> t m Int
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m Int
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m Int
S.findIndices a -> Bool
p (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m)
{-# INLINE findIndex #-}
findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int)
findIndex :: (a -> Bool) -> SerialT m a -> m (Maybe Int)
findIndex a -> Bool
p = SerialT m Int -> m (Maybe Int)
forall (m :: * -> *) a. Monad m => SerialT m a -> m (Maybe a)
head (SerialT m Int -> m (Maybe Int))
-> (SerialT m a -> SerialT m Int) -> SerialT m a -> m (Maybe Int)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> Bool) -> SerialT m a -> SerialT m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m Int
findIndices a -> Bool
p
{-# INLINE elemIndices #-}
elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int
elemIndices :: a -> t m a -> t m Int
elemIndices a
a = (a -> Bool) -> t m a -> t m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m Int
findIndices (a -> a -> Bool
forall a. Eq a => a -> a -> Bool
==a
a)
{-# INLINE elemIndex #-}
elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int)
elemIndex :: a -> SerialT m a -> m (Maybe Int)
elemIndex a
a = (a -> Bool) -> SerialT m a -> m (Maybe Int)
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> SerialT m a -> m (Maybe Int)
findIndex (a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
a)
{-# INLINE isPrefixOf #-}
isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
isPrefixOf :: t m a -> t m a -> m Bool
isPrefixOf t m a
m1 t m a
m2 = Stream m a -> Stream m a -> m Bool
forall a (m :: * -> *).
(Eq a, Monad m) =>
Stream m a -> Stream m a -> m Bool
D.isPrefixOf (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m1) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m2)
{-# INLINE isSuffixOf #-}
isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool
isSuffixOf :: SerialT m a -> SerialT m a -> m Bool
isSuffixOf SerialT m a
suffix SerialT m a
stream = SerialT m a -> SerialT m a -> m Bool
forall a (t :: (* -> *) -> * -> *) (m :: * -> *).
(Eq a, IsStream t, Monad m) =>
t m a -> t m a -> m Bool
isPrefixOf (SerialT m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> t m a
reverse SerialT m a
suffix) (SerialT m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> t m a
reverse SerialT m a
stream)
{-# INLINE isInfixOf #-}
isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a)
=> SerialT m a -> SerialT m a -> m Bool
isInfixOf :: SerialT m a -> SerialT m a -> m Bool
isInfixOf SerialT m a
infx SerialT m a
stream = do
Array a
arr <- Fold m a (Array a) -> SerialT m a -> m (Array a)
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> SerialT m a -> m b
fold Fold m a (Array a)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Fold m a (Array a)
A.write SerialT m a
infx
Bool
r <- SerialT m () -> m Bool
forall (m :: * -> *) a. Monad m => SerialT m a -> m Bool
null (SerialT m () -> m Bool) -> SerialT m () -> m Bool
forall a b. (a -> b) -> a -> b
$ Int -> SerialT m () -> SerialT m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Int -> t m a -> t m a
drop Int
1 (SerialT m () -> SerialT m ()) -> SerialT m () -> SerialT m ()
forall a b. (a -> b) -> a -> b
$ Array a -> Fold m a () -> SerialT m a -> SerialT m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadIO m, Storable a, Enum a, Eq a) =>
Array a -> Fold m a b -> t m a -> t m b
splitOnSeq Array a
arr Fold m a ()
forall (m :: * -> *) a. Monad m => Fold m a ()
FL.drain SerialT m a
stream
Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> Bool
not Bool
r)
{-# INLINE isSubsequenceOf #-}
isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
isSubsequenceOf :: t m a -> t m a -> m Bool
isSubsequenceOf t m a
m1 t m a
m2 = Stream m a -> Stream m a -> m Bool
forall a (m :: * -> *).
(Eq a, Monad m) =>
Stream m a -> Stream m a -> m Bool
D.isSubsequenceOf (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m1) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m2)
{-# INLINE stripPrefix #-}
stripPrefix
:: (Eq a, IsStream t, Monad m)
=> t m a -> t m a -> m (Maybe (t m a))
stripPrefix :: t m a -> t m a -> m (Maybe (t m a))
stripPrefix t m a
m1 t m a
m2 = (Stream m a -> t m a) -> Maybe (Stream m a) -> Maybe (t m a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Maybe (Stream m a) -> Maybe (t m a))
-> m (Maybe (Stream m a)) -> m (Maybe (t m a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
Stream m a -> Stream m a -> m (Maybe (Stream m a))
forall a (m :: * -> *).
(Eq a, Monad m) =>
Stream m a -> Stream m a -> m (Maybe (Stream m a))
D.stripPrefix (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m1) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m2)
{-# INLINE stripSuffix #-}
stripSuffix
:: (Monad m, Eq a)
=> SerialT m a -> SerialT m a -> m (Maybe (SerialT m a))
stripSuffix :: SerialT m a -> SerialT m a -> m (Maybe (SerialT m a))
stripSuffix SerialT m a
m1 SerialT m a
m2 = (SerialT m a -> SerialT m a)
-> Maybe (SerialT m a) -> Maybe (SerialT m a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap SerialT m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> t m a
reverse (Maybe (SerialT m a) -> Maybe (SerialT m a))
-> m (Maybe (SerialT m a)) -> m (Maybe (SerialT m a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SerialT m a -> SerialT m a -> m (Maybe (SerialT m a))
forall a (t :: (* -> *) -> * -> *) (m :: * -> *).
(Eq a, IsStream t, Monad m) =>
t m a -> t m a -> m (Maybe (t m a))
stripPrefix (SerialT m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> t m a
reverse SerialT m a
m1) (SerialT m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> t m a
reverse SerialT m a
m2)
{-# INLINE dropPrefix #-}
dropPrefix ::
t m a -> t m a -> t m a
dropPrefix :: t m a -> t m a -> t m a
dropPrefix = String -> t m a -> t m a -> t m a
forall a. HasCallStack => String -> a
error String
"Not implemented yet!"
{-# INLINE dropInfix #-}
dropInfix ::
t m a -> t m a -> t m a
dropInfix :: t m a -> t m a -> t m a
dropInfix = String -> t m a -> t m a -> t m a
forall a. HasCallStack => String -> a
error String
"Not implemented yet!"
{-# INLINE dropSuffix #-}
dropSuffix ::
t m a -> t m a -> t m a
dropSuffix :: t m a -> t m a -> t m a
dropSuffix = String -> t m a -> t m a -> t m a
forall a. HasCallStack => String -> a
error String
"Not implemented yet!"
{-# INLINE mapM_ #-}
mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m ()
mapM_ :: (a -> m b) -> SerialT m a -> m ()
mapM_ a -> m b
f SerialT m a
m = (a -> m b) -> Stream m a -> m ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Stream m a -> m ()
S.mapM_ a -> m b
f (Stream m a -> m ()) -> Stream m a -> m ()
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m
{-# INLINE toList #-}
toList :: Monad m => SerialT m a -> m [a]
toList :: SerialT m a -> m [a]
toList = SerialT m a -> m [a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
t m a -> m [a]
P.toList
{-# INLINE toListRev #-}
toListRev :: Monad m => SerialT m a -> m [a]
toListRev :: SerialT m a -> m [a]
toListRev = Stream m a -> m [a]
forall (m :: * -> *) a. Monad m => Stream m a -> m [a]
D.toListRev (Stream m a -> m [a])
-> (SerialT m a -> Stream m a) -> SerialT m a -> m [a]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# DEPRECATED toHandle
"Please use Streamly.FileSystem.Handle module (see the changelog)" #-}
toHandle :: MonadIO m => IO.Handle -> SerialT m String -> m ()
toHandle :: Handle -> SerialT m String -> m ()
toHandle Handle
h SerialT m String
m = SerialT m String -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
t m String -> m ()
go SerialT m String
m
where
go :: t m String -> m ()
go t m String
m1 =
let stop :: m ()
stop = () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
single :: String -> m ()
single String
a = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Handle -> String -> IO ()
IO.hPutStrLn Handle
h String
a)
yieldk :: String -> t m String -> m ()
yieldk String
a t m String
r = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Handle -> String -> IO ()
IO.hPutStrLn Handle
h String
a) m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> t m String -> m ()
go t m String
r
in State Stream m String
-> (String -> t m String -> m ())
-> (String -> m ())
-> m ()
-> t m String
-> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
K.foldStream State Stream m String
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState String -> t m String -> m ()
yieldk String -> m ()
forall (m :: * -> *). MonadIO m => String -> m ()
single m ()
stop t m String
m1
{-# INLINE toStream #-}
toStream :: Monad m => Fold m a (SerialT Identity a)
toStream :: Fold m a (SerialT Identity a)
toStream = ((SerialT Identity a -> SerialT Identity a)
-> a -> m (SerialT Identity a -> SerialT Identity a))
-> m (SerialT Identity a -> SerialT Identity a)
-> ((SerialT Identity a -> SerialT Identity a)
-> m (SerialT Identity a))
-> Fold m a (SerialT Identity a)
forall (m :: * -> *) a b s.
(s -> a -> m s) -> m s -> (s -> m b) -> Fold m a b
Fold (\SerialT Identity a -> SerialT Identity a
f a
x -> (SerialT Identity a -> SerialT Identity a)
-> m (SerialT Identity a -> SerialT Identity a)
forall (m :: * -> *) a. Monad m => a -> m a
return ((SerialT Identity a -> SerialT Identity a)
-> m (SerialT Identity a -> SerialT Identity a))
-> (SerialT Identity a -> SerialT Identity a)
-> m (SerialT Identity a -> SerialT Identity a)
forall a b. (a -> b) -> a -> b
$ SerialT Identity a -> SerialT Identity a
f (SerialT Identity a -> SerialT Identity a)
-> (SerialT Identity a -> SerialT Identity a)
-> SerialT Identity a
-> SerialT Identity a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a
x a -> SerialT Identity a -> SerialT Identity a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
`K.cons`))
((SerialT Identity a -> SerialT Identity a)
-> m (SerialT Identity a -> SerialT Identity a)
forall (m :: * -> *) a. Monad m => a -> m a
return SerialT Identity a -> SerialT Identity a
forall a. a -> a
id)
(SerialT Identity a -> m (SerialT Identity a)
forall (m :: * -> *) a. Monad m => a -> m a
return (SerialT Identity a -> m (SerialT Identity a))
-> ((SerialT Identity a -> SerialT Identity a)
-> SerialT Identity a)
-> (SerialT Identity a -> SerialT Identity a)
-> m (SerialT Identity a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((SerialT Identity a -> SerialT Identity a)
-> SerialT Identity a -> SerialT Identity a
forall a b. (a -> b) -> a -> b
$ SerialT Identity a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil))
{-# INLINABLE toStreamRev #-}
toStreamRev :: Monad m => Fold m a (SerialT Identity a)
toStreamRev :: Fold m a (SerialT Identity a)
toStreamRev = (SerialT Identity a -> a -> m (SerialT Identity a))
-> m (SerialT Identity a)
-> (SerialT Identity a -> m (SerialT Identity a))
-> Fold m a (SerialT Identity a)
forall (m :: * -> *) a b s.
(s -> a -> m s) -> m s -> (s -> m b) -> Fold m a b
Fold (\SerialT Identity a
xs a
x -> SerialT Identity a -> m (SerialT Identity a)
forall (m :: * -> *) a. Monad m => a -> m a
return (SerialT Identity a -> m (SerialT Identity a))
-> SerialT Identity a -> m (SerialT Identity a)
forall a b. (a -> b) -> a -> b
$ a
x a -> SerialT Identity a -> SerialT Identity a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
`K.cons` SerialT Identity a
xs) (SerialT Identity a -> m (SerialT Identity a)
forall (m :: * -> *) a. Monad m => a -> m a
return SerialT Identity a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil) SerialT Identity a -> m (SerialT Identity a)
forall (m :: * -> *) a. Monad m => a -> m a
return
{-# INLINE toPure #-}
toPure :: Monad m => SerialT m a -> m (SerialT Identity a)
toPure :: SerialT m a -> m (SerialT Identity a)
toPure = (a -> SerialT Identity a -> SerialT Identity a)
-> SerialT Identity a -> SerialT m a -> m (SerialT Identity a)
forall (m :: * -> *) a b.
Monad m =>
(a -> b -> b) -> b -> SerialT m a -> m b
foldr a -> SerialT Identity a -> SerialT Identity a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
K.cons SerialT Identity a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
{-# INLINE toPureRev #-}
toPureRev :: Monad m => SerialT m a -> m (SerialT Identity a)
toPureRev :: SerialT m a -> m (SerialT Identity a)
toPureRev = (SerialT Identity a -> a -> SerialT Identity a)
-> SerialT Identity a -> SerialT m a -> m (SerialT Identity a)
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> SerialT m a -> m b
foldl' ((a -> SerialT Identity a -> SerialT Identity a)
-> SerialT Identity a -> a -> SerialT Identity a
forall a b c. (a -> b -> c) -> b -> a -> c
flip a -> SerialT Identity a -> SerialT Identity a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
K.cons) SerialT Identity a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
infixr 0 |$
infixr 0 |$.
infixl 1 |&
infixl 1 |&.
{-# INLINE (|$) #-}
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> (t m a -> t m b)
|$ :: (t m a -> t m b) -> t m a -> t m b
(|$) t m a -> t m b
f = t m a -> t m b
f (t m a -> t m b) -> (t m a -> t m a) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a
D.mkParallel
{-# INLINE applyAsync #-}
applyAsync :: (IsStream t, MonadAsync m)
=> (t m a -> t m b) -> (t m a -> t m b)
applyAsync :: (t m a -> t m b) -> t m a -> t m b
applyAsync = (t m a -> t m b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
(|$)
{-# INLINE (|&) #-}
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
t m a
x |& :: t m a -> (t m a -> t m b) -> t m b
|& t m a -> t m b
f = t m a -> t m b
f (t m a -> t m b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
|$ t m a
x
{-# INLINE (|$.) #-}
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> (t m a -> m b)
|$. :: (t m a -> m b) -> t m a -> m b
(|$.) t m a -> m b
f = t m a -> m b
f (t m a -> m b) -> (t m a -> t m a) -> t m a -> m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a
D.mkParallel
{-# INLINE foldAsync #-}
foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> (t m a -> m b)
foldAsync :: (t m a -> m b) -> t m a -> m b
foldAsync = (t m a -> m b) -> t m a -> m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> m b) -> t m a -> m b
(|$.)
{-# INLINE (|&.) #-}
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
t m a
x |&. :: t m a -> (t m a -> m b) -> m b
|&. t m a -> m b
f = t m a -> m b
f (t m a -> m b) -> t m a -> m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> m b) -> t m a -> m b
|$. t m a
x
{-# INLINE transform #-}
transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b
transform :: Pipe m a b -> t m a -> t m b
transform Pipe m a b
pipe t m a
xs = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Pipe m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Pipe m a b -> Stream m a -> Stream m b
D.transform Pipe m a b
pipe (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs)
{-# DEPRECATED scanx "Please use scanl followed by map instead." #-}
{-# INLINE scanx #-}
scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx :: (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx = (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) x a b.
(IsStream t, Monad m) =>
(x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
P.scanlx'
{-# INLINE scanlM' #-}
scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b
scanlM' :: (b -> a -> m b) -> b -> t m a -> t m b
scanlM' b -> a -> m b
step b
begin t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> m b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> b -> Stream m a -> Stream m b
D.scanlM' b -> a -> m b
step b
begin (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanl' #-}
scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
scanl' :: (b -> a -> b) -> b -> t m a -> t m b
scanl' b -> a -> b
step b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
S.scanl' b -> a -> b
step b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE postscanl' #-}
postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
postscanl' :: (b -> a -> b) -> b -> t m a -> t m b
postscanl' b -> a -> b
step b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
D.postscanl' b -> a -> b
step b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE postscanlM' #-}
postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b
postscanlM' :: (b -> a -> m b) -> b -> t m a -> t m b
postscanlM' b -> a -> m b
step b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> m b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> b -> Stream m a -> Stream m b
D.postscanlM' b -> a -> m b
step b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE prescanl' #-}
prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
prescanl' :: (b -> a -> b) -> b -> t m a -> t m b
prescanl' b -> a -> b
step b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
D.prescanl' b -> a -> b
step b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE prescanlM' #-}
prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
prescanlM' :: (b -> a -> m b) -> m b -> t m a -> t m b
prescanlM' b -> a -> m b
step m b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> m b) -> m b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
D.prescanlM' b -> a -> m b
step m b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanl1M' #-}
scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a
scanl1M' :: (a -> a -> m a) -> t m a -> t m a
scanl1M' a -> a -> m a
step t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> m a) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> m a) -> Stream m a -> Stream m a
D.scanl1M' a -> a -> m a
step (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanl1' #-}
scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a
scanl1' :: (a -> a -> a) -> t m a -> t m a
scanl1' a -> a -> a
step t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> a) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> a) -> Stream m a -> Stream m a
D.scanl1' a -> a -> a
step (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scan #-}
scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
scan :: Fold m a b -> t m a -> t m b
scan (Fold s -> a -> m s
step m s
begin s -> m b
done) = (s -> a -> m s) -> m s -> (s -> m b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) x a b.
(IsStream t, Monad m) =>
(x -> a -> m x) -> m x -> (x -> m b) -> t m a -> t m b
P.scanlMx' s -> a -> m s
step m s
begin s -> m b
done
{-# INLINE postscan #-}
postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
postscan :: Fold m a b -> t m a -> t m b
postscan (Fold s -> a -> m s
step m s
begin s -> m b
done) = (s -> a -> m s) -> m s -> (s -> m b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) x a b.
(IsStream t, Monad m) =>
(x -> a -> m x) -> m x -> (x -> m b) -> t m a -> t m b
P.postscanlMx' s -> a -> m s
step m s
begin s -> m b
done
{-# INLINE rollingMap #-}
rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b
rollingMap :: (a -> a -> b) -> t m a -> t m b
rollingMap a -> a -> b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> a -> b) -> Stream m a -> Stream m b
D.rollingMap a -> a -> b
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE rollingMapM #-}
rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b
rollingMapM :: (a -> a -> m b) -> t m a -> t m b
rollingMapM a -> a -> m b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> a -> m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> a -> m b) -> Stream m a -> Stream m b
D.rollingMapM a -> a -> m b
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE filter #-}
#if __GLASGOW_HASKELL__ != 802
filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
filter :: (a -> Bool) -> t m a -> t m a
filter a -> Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
S.filter a -> Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
#else
filter :: IsStream t => (a -> Bool) -> t m a -> t m a
filter = K.filter
#endif
{-# INLINE filterM #-}
filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
filterM :: (a -> m Bool) -> t m a -> t m a
filterM a -> m Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
D.filterM a -> m Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE uniq #-}
uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a
uniq :: t m a -> t m a
uniq = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m a
forall a (m :: * -> *). (Eq a, Monad m) => Stream m a -> Stream m a
D.uniq (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE the #-}
the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a)
the :: SerialT m a -> m (Maybe a)
the SerialT m a
m = Stream m a -> m (Maybe a)
forall a (m :: * -> *).
(Eq a, Monad m) =>
Stream m a -> m (Maybe a)
S.the (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
m)
{-# INLINE take #-}
take :: (IsStream t, Monad m) => Int -> t m a -> t m a
take :: Int -> t m a -> t m a
take Int
n t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Int -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => Int -> Stream m a -> Stream m a
S.take Int
n (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS
(Maybe Int64 -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Int64 -> t m a -> t m a
maxYields (Int64 -> Maybe Int64
forall a. a -> Maybe a
Just (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n)) t m a
m)
{-# INLINE takeWhile #-}
takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
takeWhile :: (a -> Bool) -> t m a -> t m a
takeWhile a -> Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
S.takeWhile a -> Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE takeWhileM #-}
takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
takeWhileM :: (a -> m Bool) -> t m a -> t m a
takeWhileM a -> m Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
D.takeWhileM a -> m Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE takeByTime #-}
takeByTime ::(MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
takeByTime :: d -> t m a -> t m a
takeByTime d
d = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. d -> Stream m a -> Stream m a
forall (m :: * -> *) t a.
(MonadIO m, TimeUnit64 t) =>
t -> Stream m a -> Stream m a
D.takeByTime d
d (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE drop #-}
drop :: (IsStream t, Monad m) => Int -> t m a -> t m a
drop :: Int -> t m a -> t m a
drop Int
n t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Int -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => Int -> Stream m a -> Stream m a
S.drop Int
n (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE dropWhile #-}
dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
dropWhile :: (a -> Bool) -> t m a -> t m a
dropWhile a -> Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
S.dropWhile a -> Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE dropWhileM #-}
dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
dropWhileM :: (a -> m Bool) -> t m a -> t m a
dropWhileM a -> m Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
D.dropWhileM a -> m Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE dropByTime #-}
dropByTime ::(MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
dropByTime :: d -> t m a -> t m a
dropByTime d
d = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. d -> Stream m a -> Stream m a
forall (m :: * -> *) t a.
(MonadIO m, TimeUnit64 t) =>
t -> Stream m a -> Stream m a
D.dropByTime d
d (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE_EARLY mapM #-}
mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
mapM :: (a -> m b) -> t m a -> t m b
mapM = (a -> m b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
K.mapM
{-# RULES "mapM serial" mapM = mapMSerial #-}
{-# INLINE mapMSerial #-}
mapMSerial :: Monad m => (a -> m b) -> SerialT m a -> SerialT m b
mapMSerial :: (a -> m b) -> SerialT m a -> SerialT m b
mapMSerial = (a -> m b) -> SerialT m a -> SerialT m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m b) -> t m a -> t m b
Serial.mapM
{-# INLINE sequence #-}
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
sequence :: t m (m a) -> t m a
sequence t m (m a)
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Stream m (m a) -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m (m a) -> Stream m a
S.sequence (t m (m a) -> Stream m (m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m (m a)
m)
{-# INLINE mapMaybe #-}
mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b
mapMaybe :: (a -> Maybe b) -> t m a -> t m b
mapMaybe a -> Maybe b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> Maybe b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Maybe b) -> Stream m a -> Stream m b
S.mapMaybe a -> Maybe b
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE_EARLY mapMaybeM #-}
mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m))
=> (a -> m (Maybe b)) -> t m a -> t m b
mapMaybeM :: (a -> m (Maybe b)) -> t m a -> t m b
mapMaybeM a -> m (Maybe b)
f = (Maybe b -> b) -> t m (Maybe b) -> t m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe b -> b
forall a. HasCallStack => Maybe a -> a
fromJust (t m (Maybe b) -> t m b)
-> (t m a -> t m (Maybe b)) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe b -> Bool) -> t m (Maybe b) -> t m (Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter Maybe b -> Bool
forall a. Maybe a -> Bool
isJust (t m (Maybe b) -> t m (Maybe b))
-> (t m a -> t m (Maybe b)) -> t m a -> t m (Maybe b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> m (Maybe b)) -> t m a -> t m (Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
K.mapM a -> m (Maybe b)
f
{-# RULES "mapMaybeM serial" mapMaybeM = mapMaybeMSerial #-}
{-# INLINE mapMaybeMSerial #-}
mapMaybeMSerial :: Monad m => (a -> m (Maybe b)) -> SerialT m a -> SerialT m b
mapMaybeMSerial :: (a -> m (Maybe b)) -> SerialT m a -> SerialT m b
mapMaybeMSerial a -> m (Maybe b)
f SerialT m a
m = Stream m b -> SerialT m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> SerialT m b) -> Stream m b -> SerialT m b
forall a b. (a -> b) -> a -> b
$ (a -> m (Maybe b)) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> m (Maybe b)) -> Stream m a -> Stream m b
D.mapMaybeM a -> m (Maybe b)
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD SerialT m a
m
{-# INLINE reverse #-}
reverse :: (IsStream t, Monad m) => t m a -> t m a
reverse :: t m a -> t m a
reverse t m a
s = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
S.reverse (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
s
{-# INLINE reverse' #-}
reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a
reverse' :: t m a -> t m a
reverse' t m a
s = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Stream m a -> Stream m a
D.reverse' (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
s
{-# INLINE intersperseM #-}
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
intersperseM :: m a -> t m a -> t m a
intersperseM m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a -> Stream m a
S.intersperseM m a
m (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS
{-# INLINE intersperse #-}
intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a
intersperse :: a -> t m a -> t m a
intersperse a
a = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => a -> Stream m a -> Stream m a
S.intersperse a
a (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS
{-# INLINE intersperseSuffix #-}
intersperseSuffix :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
intersperseSuffix :: m a -> t m a -> t m a
intersperseSuffix m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a -> Stream m a
D.intersperseSuffix m a
m (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE intersperseSuffix_ #-}
intersperseSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
intersperseSuffix_ :: m b -> t m a -> t m a
intersperseSuffix_ m b
m = (a -> m a) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m b) -> t m a -> t m b
Serial.mapM (\a
x -> m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void m b
m m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x)
{-# INLINE delayPost #-}
delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
delayPost :: Double -> t m a -> t m a
delayPost Double
n = m () -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
intersperseSuffix_ (m () -> t m a -> t m a) -> m () -> t m a -> t m a
forall a b. (a -> b) -> a -> b
$ IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
n Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000
{-# INLINE intersperseSuffixBySpan #-}
intersperseSuffixBySpan :: (IsStream t, MonadAsync m)
=> Int -> m a -> t m a -> t m a
intersperseSuffixBySpan :: Int -> m a -> t m a -> t m a
intersperseSuffixBySpan Int
n m a
eff =
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
Int -> m a -> Stream m a -> Stream m a
D.intersperseSuffixBySpan Int
n m a
eff (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE interjectSuffix #-}
interjectSuffix
:: (IsStream t, MonadAsync m)
=> Double -> m a -> t m a -> t m a
interjectSuffix :: Double -> m a -> t m a -> t m a
interjectSuffix Double
n m a
f t m a
xs = t m a
xs t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
`Par.parallelFst` m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a
repeatM m a
timed
where timed :: m a
timed = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Int -> IO ()
threadDelay (Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
n Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000)) m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m a
f
{-# INLINE insertBy #-}
insertBy ::
(IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a
insertBy :: (a -> a -> Ordering) -> a -> t m a -> t m a
insertBy a -> a -> Ordering
cmp a
x t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> Ordering) -> a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> a -> Stream m a -> Stream m a
S.insertBy a -> a -> Ordering
cmp a
x (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m)
{-# INLINE deleteBy #-}
deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a
deleteBy :: (a -> a -> Bool) -> a -> t m a -> t m a
deleteBy a -> a -> Bool
cmp a
x t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> Bool) -> a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> Bool) -> a -> Stream m a -> Stream m a
S.deleteBy a -> a -> Bool
cmp a
x (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m)
{-# INLINE indexed #-}
indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a)
indexed :: t m a -> t m (Int, a)
indexed = Stream m (Int, a) -> t m (Int, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m (Int, a) -> t m (Int, a))
-> (t m a -> Stream m (Int, a)) -> t m a -> t m (Int, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m (Int, a)
forall (m :: * -> *) a. Monad m => Stream m a -> Stream m (Int, a)
D.indexed (Stream m a -> Stream m (Int, a))
-> (t m a -> Stream m a) -> t m a -> Stream m (Int, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE indexedR #-}
indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a)
indexedR :: Int -> t m a -> t m (Int, a)
indexedR Int
n = Stream m (Int, a) -> t m (Int, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m (Int, a) -> t m (Int, a))
-> (t m a -> Stream m (Int, a)) -> t m a -> t m (Int, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Stream m a -> Stream m (Int, a)
forall (m :: * -> *) a.
Monad m =>
Int -> Stream m a -> Stream m (Int, a)
D.indexedR Int
n (Stream m a -> Stream m (Int, a))
-> (t m a -> Stream m a) -> t m a -> Stream m (Int, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINABLE eqBy #-}
eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool
eqBy :: (a -> b -> Bool) -> t m a -> t m b -> m Bool
eqBy = (a -> b -> Bool) -> t m a -> t m b -> m Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b -> Bool) -> t m a -> t m b -> m Bool
P.eqBy
{-# INLINABLE cmpBy #-}
cmpBy
:: (IsStream t, Monad m)
=> (a -> b -> Ordering) -> t m a -> t m b -> m Ordering
cmpBy :: (a -> b -> Ordering) -> t m a -> t m b -> m Ordering
cmpBy = (a -> b -> Ordering) -> t m a -> t m b -> m Ordering
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b -> Ordering) -> t m a -> t m b -> m Ordering
P.cmpBy
{-# INLINABLE mergeBy #-}
mergeBy ::
(IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeBy a -> a -> Ordering
f t m a
m1 t m a
m2 = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
S.mergeBy a -> a -> Ordering
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m1) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m2)
{-# INLINABLE mergeByM #-}
mergeByM
:: (IsStream t, Monad m)
=> (a -> a -> m Ordering) -> t m a -> t m a -> t m a
mergeByM :: (a -> a -> m Ordering) -> t m a -> t m a -> t m a
mergeByM a -> a -> m Ordering
f t m a
m1 t m a
m2 = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
S.mergeByM a -> a -> m Ordering
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m1) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m2)
{-# INLINE mergeAsyncBy #-}
mergeAsyncBy :: (IsStream t, MonadAsync m)
=> (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeAsyncBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeAsyncBy a -> a -> Ordering
f = (a -> a -> m Ordering) -> t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
(a -> a -> m Ordering) -> t m a -> t m a -> t m a
mergeAsyncByM (\a
a a
b -> Ordering -> m Ordering
forall (m :: * -> *) a. Monad m => a -> m a
return (Ordering -> m Ordering) -> Ordering -> m Ordering
forall a b. (a -> b) -> a -> b
$ a -> a -> Ordering
f a
a a
b)
{-# INLINE mergeAsyncByM #-}
mergeAsyncByM :: (IsStream t, MonadAsync m)
=> (a -> a -> m Ordering) -> t m a -> t m a -> t m a
mergeAsyncByM :: (a -> a -> m Ordering) -> t m a -> t m a -> t m a
mergeAsyncByM a -> a -> m Ordering
f t m a
m1 t m a
m2 = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$
(a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
D.mergeByM a -> a -> m Ordering
f (Stream m a -> Stream m a
forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
D.mkParallelD (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m1) (Stream m a -> Stream m a
forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
D.mkParallelD (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m2)
{-# INLINE concatMapWith #-}
concatMapWith
:: IsStream t
=> (forall c. t m c -> t m c -> t m c)
-> (a -> t m b)
-> t m a
-> t m b
concatMapWith :: (forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
concatMapWith = (forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
K.concatMapBy
{-# INLINE concatMap #-}
concatMap ::(IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b
concatMap :: (a -> t m b) -> t m a -> t m b
concatMap a -> t m b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> Stream m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Stream m b) -> Stream m a -> Stream m b
D.concatMap (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD (t m b -> Stream m b) -> (a -> t m b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> t m b
f) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE concat #-}
concat :: (IsStream t, Monad m) => t m (t m a) -> t m a
concat :: t m (t m a) -> t m a
concat = (t m a -> t m a) -> t m (t m a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> t m b) -> t m a -> t m b
concatMap t m a -> t m a
forall a. a -> a
id
{-# INLINE append #-}
append ::(IsStream t, Monad m) => t m b -> t m b -> t m b
append :: t m b -> t m b -> t m b
append t m b
m1 t m b
m2 = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
Monad m =>
Stream m a -> Stream m a -> Stream m a
D.append (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
m1) (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
m2)
{-# INLINE interleave #-}
interleave ::(IsStream t, Monad m) => t m b -> t m b -> t m b
interleave :: t m b -> t m b -> t m b
interleave t m b
m1 t m b
m2 = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
Monad m =>
Stream m a -> Stream m a -> Stream m a
D.interleave (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
m1) (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
m2)
{-# INLINE interleaveSuffix #-}
interleaveSuffix ::(IsStream t, Monad m) => t m b -> t m b -> t m b
interleaveSuffix :: t m b -> t m b -> t m b
interleaveSuffix t m b
m1 t m b
m2 =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
Monad m =>
Stream m a -> Stream m a -> Stream m a
D.interleaveSuffix (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
m1) (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
m2)
{-# INLINE interleaveInfix #-}
interleaveInfix ::(IsStream t, Monad m) => t m b -> t m b -> t m b
interleaveInfix :: t m b -> t m b -> t m b
interleaveInfix t m b
m1 t m b
m2 =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
Monad m =>
Stream m a -> Stream m a -> Stream m a
D.interleaveInfix (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
m1) (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
m2)
{-# INLINE interleaveMin #-}
interleaveMin ::(IsStream t, Monad m) => t m b -> t m b -> t m b
interleaveMin :: t m b -> t m b -> t m b
interleaveMin t m b
m1 t m b
m2 = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
Monad m =>
Stream m a -> Stream m a -> Stream m a
D.interleaveMin (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
m1) (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
m2)
{-# INLINE roundrobin #-}
roundrobin ::(IsStream t, Monad m) => t m b -> t m b -> t m b
roundrobin :: t m b -> t m b -> t m b
roundrobin t m b
m1 t m b
m2 = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Stream m b -> Stream m b -> Stream m b
forall (m :: * -> *) a.
Monad m =>
Stream m a -> Stream m a -> Stream m a
D.roundRobin (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
m1) (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
m2)
{-# INLINE concatMapM #-}
concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
concatMapM :: (a -> m (t m b)) -> t m a -> t m b
concatMapM a -> m (t m b)
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> m (Stream m b)) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> m (Stream m b)) -> Stream m a -> Stream m b
D.concatMapM ((t m b -> Stream m b) -> m (t m b) -> m (Stream m b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD (m (t m b) -> m (Stream m b))
-> (a -> m (t m b)) -> a -> m (Stream m b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m (t m b)
f) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE concatM #-}
concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
concatM :: m (t m a) -> t m a
concatM m (t m a)
generator = (() -> m (t m a)) -> t m () -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m (t m b)) -> t m a -> t m b
concatMapM (\() -> m (t m a)
generator) (() -> t m ()
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
yield ())
{-# INLINE concatUnfold #-}
concatUnfold ::(IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
concatUnfold :: Unfold m a b -> t m a -> t m b
concatUnfold Unfold m a b
u t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Unfold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Unfold m a b -> Stream m a -> Stream m b
D.concatMapU Unfold m a b
u (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE concatUnfoldInterleave #-}
concatUnfoldInterleave ::(IsStream t, Monad m)
=> Unfold m a b -> t m a -> t m b
concatUnfoldInterleave :: Unfold m a b -> t m a -> t m b
concatUnfoldInterleave Unfold m a b
u t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Unfold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Unfold m a b -> Stream m a -> Stream m b
D.concatUnfoldInterleave Unfold m a b
u (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE concatUnfoldRoundrobin #-}
concatUnfoldRoundrobin ::(IsStream t, Monad m)
=> Unfold m a b -> t m a -> t m b
concatUnfoldRoundrobin :: Unfold m a b -> t m a -> t m b
concatUnfoldRoundrobin Unfold m a b
u t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Unfold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Unfold m a b -> Stream m a -> Stream m b
D.concatUnfoldRoundrobin Unfold m a b
u (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE gintercalate #-}
gintercalate
:: (IsStream t, Monad m)
=> Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
gintercalate :: Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
gintercalate Unfold m a c
unf1 t m a
str1 Unfold m b c
unf2 t m b
str2 =
Stream m c -> t m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m c -> t m c) -> Stream m c -> t m c
forall a b. (a -> b) -> a -> b
$ Unfold m a c
-> Stream m a -> Unfold m b c -> Stream m b -> Stream m c
forall (m :: * -> *) a c b.
Monad m =>
Unfold m a c
-> Stream m a -> Unfold m b c -> Stream m b -> Stream m c
D.gintercalate
Unfold m a c
unf1 (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
str1)
Unfold m b c
unf2 (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m b
str2)
{-# INLINE intercalate #-}
intercalate :: (IsStream t, Monad m)
=> b -> Unfold m b c -> t m b -> t m c
intercalate :: b -> Unfold m b c -> t m b -> t m c
intercalate b
seed Unfold m b c
unf t m b
str = Stream m c -> t m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m c -> t m c) -> Stream m c -> t m c
forall a b. (a -> b) -> a -> b
$
Unfold m b c -> Stream m b -> Stream m c
forall (m :: * -> *) a b.
Monad m =>
Unfold m a b -> Stream m a -> Stream m b
D.concatMapU Unfold m b c
unf (Stream m b -> Stream m c) -> Stream m b -> Stream m c
forall a b. (a -> b) -> a -> b
$ b -> Stream m b -> Stream m b
forall (m :: * -> *) a. Monad m => a -> Stream m a -> Stream m a
D.intersperse b
seed (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m b
str)
{-# INLINE interpose #-}
interpose :: (IsStream t, Monad m)
=> c -> Unfold m b c -> t m b -> t m c
interpose :: c -> Unfold m b c -> t m b -> t m c
interpose c
x Unfold m b c
unf t m b
str =
Stream m c -> t m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m c -> t m c) -> Stream m c -> t m c
forall a b. (a -> b) -> a -> b
$ m c -> Unfold m b c -> Stream m b -> Stream m c
forall (m :: * -> *) c b.
Monad m =>
m c -> Unfold m b c -> Stream m b -> Stream m c
D.interpose (c -> m c
forall (m :: * -> *) a. Monad m => a -> m a
return c
x) Unfold m b c
unf (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m b
str)
{-# INLINE gintercalateSuffix #-}
gintercalateSuffix
:: (IsStream t, Monad m)
=> Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
gintercalateSuffix :: Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
gintercalateSuffix Unfold m a c
unf1 t m a
str1 Unfold m b c
unf2 t m b
str2 =
Stream m c -> t m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m c -> t m c) -> Stream m c -> t m c
forall a b. (a -> b) -> a -> b
$ Unfold m a c
-> Stream m a -> Unfold m b c -> Stream m b -> Stream m c
forall (m :: * -> *) a c b.
Monad m =>
Unfold m a c
-> Stream m a -> Unfold m b c -> Stream m b -> Stream m c
D.gintercalateSuffix
Unfold m a c
unf1 (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
str1)
Unfold m b c
unf2 (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m b
str2)
{-# INLINE intercalateSuffix #-}
intercalateSuffix :: (IsStream t, Monad m)
=> b -> Unfold m b c -> t m b -> t m c
intercalateSuffix :: b -> Unfold m b c -> t m b -> t m c
intercalateSuffix b
seed Unfold m b c
unf t m b
str = Stream m c -> t m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m c -> t m c) -> Stream m c -> t m c
forall a b. (a -> b) -> a -> b
$ Unfold m b c -> Stream m b -> Stream m c
forall (m :: * -> *) a b.
Monad m =>
Unfold m a b -> Stream m a -> Stream m b
D.concatMapU Unfold m b c
unf
(Stream m b -> Stream m c) -> Stream m b -> Stream m c
forall a b. (a -> b) -> a -> b
$ m b -> Stream m b -> Stream m b
forall (m :: * -> *) a. Monad m => m a -> Stream m a -> Stream m a
D.intersperseSuffix (b -> m b
forall (m :: * -> *) a. Monad m => a -> m a
return b
seed) (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m b
str)
{-# INLINE interposeSuffix #-}
interposeSuffix :: (IsStream t, Monad m)
=> c -> Unfold m b c -> t m b -> t m c
interposeSuffix :: c -> Unfold m b c -> t m b -> t m c
interposeSuffix c
x Unfold m b c
unf t m b
str =
Stream m c -> t m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m c -> t m c) -> Stream m c -> t m c
forall a b. (a -> b) -> a -> b
$ m c -> Unfold m b c -> Stream m b -> Stream m c
forall (m :: * -> *) c b.
Monad m =>
m c -> Unfold m b c -> Stream m b -> Stream m c
D.interposeSuffix (c -> m c
forall (m :: * -> *) a. Monad m => a -> m a
return c
x) Unfold m b c
unf (t m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m b
str)
{-# INLINE concatMapIterateWith #-}
concatMapIterateWith
:: IsStream t
=> (forall c. t m c -> t m c -> t m c)
-> (a -> t m a)
-> t m a
-> t m a
concatMapIterateWith :: (forall c. t m c -> t m c -> t m c)
-> (a -> t m a) -> t m a -> t m a
concatMapIterateWith forall c. t m c -> t m c -> t m c
combine a -> t m a
f t m a
xs = (forall c. t m c -> t m c -> t m c)
-> (a -> t m a) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
concatMapWith forall c. t m c -> t m c -> t m c
combine a -> t m a
go t m a
xs
where
go :: a -> t m a
go a
x = a -> t m a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
yield a
x t m a -> t m a -> t m a
forall c. t m c -> t m c -> t m c
`combine` (forall c. t m c -> t m c -> t m c)
-> (a -> t m a) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
concatMapWith forall c. t m c -> t m c -> t m c
combine a -> t m a
go (a -> t m a
f a
x)
{-# INLINE concatMapTreeWith #-}
concatMapTreeWith
:: IsStream t
=> (forall c. t m c -> t m c -> t m c)
-> (a -> t m (Either a b))
-> t m (Either a b)
-> t m (Either a b)
concatMapTreeWith :: (forall c. t m c -> t m c -> t m c)
-> (a -> t m (Either a b)) -> t m (Either a b) -> t m (Either a b)
concatMapTreeWith forall c. t m c -> t m c -> t m c
combine a -> t m (Either a b)
f t m (Either a b)
xs = (forall c. t m c -> t m c -> t m c)
-> (Either a b -> t m (Either a b))
-> t m (Either a b)
-> t m (Either a b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
concatMapWith forall c. t m c -> t m c -> t m c
combine Either a b -> t m (Either a b)
go t m (Either a b)
xs
where
go :: Either a b -> t m (Either a b)
go (Left a
tree) = Either a b -> t m (Either a b)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
yield (a -> Either a b
forall a b. a -> Either a b
Left a
tree) t m (Either a b) -> t m (Either a b) -> t m (Either a b)
forall c. t m c -> t m c -> t m c
`combine` (forall c. t m c -> t m c -> t m c)
-> (Either a b -> t m (Either a b))
-> t m (Either a b)
-> t m (Either a b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
concatMapWith forall c. t m c -> t m c -> t m c
combine Either a b -> t m (Either a b)
go (a -> t m (Either a b)
f a
tree)
go (Right b
leaf) = Either a b -> t m (Either a b)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
yield (Either a b -> t m (Either a b)) -> Either a b -> t m (Either a b)
forall a b. (a -> b) -> a -> b
$ b -> Either a b
forall a b. b -> Either a b
Right b
leaf
{-# INLINE concatMapLoopWith #-}
concatMapLoopWith
:: (IsStream t, MonadAsync m)
=> (forall x. t m x -> t m x -> t m x)
-> (a -> t m (Either b c))
-> (b -> t m a)
-> t m a
-> t m c
concatMapLoopWith :: (forall x. t m x -> t m x -> t m x)
-> (a -> t m (Either b c)) -> (b -> t m a) -> t m a -> t m c
concatMapLoopWith forall x. t m x -> t m x -> t m x
combine a -> t m (Either b c)
f b -> t m a
fb t m a
xs =
(forall x. t m x -> t m x -> t m x)
-> (Either b c -> t m c) -> t m (Either b c) -> t m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
concatMapWith forall x. t m x -> t m x -> t m x
combine Either b c -> t m c
go (t m (Either b c) -> t m c) -> t m (Either b c) -> t m c
forall a b. (a -> b) -> a -> b
$ (forall x. t m x -> t m x -> t m x)
-> (a -> t m (Either b c)) -> t m a -> t m (Either b c)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
concatMapWith forall x. t m x -> t m x -> t m x
combine a -> t m (Either b c)
f t m a
xs
where
go :: Either b c -> t m c
go (Left b
b) = (forall x. t m x -> t m x -> t m x)
-> (a -> t m (Either b c)) -> (b -> t m a) -> t m a -> t m c
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b c.
(IsStream t, MonadAsync m) =>
(forall x. t m x -> t m x -> t m x)
-> (a -> t m (Either b c)) -> (b -> t m a) -> t m a -> t m c
concatMapLoopWith forall x. t m x -> t m x -> t m x
combine a -> t m (Either b c)
f b -> t m a
fb (t m a -> t m c) -> t m a -> t m c
forall a b. (a -> b) -> a -> b
$ b -> t m a
fb b
b
go (Right c
c) = c -> t m c
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
yield c
c
{-# INLINE concatMapTreeYieldLeavesWith #-}
concatMapTreeYieldLeavesWith
:: (IsStream t, MonadAsync m)
=> (forall x. t m x -> t m x -> t m x)
-> (a -> t m (Either a b))
-> t m a
-> t m b
concatMapTreeYieldLeavesWith :: (forall x. t m x -> t m x -> t m x)
-> (a -> t m (Either a b)) -> t m a -> t m b
concatMapTreeYieldLeavesWith forall x. t m x -> t m x -> t m x
combine a -> t m (Either a b)
f = (forall x. t m x -> t m x -> t m x)
-> (a -> t m (Either a b)) -> (a -> t m a) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b c.
(IsStream t, MonadAsync m) =>
(forall x. t m x -> t m x -> t m x)
-> (a -> t m (Either b c)) -> (b -> t m a) -> t m a -> t m c
concatMapLoopWith forall x. t m x -> t m x -> t m x
combine a -> t m (Either a b)
f a -> t m a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
yield
{-# INLINE splitParse #-}
splitParse
:: (IsStream t, MonadThrow m)
=> Parser m a b
-> t m a
-> t m b
splitParse :: Parser m a b -> t m a -> t m b
splitParse Parser m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Parser m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadThrow m =>
Parser m a b -> Stream m a -> Stream m b
D.splitParse Parser m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE chunksOf #-}
chunksOf
:: (IsStream t, Monad m)
=> Int -> Fold m a b -> t m a -> t m b
chunksOf :: Int -> Fold m a b -> t m a -> t m b
chunksOf Int
n Fold m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Int -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Stream m a -> Stream m b
D.groupsOf Int
n Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE chunksOf2 #-}
chunksOf2
:: (IsStream t, Monad m)
=> Int -> m c -> Fold2 m c a b -> t m a -> t m b
chunksOf2 :: Int -> m c -> Fold2 m c a b -> t m a -> t m b
chunksOf2 Int
n m c
action Fold2 m c a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Int -> m c -> Fold2 m c a b -> Stream m a -> Stream m b
forall (m :: * -> *) c a b.
Monad m =>
Int -> m c -> Fold2 m c a b -> Stream m a -> Stream m b
D.groupsOf2 Int
n m c
action Fold2 m c a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE arraysOf #-}
arraysOf :: (IsStream t, MonadIO m, Storable a)
=> Int -> t m a -> t m (Array a)
arraysOf :: Int -> t m a -> t m (Array a)
arraysOf Int
n = Int -> Fold m a (Array a) -> t m a -> t m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Int -> Fold m a b -> t m a -> t m b
chunksOf Int
n (Int -> Fold m a (Array a)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Fold m a (Array a)
writeNUnsafe Int
n)
{-# INLINE intervalsOf #-}
intervalsOf
:: (IsStream t, MonadAsync m)
=> Double -> Fold m a b -> t m a -> t m b
intervalsOf :: Double -> Fold m a b -> t m a -> t m b
intervalsOf Double
n Fold m a b
f t m a
xs =
(Maybe a -> Bool) -> Fold m (Maybe a) b -> t m (Maybe a) -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix Maybe a -> Bool
forall a. Maybe a -> Bool
isNothing (Fold m a b -> Fold m (Maybe a) b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Fold m (Maybe a) b
FL.lcatMaybes Fold m a b
f)
(Double -> m (Maybe a) -> t m (Maybe a) -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
Double -> m a -> t m a -> t m a
interjectSuffix Double
n (Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing) ((a -> Maybe a) -> t m a -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
Serial.map a -> Maybe a
forall a. a -> Maybe a
Just t m a
xs))
{-# INLINE groupsBy #-}
groupsBy
:: (IsStream t, Monad m)
=> (a -> a -> Bool)
-> Fold m a b
-> t m a
-> t m b
groupsBy :: (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
groupsBy a -> a -> Bool
cmp Fold m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.groupsBy a -> a -> Bool
cmp Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE groupsByRolling #-}
groupsByRolling
:: (IsStream t, Monad m)
=> (a -> a -> Bool)
-> Fold m a b
-> t m a
-> t m b
groupsByRolling :: (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
groupsByRolling a -> a -> Bool
cmp Fold m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.groupsRollingBy a -> a -> Bool
cmp Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b
groups :: Fold m a b -> t m a -> t m b
groups = (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> Bool) -> Fold m a b -> t m a -> t m b
groupsBy a -> a -> Bool
forall a. Eq a => a -> a -> Bool
(==)
{-# INLINE splitOn #-}
splitOn
:: (IsStream t, Monad m)
=> (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOn :: (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOn a -> Bool
predicate Fold m a b
f t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.splitBy a -> Bool
predicate Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE splitOnSuffix #-}
splitOnSuffix
:: (IsStream t, Monad m)
=> (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnSuffix :: (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnSuffix a -> Bool
predicate Fold m a b
f t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.splitSuffixBy a -> Bool
predicate Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE wordsBy #-}
wordsBy
:: (IsStream t, Monad m)
=> (a -> Bool) -> Fold m a b -> t m a -> t m b
wordsBy :: (a -> Bool) -> Fold m a b -> t m a -> t m b
wordsBy a -> Bool
predicate Fold m a b
f t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.wordsBy a -> Bool
predicate Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE splitWithSuffix #-}
splitWithSuffix
:: (IsStream t, Monad m)
=> (a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix :: (a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix a -> Bool
predicate Fold m a b
f t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.splitSuffixBy' a -> Bool
predicate Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE splitOnSeq #-}
splitOnSeq
:: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitOnSeq :: Array a -> Fold m a b -> t m a -> t m b
splitOnSeq Array a
patt Fold m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Array a -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
(MonadIO m, Storable a, Enum a, Eq a) =>
Array a -> Fold m a b -> Stream m a -> Stream m b
D.splitOn Array a
patt Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE splitOnSuffixSeq #-}
splitOnSuffixSeq
:: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitOnSuffixSeq :: Array a -> Fold m a b -> t m a -> t m b
splitOnSuffixSeq Array a
patt Fold m a b
f t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
(MonadIO m, Storable a, Enum a, Eq a) =>
Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
D.splitSuffixOn Bool
False Array a
patt Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE splitBySeq #-}
splitBySeq
:: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitBySeq :: Array a -> Fold m a b -> t m a -> t m b
splitBySeq Array a
patt Fold m a b
f t m a
m =
m b -> t m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
intersperseM (Fold m a b -> SerialT m a -> m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> SerialT m a -> m b
fold Fold m a b
f (Array a -> SerialT m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t, Storable a) =>
Array a -> t m a
A.toStream Array a
patt)) (t m b -> t m b) -> t m b -> t m b
forall a b. (a -> b) -> a -> b
$ Array a -> Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadIO m, Storable a, Enum a, Eq a) =>
Array a -> Fold m a b -> t m a -> t m b
splitOnSeq Array a
patt Fold m a b
f t m a
m
{-# INLINE splitWithSuffixSeq #-}
splitWithSuffixSeq
:: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitWithSuffixSeq :: Array a -> Fold m a b -> t m a -> t m b
splitWithSuffixSeq Array a
patt Fold m a b
f t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
(MonadIO m, Storable a, Enum a, Eq a) =>
Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
D.splitSuffixOn Bool
True Array a
patt Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE splitInnerBy #-}
splitInnerBy
:: (IsStream t, Monad m)
=> (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a))
-> t m (f a)
-> t m (f a)
splitInnerBy :: (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
splitInnerBy f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner t m (f a)
xs =
Stream m (f a) -> t m (f a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (f a) -> t m (f a)) -> Stream m (f a) -> t m (f a)
forall a b. (a -> b) -> a -> b
$ (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
forall (m :: * -> *) (f :: * -> *) a.
Monad m =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
D.splitInnerBy f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner (Stream m (f a) -> Stream m (f a))
-> Stream m (f a) -> Stream m (f a)
forall a b. (a -> b) -> a -> b
$ t m (f a) -> Stream m (f a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (f a)
xs
{-# INLINE splitInnerBySuffix #-}
splitInnerBySuffix
:: (IsStream t, Monad m, Eq (f a), Monoid (f a))
=> (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a))
-> t m (f a)
-> t m (f a)
splitInnerBySuffix :: (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
splitInnerBySuffix f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner t m (f a)
xs =
Stream m (f a) -> t m (f a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (f a) -> t m (f a)) -> Stream m (f a) -> t m (f a)
forall a b. (a -> b) -> a -> b
$ (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
forall (m :: * -> *) (f :: * -> *) a.
(Monad m, Eq (f a), Monoid (f a)) =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
D.splitInnerBySuffix f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner (Stream m (f a) -> Stream m (f a))
-> Stream m (f a) -> Stream m (f a)
forall a b. (a -> b) -> a -> b
$ t m (f a) -> Stream m (f a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (f a)
xs
{-# INLINE tap #-}
tap :: (IsStream t, Monad m) => FL.Fold m a b -> t m a -> t m a
tap :: Fold m a b -> t m a -> t m a
tap Fold m a b
f t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m a
D.tap Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs)
{-# INLINE tapOffsetEvery #-}
tapOffsetEvery :: (IsStream t, Monad m)
=> Int -> Int -> FL.Fold m a b -> t m a -> t m a
tapOffsetEvery :: Int -> Int -> Fold m a b -> t m a -> t m a
tapOffsetEvery Int
offset Int
n Fold m a b
f t m a
xs =
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Int -> Int -> Fold m a b -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
Monad m =>
Int -> Int -> Fold m a b -> Stream m a -> Stream m a
D.tapOffsetEvery Int
offset Int
n Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs)
{-# INLINE tapAsync #-}
tapAsync :: (IsStream t, MonadAsync m) => FL.Fold m a b -> t m a -> t m a
tapAsync :: Fold m a b -> t m a -> t m a
tapAsync Fold m a b
f t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
MonadAsync m =>
Fold m a b -> Stream m a -> Stream m a
D.tapAsync Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs)
{-# INLINE pollCounts #-}
pollCounts ::
(IsStream t, MonadAsync m)
=> (a -> Bool)
-> (t m Int -> t m Int)
-> Fold m Int b
-> t m a
-> t m a
pollCounts :: (a -> Bool)
-> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a
pollCounts a -> Bool
predicate t m Int -> t m Int
transf Fold m Int b
f t m a
xs =
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD
(Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool)
-> (Stream m Int -> Stream m Int)
-> Fold m Int b
-> Stream m a
-> Stream m a
forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool)
-> (Stream m Int -> Stream m Int)
-> Fold m Int b
-> Stream m a
-> Stream m a
D.pollCounts a -> Bool
predicate (t m Int -> Stream m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD (t m Int -> Stream m Int)
-> (Stream m Int -> t m Int) -> Stream m Int -> Stream m Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m Int -> t m Int
transf (t m Int -> t m Int)
-> (Stream m Int -> t m Int) -> Stream m Int -> t m Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m Int -> t m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD) Fold m Int b
f
(Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs)
{-# INLINE tapRate #-}
tapRate ::
(IsStream t, MonadAsync m, MonadCatch m)
=> Double
-> (Int -> m b)
-> t m a
-> t m a
tapRate :: Double -> (Int -> m b) -> t m a -> t m a
tapRate Double
n Int -> m b
f t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Double -> (Int -> m b) -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
(MonadAsync m, MonadCatch m) =>
Double -> (Int -> m b) -> Stream m a -> Stream m a
D.tapRate Double
n Int -> m b
f (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs)
{-# INLINE trace #-}
trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a
trace :: (a -> m b) -> t m a -> t m a
trace a -> m b
f = (a -> m a) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM (\a
x -> m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (a -> m b
f a
x) m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x)
#if __GLASGOW_HASKELL__ < 800
#define Type *
#endif
data SessionState t m k a b = SessionState
{ SessionState t m k a b -> AbsTime
sessionCurTime :: !AbsTime
, SessionState t m k a b -> AbsTime
sessionEventTime :: !AbsTime
, SessionState t m k a b -> Int
sessionCount :: !Int
, SessionState t m k a b -> Heap (Entry AbsTime k)
sessionTimerHeap :: H.Heap (H.Entry AbsTime k)
, SessionState t m k a b -> Map k a
sessionKeyValueMap :: Map.Map k a
, SessionState t m k a b -> t m (k, b)
sessionOutputStream :: t (m :: Type -> Type) (k, b)
}
#undef Type
{-# INLINABLE classifySessionsBy #-}
classifySessionsBy
:: (IsStream t, MonadAsync m, Ord k)
=> Double
-> Double
-> Bool
-> (Int -> m Bool)
-> Fold m a (Either b b)
-> t m (k, a, AbsTime)
-> t m (k, b)
classifySessionsBy :: Double
-> Double
-> Bool
-> (Int -> m Bool)
-> Fold m a (Either b b)
-> t m (k, a, AbsTime)
-> t m (k, b)
classifySessionsBy Double
tick Double
timeout Bool
reset Int -> m Bool
ejectPred
(Fold s -> a -> m s
step m s
initial s -> m (Either b b)
extract) t m (k, a, AbsTime)
str =
(SessionState t m k (Tuple' AbsTime s) b -> t m (k, b))
-> t m (SessionState t m k (Tuple' AbsTime s) b) -> t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> t m b) -> t m a -> t m b
concatMap (\SessionState t m k (Tuple' AbsTime s) b
session -> SessionState t m k (Tuple' AbsTime s) b -> t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionOutputStream SessionState t m k (Tuple' AbsTime s) b
session)
(t m (SessionState t m k (Tuple' AbsTime s) b) -> t m (k, b))
-> t m (SessionState t m k (Tuple' AbsTime s) b) -> t m (k, b)
forall a b. (a -> b) -> a -> b
$ (SessionState t m k (Tuple' AbsTime s) b
-> Maybe (k, a, AbsTime)
-> m (SessionState t m k (Tuple' AbsTime s) b))
-> SessionState t m k (Tuple' AbsTime s) b
-> t m (Maybe (k, a, AbsTime))
-> t m (SessionState t m k (Tuple' AbsTime s) b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> b -> t m a -> t m b
scanlM' SessionState t m k (Tuple' AbsTime s) b
-> Maybe (k, a, AbsTime)
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (t :: (* -> *) -> * -> *) k (t :: (* -> *) -> * -> *)
(m :: * -> *) b (m :: * -> *).
(IsStream t, Ord k) =>
SessionState t m k (Tuple' AbsTime s) b
-> Maybe (k, a, AbsTime)
-> m (SessionState t m k (Tuple' AbsTime s) b)
sstep SessionState t m k (Tuple' AbsTime s) b
forall (m :: * -> *) k a b. SessionState t m k a b
szero t m (Maybe (k, a, AbsTime))
stream
where
timeoutMs :: RelTime
timeoutMs = MilliSecond64 -> RelTime
forall a. TimeUnit a => a -> RelTime
toRelTime (Double -> MilliSecond64
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
timeout Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000) :: MilliSecond64)
tickMs :: RelTime
tickMs = MilliSecond64 -> RelTime
forall a. TimeUnit a => a -> RelTime
toRelTime (Double -> MilliSecond64
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
tick Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000) :: MilliSecond64)
szero :: SessionState t m k a b
szero = SessionState :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
AbsTime
-> AbsTime
-> Int
-> Heap (Entry AbsTime k)
-> Map k a
-> t m (k, b)
-> SessionState t m k a b
SessionState
{ sessionCurTime :: AbsTime
sessionCurTime = MilliSecond64 -> AbsTime
forall a. TimeUnit a => a -> AbsTime
toAbsTime (MilliSecond64
0 :: MilliSecond64)
, sessionEventTime :: AbsTime
sessionEventTime = MilliSecond64 -> AbsTime
forall a. TimeUnit a => a -> AbsTime
toAbsTime (MilliSecond64
0 :: MilliSecond64)
, sessionCount :: Int
sessionCount = Int
0
, sessionTimerHeap :: Heap (Entry AbsTime k)
sessionTimerHeap = Heap (Entry AbsTime k)
forall a. Heap a
H.empty
, sessionKeyValueMap :: Map k a
sessionKeyValueMap = Map k a
forall k a. Map k a
Map.empty
, sessionOutputStream :: t m (k, b)
sessionOutputStream = t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
}
sstep :: SessionState t m k (Tuple' AbsTime s) b
-> Maybe (k, a, AbsTime)
-> m (SessionState t m k (Tuple' AbsTime s) b)
sstep (session :: SessionState t m k (Tuple' AbsTime s) b
session@SessionState{t m (k, b)
Int
Map k (Tuple' AbsTime s)
Heap (Entry AbsTime k)
AbsTime
sessionOutputStream :: t m (k, b)
sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionTimerHeap :: Heap (Entry AbsTime k)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..}) (Just (k
key, a
value, AbsTime
timestamp)) = do
let curTime :: AbsTime
curTime = AbsTime -> AbsTime -> AbsTime
forall a. Ord a => a -> a -> a
max AbsTime
sessionEventTime AbsTime
timestamp
accumulate :: Maybe (Tuple' a s) -> m (Tuple' AbsTime s)
accumulate Maybe (Tuple' a s)
v = do
s
old <- case Maybe (Tuple' a s)
v of
Maybe (Tuple' a s)
Nothing -> m s
initial
Just (Tuple' a
_ s
acc) -> s -> m s
forall (m :: * -> *) a. Monad m => a -> m a
return s
acc
s
new <- s -> a -> m s
step s
old a
value
Tuple' AbsTime s -> m (Tuple' AbsTime s)
forall (m :: * -> *) a. Monad m => a -> m a
return (Tuple' AbsTime s -> m (Tuple' AbsTime s))
-> Tuple' AbsTime s -> m (Tuple' AbsTime s)
forall a b. (a -> b) -> a -> b
$ AbsTime -> s -> Tuple' AbsTime s
forall a b. a -> b -> Tuple' a b
Tuple' AbsTime
timestamp s
new
mOld :: Maybe (Tuple' AbsTime s)
mOld = k -> Map k (Tuple' AbsTime s) -> Maybe (Tuple' AbsTime s)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
key Map k (Tuple' AbsTime s)
sessionKeyValueMap
acc :: Tuple' AbsTime s
acc@(Tuple' AbsTime
_ s
fres) <- Maybe (Tuple' AbsTime s) -> m (Tuple' AbsTime s)
forall a. Maybe (Tuple' a s) -> m (Tuple' AbsTime s)
accumulate Maybe (Tuple' AbsTime s)
mOld
Either b b
res <- s -> m (Either b b)
extract s
fres
case Either b b
res of
Left b
x -> do
let (Map k (Tuple' AbsTime s)
mp, Int
cnt) = case Maybe (Tuple' AbsTime s)
mOld of
Maybe (Tuple' AbsTime s)
Nothing -> (Map k (Tuple' AbsTime s)
sessionKeyValueMap, Int
sessionCount)
Just Tuple' AbsTime s
_ -> (k -> Map k (Tuple' AbsTime s) -> Map k (Tuple' AbsTime s)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete k
key Map k (Tuple' AbsTime s)
sessionKeyValueMap
, Int
sessionCount Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b))
-> SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall a b. (a -> b) -> a -> b
$ SessionState t m k (Tuple' AbsTime s) b
session
{ sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
, sessionEventTime :: AbsTime
sessionEventTime = AbsTime
curTime
, sessionCount :: Int
sessionCount = Int
cnt
, sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionKeyValueMap = Map k (Tuple' AbsTime s)
mp
, sessionOutputStream :: t m (k, b)
sessionOutputStream = (k, b) -> t m (k, b)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
yield (k
key, b
x)
}
Right b
_ -> do
(Heap (Entry AbsTime k)
hp1, Map k (Tuple' AbsTime s)
mp1, t m (k, b)
out1, Int
cnt1) <- do
let vars :: (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m a, Int)
vars = (Heap (Entry AbsTime k)
sessionTimerHeap, Map k (Tuple' AbsTime s)
sessionKeyValueMap,
t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil, Int
sessionCount)
case Maybe (Tuple' AbsTime s)
mOld of
Maybe (Tuple' AbsTime s)
Nothing -> do
Bool
eject <- Int -> m Bool
ejectPred Int
sessionCount
(Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, Int
cnt) <-
if Bool
eject
then (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall k d (t :: (* -> *) -> * -> *) (m :: * -> *).
(Ord k, Num d, IsStream t) =>
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
d)
ejectOne (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
forall (m :: * -> *) a.
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m a, Int)
vars
else (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
forall (m :: * -> *) a.
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m a, Int)
vars
let expiry :: AbsTime
expiry = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
timestamp RelTime
timeoutMs
hp' :: Heap (Entry AbsTime k)
hp' = Entry AbsTime k -> Heap (Entry AbsTime k) -> Heap (Entry AbsTime k)
forall a. Ord a => a -> Heap a -> Heap a
H.insert (AbsTime -> k -> Entry AbsTime k
forall p a. p -> a -> Entry p a
Entry AbsTime
expiry k
key) Heap (Entry AbsTime k)
hp
in (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall (m :: * -> *) a. Monad m => a -> m a
return ((Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int))
-> (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall a b. (a -> b) -> a -> b
$ (Heap (Entry AbsTime k)
hp', Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, (Int
cnt Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1))
Just Tuple' AbsTime s
_ -> (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
forall (m :: * -> *) a.
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m a, Int)
vars
let mp2 :: Map k (Tuple' AbsTime s)
mp2 = k
-> Tuple' AbsTime s
-> Map k (Tuple' AbsTime s)
-> Map k (Tuple' AbsTime s)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
key Tuple' AbsTime s
acc Map k (Tuple' AbsTime s)
mp1
SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b))
-> SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall a b. (a -> b) -> a -> b
$ SessionState :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
AbsTime
-> AbsTime
-> Int
-> Heap (Entry AbsTime k)
-> Map k a
-> t m (k, b)
-> SessionState t m k a b
SessionState
{ sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
, sessionEventTime :: AbsTime
sessionEventTime = AbsTime
curTime
, sessionCount :: Int
sessionCount = Int
cnt1
, sessionTimerHeap :: Heap (Entry AbsTime k)
sessionTimerHeap = Heap (Entry AbsTime k)
hp1
, sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionKeyValueMap = Map k (Tuple' AbsTime s)
mp2
, sessionOutputStream :: t m (k, b)
sessionOutputStream = t m (k, b)
out1
}
sstep (sessionState :: SessionState t m k (Tuple' AbsTime s) b
sessionState@SessionState{t m (k, b)
Int
Map k (Tuple' AbsTime s)
Heap (Entry AbsTime k)
AbsTime
sessionOutputStream :: t m (k, b)
sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionTimerHeap :: Heap (Entry AbsTime k)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..}) Maybe (k, a, AbsTime)
Nothing =
let curTime :: AbsTime
curTime = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
sessionCurTime RelTime
tickMs
in SessionState t m k (Tuple' AbsTime s) b
-> AbsTime -> m (SessionState t m k (Tuple' AbsTime s) b)
forall k (t :: (* -> *) -> * -> *) (t :: (* -> *) -> * -> *)
(m :: * -> *) b (m :: * -> *).
(Ord k, IsStream t) =>
SessionState t m k (Tuple' AbsTime s) b
-> AbsTime -> m (SessionState t m k (Tuple' AbsTime s) b)
ejectExpired SessionState t m k (Tuple' AbsTime s) b
sessionState AbsTime
curTime
fromEither :: Either p p -> p
fromEither Either p p
e =
case Either p p
e of
Left p
x -> p
x
Right p
x -> p
x
ejectEntry :: a
-> Map k a
-> t m (k, b)
-> d
-> s
-> k
-> m (a, Map k a, t m (k, b), d)
ejectEntry a
hp Map k a
mp t m (k, b)
out d
cnt s
acc k
key = do
Either b b
sess <- s -> m (Either b b)
extract s
acc
let out1 :: t m (k, b)
out1 = (k
key, Either b b -> b
forall p. Either p p -> p
fromEither Either b b
sess) (k, b) -> t m (k, b) -> t m (k, b)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
`K.cons` t m (k, b)
out
let mp1 :: Map k a
mp1 = k -> Map k a -> Map k a
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete k
key Map k a
mp
(a, Map k a, t m (k, b), d) -> m (a, Map k a, t m (k, b), d)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
hp, Map k a
mp1, t m (k, b)
out1, (d
cnt d -> d -> d
forall a. Num a => a -> a -> a
- d
1))
ejectOne :: (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
d)
ejectOne (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, !d
cnt) = do
let hres :: Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
hres = Heap (Entry AbsTime k)
-> Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry AbsTime k)
hp
case Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
hres of
Just (Entry AbsTime
expiry k
key, Heap (Entry AbsTime k)
hp1) -> do
case k -> Map k (Tuple' AbsTime s) -> Maybe (Tuple' AbsTime s)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
key Map k (Tuple' AbsTime s)
mp of
Maybe (Tuple' AbsTime s)
Nothing -> (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
d)
ejectOne (Heap (Entry AbsTime k)
hp1, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, d
cnt)
Just (Tuple' AbsTime
latestTS s
acc) -> do
let expiry1 :: AbsTime
expiry1 = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
latestTS RelTime
timeoutMs
if Bool -> Bool
not Bool
reset Bool -> Bool -> Bool
|| AbsTime
expiry1 AbsTime -> AbsTime -> Bool
forall a. Ord a => a -> a -> Bool
<= AbsTime
expiry
then Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> d
-> s
-> k
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
d)
forall d (t :: (* -> *) -> * -> *) k a a (m :: * -> *).
(Num d, IsStream t, Ord k) =>
a
-> Map k a
-> t m (k, b)
-> d
-> s
-> k
-> m (a, Map k a, t m (k, b), d)
ejectEntry Heap (Entry AbsTime k)
hp1 Map k (Tuple' AbsTime s)
mp t m (k, b)
out d
cnt s
acc k
key
else
let hp2 :: Heap (Entry AbsTime k)
hp2 = Entry AbsTime k -> Heap (Entry AbsTime k) -> Heap (Entry AbsTime k)
forall a. Ord a => a -> Heap a -> Heap a
H.insert (AbsTime -> k -> Entry AbsTime k
forall p a. p -> a -> Entry p a
Entry AbsTime
expiry1 k
key) Heap (Entry AbsTime k)
hp1
in (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
d)
ejectOne (Heap (Entry AbsTime k)
hp2, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, d
cnt)
Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
Nothing -> do
Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Map k (Tuple' AbsTime s) -> Bool
forall k a. Map k a -> Bool
Map.null Map k (Tuple' AbsTime s)
mp) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
d)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, d
cnt)
ejectExpired :: SessionState t m k (Tuple' AbsTime s) b
-> AbsTime -> m (SessionState t m k (Tuple' AbsTime s) b)
ejectExpired (session :: SessionState t m k (Tuple' AbsTime s) b
session@SessionState{t m (k, b)
Int
Map k (Tuple' AbsTime s)
Heap (Entry AbsTime k)
AbsTime
sessionOutputStream :: t m (k, b)
sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionTimerHeap :: Heap (Entry AbsTime k)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..}) AbsTime
curTime = do
(Heap (Entry AbsTime k)
hp', Map k (Tuple' AbsTime s)
mp', t m (k, b)
out, Int
count) <-
Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall k (t :: (* -> *) -> * -> *) (m :: * -> *).
(Ord k, IsStream t) =>
Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
ejectLoop Heap (Entry AbsTime k)
sessionTimerHeap Map k (Tuple' AbsTime s)
sessionKeyValueMap t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil Int
sessionCount
SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b))
-> SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall a b. (a -> b) -> a -> b
$ SessionState t m k (Tuple' AbsTime s) b
session
{ sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
, sessionCount :: Int
sessionCount = Int
count
, sessionTimerHeap :: Heap (Entry AbsTime k)
sessionTimerHeap = Heap (Entry AbsTime k)
hp'
, sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionKeyValueMap = Map k (Tuple' AbsTime s)
mp'
, sessionOutputStream :: t m (k, b)
sessionOutputStream = t m (k, b)
out
}
where
ejectLoop :: Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
ejectLoop Heap (Entry AbsTime k)
hp Map k (Tuple' AbsTime s)
mp t m (k, b)
out !Int
cnt = do
let hres :: Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
hres = Heap (Entry AbsTime k)
-> Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry AbsTime k)
hp
case Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
hres of
Just (Entry AbsTime
expiry k
key, Heap (Entry AbsTime k)
hp1) -> do
(Bool
eject, Bool
force) <- do
if AbsTime
curTime AbsTime -> AbsTime -> Bool
forall a. Ord a => a -> a -> Bool
>= AbsTime
expiry
then (Bool, Bool) -> m (Bool, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
True, Bool
False)
else do
Bool
r <- Int -> m Bool
ejectPred Int
cnt
(Bool, Bool) -> m (Bool, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
r, Bool
r)
if Bool
eject
then do
case k -> Map k (Tuple' AbsTime s) -> Maybe (Tuple' AbsTime s)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
key Map k (Tuple' AbsTime s)
mp of
Maybe (Tuple' AbsTime s)
Nothing -> Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
ejectLoop Heap (Entry AbsTime k)
hp1 Map k (Tuple' AbsTime s)
mp t m (k, b)
out Int
cnt
Just (Tuple' AbsTime
latestTS s
acc) -> do
let expiry1 :: AbsTime
expiry1 = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
latestTS RelTime
timeoutMs
if AbsTime
expiry1 AbsTime -> AbsTime -> Bool
forall a. Ord a => a -> a -> Bool
<= AbsTime
curTime Bool -> Bool -> Bool
|| Bool -> Bool
not Bool
reset Bool -> Bool -> Bool
|| Bool
force
then do
(Heap (Entry AbsTime k)
hp2,Map k (Tuple' AbsTime s)
mp1,t m (k, b)
out1,Int
cnt1) <-
Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> s
-> k
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall d (t :: (* -> *) -> * -> *) k a a (m :: * -> *).
(Num d, IsStream t, Ord k) =>
a
-> Map k a
-> t m (k, b)
-> d
-> s
-> k
-> m (a, Map k a, t m (k, b), d)
ejectEntry Heap (Entry AbsTime k)
hp1 Map k (Tuple' AbsTime s)
mp t m (k, b)
out Int
cnt s
acc k
key
Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
ejectLoop Heap (Entry AbsTime k)
hp2 Map k (Tuple' AbsTime s)
mp1 t m (k, b)
out1 Int
cnt1
else
let hp2 :: Heap (Entry AbsTime k)
hp2 = Entry AbsTime k -> Heap (Entry AbsTime k) -> Heap (Entry AbsTime k)
forall a. Ord a => a -> Heap a -> Heap a
H.insert (AbsTime -> k -> Entry AbsTime k
forall p a. p -> a -> Entry p a
Entry AbsTime
expiry1 k
key) Heap (Entry AbsTime k)
hp1
in Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
ejectLoop Heap (Entry AbsTime k)
hp2 Map k (Tuple' AbsTime s)
mp t m (k, b)
out Int
cnt
else (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, Int
cnt)
Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
Nothing -> do
Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Map k (Tuple' AbsTime s) -> Bool
forall k a. Map k a -> Bool
Map.null Map k (Tuple' AbsTime s)
mp) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, Int
cnt)
stream :: t m (Maybe (k, a, AbsTime))
stream = ((k, a, AbsTime) -> Maybe (k, a, AbsTime))
-> t m (k, a, AbsTime) -> t m (Maybe (k, a, AbsTime))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
Serial.map (k, a, AbsTime) -> Maybe (k, a, AbsTime)
forall a. a -> Maybe a
Just t m (k, a, AbsTime)
str t m (Maybe (k, a, AbsTime))
-> t m (Maybe (k, a, AbsTime)) -> t m (Maybe (k, a, AbsTime))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
`Par.parallel` m (Maybe (k, a, AbsTime)) -> t m (Maybe (k, a, AbsTime))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a
repeatM m (Maybe (k, a, AbsTime))
forall a. m (Maybe a)
timer
timer :: m (Maybe a)
timer = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
tick Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000)
Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
{-# INLINABLE classifyKeepAliveSessions #-}
classifyKeepAliveSessions
:: (IsStream t, MonadAsync m, Ord k)
=> Double
-> (Int -> m Bool)
-> Fold m a (Either b b)
-> t m (k, a, AbsTime)
-> t m (k, b)
classifyKeepAliveSessions :: Double
-> (Int -> m Bool)
-> Fold m a (Either b b)
-> t m (k, a, AbsTime)
-> t m (k, b)
classifyKeepAliveSessions Double
timeout Int -> m Bool
ejectPred =
Double
-> Double
-> Bool
-> (Int -> m Bool)
-> Fold m a (Either b b)
-> t m (k, a, AbsTime)
-> t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
Double
-> Double
-> Bool
-> (Int -> m Bool)
-> Fold m a (Either b b)
-> t m (k, a, AbsTime)
-> t m (k, b)
classifySessionsBy Double
1 Double
timeout Bool
True Int -> m Bool
ejectPred
{-# INLINABLE classifySessionsOf #-}
classifySessionsOf
:: (IsStream t, MonadAsync m, Ord k)
=> Double
-> (Int -> m Bool)
-> Fold m a (Either b b)
-> t m (k, a, AbsTime)
-> t m (k, b)
classifySessionsOf :: Double
-> (Int -> m Bool)
-> Fold m a (Either b b)
-> t m (k, a, AbsTime)
-> t m (k, b)
classifySessionsOf Double
interval Int -> m Bool
ejectPred =
Double
-> Double
-> Bool
-> (Int -> m Bool)
-> Fold m a (Either b b)
-> t m (k, a, AbsTime)
-> t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
Double
-> Double
-> Bool
-> (Int -> m Bool)
-> Fold m a (Either b b)
-> t m (k, a, AbsTime)
-> t m (k, b)
classifySessionsBy Double
1 Double
interval Bool
False Int -> m Bool
ejectPred
{-# INLINE before #-}
before :: (IsStream t, Monad m) => m b -> t m a -> t m a
before :: m b -> t m a -> t m a
before m b
action t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ m b -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
D.before m b
action (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs
{-# INLINE after #-}
after :: (IsStream t, Monad m) => m b -> t m a -> t m a
after :: m b -> t m a -> t m a
after m b
action t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ m b -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
D.after m b
action (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs
{-# INLINE afterIO #-}
afterIO :: (IsStream t, MonadIO m, MonadBaseControl IO m) => m b -> t m a -> t m a
afterIO :: m b -> t m a -> t m a
afterIO m b
action t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ m b -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
(MonadIO m, MonadBaseControl IO m) =>
m b -> Stream m a -> Stream m a
D.afterIO m b
action (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs
{-# INLINE onException #-}
onException :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a
onException :: m b -> t m a -> t m a
onException m b
action t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ m b -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
MonadCatch m =>
m b -> Stream m a -> Stream m a
D.onException m b
action (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs
{-# INLINE finally #-}
finally :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a
finally :: m b -> t m a -> t m a
finally m b
action t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ m b -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
MonadCatch m =>
m b -> Stream m a -> Stream m a
D.finally m b
action (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs
{-# INLINE finallyIO #-}
finallyIO :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> t m a -> t m a
finallyIO :: m b -> t m a -> t m a
finallyIO m b
action t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ m b -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
(MonadAsync m, MonadCatch m) =>
m b -> Stream m a -> Stream m a
D.finallyIO m b
action (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs
{-# INLINE bracket #-}
bracket :: (IsStream t, MonadCatch m)
=> m b -> (b -> m c) -> (b -> t m a) -> t m a
bracket :: m b -> (b -> m c) -> (b -> t m a) -> t m a
bracket m b
bef b -> m c
aft b -> t m a
bet = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
forall (m :: * -> *) b c a.
MonadCatch m =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
D.bracket m b
bef b -> m c
aft (\b
x -> t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD (t m a -> Stream m a) -> t m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ b -> t m a
bet b
x)
{-# INLINE bracketIO #-}
bracketIO :: (IsStream t, MonadAsync m, MonadCatch m)
=> m b -> (b -> m c) -> (b -> t m a) -> t m a
bracketIO :: m b -> (b -> m c) -> (b -> t m a) -> t m a
bracketIO m b
bef b -> m c
aft b -> t m a
bet = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
forall (m :: * -> *) b c a.
(MonadAsync m, MonadCatch m) =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
D.bracketIO m b
bef b -> m c
aft (\b
x -> t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD (t m a -> Stream m a) -> t m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ b -> t m a
bet b
x)
{-# INLINE handle #-}
handle :: (IsStream t, MonadCatch m, Exception e)
=> (e -> t m a) -> t m a -> t m a
handle :: (e -> t m a) -> t m a -> t m a
handle e -> t m a
handler t m a
xs =
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (e -> Stream m a) -> Stream m a -> Stream m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> Stream m a) -> Stream m a -> Stream m a
D.handle (\e
e -> t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD (t m a -> Stream m a) -> t m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ e -> t m a
handler e
e) (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs
{-# INLINE hoist #-}
hoist :: (Monad m, Monad n)
=> (forall x. m x -> n x) -> SerialT m a -> SerialT n a
hoist :: (forall x. m x -> n x) -> SerialT m a -> SerialT n a
hoist forall x. m x -> n x
f SerialT m a
xs = Stream n a -> SerialT n a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream n a -> SerialT n a) -> Stream n a -> SerialT n a
forall a b. (a -> b) -> a -> b
$ (forall x. m x -> n x) -> Stream m a -> Stream n a
forall (n :: * -> *) (m :: * -> *) a.
Monad n =>
(forall x. m x -> n x) -> Stream m a -> Stream n a
S.hoist forall x. m x -> n x
f (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
xs)
{-# INLINE generally #-}
generally :: (IsStream t, Monad m) => t Identity a -> t m a
generally :: t Identity a -> t m a
generally t Identity a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (forall x. Identity x -> m x) -> Stream Identity a -> Stream m a
forall (n :: * -> *) (m :: * -> *) a.
Monad n =>
(forall x. m x -> n x) -> Stream m a -> Stream n a
S.hoist (x -> m x
forall (m :: * -> *) a. Monad m => a -> m a
return (x -> m x) -> (Identity x -> x) -> Identity x -> m x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identity x -> x
forall a. Identity a -> a
runIdentity) (t Identity a -> Stream Identity a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t Identity a
xs)
{-# INLINE liftInner #-}
liftInner :: (Monad m, IsStream t, MonadTrans tr, Monad (tr m))
=> t m a -> t (tr m) a
liftInner :: t m a -> t (tr m) a
liftInner t m a
xs = Stream (tr m) a -> t (tr m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream (tr m) a -> t (tr m) a) -> Stream (tr m) a -> t (tr m) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream (tr m) a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, MonadTrans t, Monad (t m)) =>
Stream m a -> Stream (t m) a
D.liftInner (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs)
{-# INLINE runReaderT #-}
runReaderT :: (IsStream t, Monad m) => s -> t (ReaderT s m) a -> t m a
runReaderT :: s -> t (ReaderT s m) a -> t m a
runReaderT s
s t (ReaderT s m) a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ s -> Stream (ReaderT s m) a -> Stream m a
forall (m :: * -> *) s a.
Monad m =>
s -> Stream (ReaderT s m) a -> Stream m a
D.runReaderT s
s (t (ReaderT s m) a -> Stream (ReaderT s m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t (ReaderT s m) a
xs)
{-# INLINE evalStateT #-}
evalStateT :: Monad m => s -> SerialT (StateT s m) a -> SerialT m a
evalStateT :: s -> SerialT (StateT s m) a -> SerialT m a
evalStateT s
s SerialT (StateT s m) a
xs = Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> SerialT m a) -> Stream m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ s -> Stream (StateT s m) a -> Stream m a
forall (m :: * -> *) s a.
Monad m =>
s -> Stream (StateT s m) a -> Stream m a
D.evalStateT s
s (SerialT (StateT s m) a -> Stream (StateT s m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD SerialT (StateT s m) a
xs)
{-# INLINE usingStateT #-}
usingStateT
:: Monad m
=> s
-> (SerialT (StateT s m) a -> SerialT (StateT s m) a)
-> SerialT m a
-> SerialT m a
usingStateT :: s
-> (SerialT (StateT s m) a -> SerialT (StateT s m) a)
-> SerialT m a
-> SerialT m a
usingStateT s
s SerialT (StateT s m) a -> SerialT (StateT s m) a
f SerialT m a
xs = s -> SerialT (StateT s m) a -> SerialT m a
forall (m :: * -> *) s a.
Monad m =>
s -> SerialT (StateT s m) a -> SerialT m a
evalStateT s
s (SerialT (StateT s m) a -> SerialT m a)
-> SerialT (StateT s m) a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ SerialT (StateT s m) a -> SerialT (StateT s m) a
f (SerialT (StateT s m) a -> SerialT (StateT s m) a)
-> SerialT (StateT s m) a -> SerialT (StateT s m) a
forall a b. (a -> b) -> a -> b
$ SerialT m a -> SerialT (StateT s m) a
forall (m :: * -> *) (t :: (* -> *) -> * -> *)
(tr :: (* -> *) -> * -> *) a.
(Monad m, IsStream t, MonadTrans tr, Monad (tr m)) =>
t m a -> t (tr m) a
liftInner SerialT m a
xs
{-# INLINE runStateT #-}
runStateT :: Monad m => s -> SerialT (StateT s m) a -> SerialT m (s, a)
runStateT :: s -> SerialT (StateT s m) a -> SerialT m (s, a)
runStateT s
s SerialT (StateT s m) a
xs = Stream m (s, a) -> SerialT m (s, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m (s, a) -> SerialT m (s, a))
-> Stream m (s, a) -> SerialT m (s, a)
forall a b. (a -> b) -> a -> b
$ s -> Stream (StateT s m) a -> Stream m (s, a)
forall (m :: * -> *) s a.
Monad m =>
s -> Stream (StateT s m) a -> Stream m (s, a)
D.runStateT s
s (SerialT (StateT s m) a -> Stream (StateT s m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD SerialT (StateT s m) a
xs)
{-# INLINE usingReaderT #-}
usingReaderT
:: (Monad m, IsStream t)
=> r
-> (t (ReaderT r m) a -> t (ReaderT r m) a)
-> t m a
-> t m a
usingReaderT :: r -> (t (ReaderT r m) a -> t (ReaderT r m) a) -> t m a -> t m a
usingReaderT r
r t (ReaderT r m) a -> t (ReaderT r m) a
f t m a
xs = r -> t (ReaderT r m) a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) s a.
(IsStream t, Monad m) =>
s -> t (ReaderT s m) a -> t m a
runReaderT r
r (t (ReaderT r m) a -> t m a) -> t (ReaderT r m) a -> t m a
forall a b. (a -> b) -> a -> b
$ t (ReaderT r m) a -> t (ReaderT r m) a
f (t (ReaderT r m) a -> t (ReaderT r m) a)
-> t (ReaderT r m) a -> t (ReaderT r m) a
forall a b. (a -> b) -> a -> b
$ t m a -> t (ReaderT r m) a
forall (m :: * -> *) (t :: (* -> *) -> * -> *)
(tr :: (* -> *) -> * -> *) a.
(Monad m, IsStream t, MonadTrans tr, Monad (tr m)) =>
t m a -> t (tr m) a
liftInner t m a
xs