{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE TypeFamilies #-}
-- | These are stream fusion versions of some of the functions in
-- "Data.Conduit.Combinators".  Many functions don't have stream
-- versions here because instead they have @RULES@ which inline a
-- definition that fuses.
module Data.Conduit.Combinators.Stream
  ( yieldManyS
  , repeatMS
  , repeatWhileMS
  , foldl1S
  , allS
  , anyS
  , sinkLazyS
  , sinkVectorS
  , sinkVectorNS
  , sinkLazyBuilderS
  , lastS
  , lastES
  , findS
  , concatMapS
  , concatMapMS
  , concatS
  , scanlS
  , scanlMS
  , mapAccumWhileS
  , mapAccumWhileMS
  , intersperseS
  , slidingWindowS
  , filterMS
  , splitOnUnboundedES
  , initReplicateS
  , initRepeatS
  )
  where

-- BEGIN IMPORTS

import           Control.Monad (liftM)
import           Control.Monad.Primitive (PrimMonad)
import qualified Data.ByteString.Lazy as BL
import           Data.ByteString.Builder (Builder, toLazyByteString)
import           Data.Conduit.Internal.Fusion
import           Data.Conduit.Internal.List.Stream (foldS)
import           Data.Maybe (isNothing, isJust)
import           Data.MonoTraversable
#if ! MIN_VERSION_base(4,8,0)
import           Data.Monoid (Monoid (..))
#endif
import qualified Data.NonNull as NonNull
import qualified Data.Sequences as Seq
import qualified Data.Vector.Generic as V
import qualified Data.Vector.Generic.Mutable as VM
import           Prelude

#if MIN_VERSION_mono_traversable(1,0,0)
import           Data.Sequences (LazySequence (..))
#else
import           Data.Sequences.Lazy
#endif

-- END IMPORTS

yieldManyS :: (Monad m, MonoFoldable mono)
            => mono
            -> StreamProducer m (Element mono)
yieldManyS :: forall (m :: * -> *) mono.
(Monad m, MonoFoldable mono) =>
mono -> StreamProducer m (Element mono)
yieldManyS mono
mono Stream m i ()
_ =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall {o}. [o] -> Step [o] o ()
step) (forall (m :: * -> *) a. Monad m => a -> m a
return (forall mono. MonoFoldable mono => mono -> [Element mono]
otoList mono
mono))
  where
    step :: [o] -> Step [o] o ()
step [] = forall s o r. r -> Step s o r
Stop ()
    step (o
x:[o]
xs) = forall s o r. s -> o -> Step s o r
Emit [o]
xs o
x
{-# INLINE yieldManyS #-}

repeatMS :: Monad m
         => m a
         -> StreamProducer m a
repeatMS :: forall (m :: * -> *) a. Monad m => m a -> StreamProducer m a
repeatMS m a
m Stream m i ()
_ =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream () -> m (Step () a ())
step (forall (m :: * -> *) a. Monad m => a -> m a
return ())
  where
    step :: () -> m (Step () a ())
step ()
_ = forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (forall s o r. s -> o -> Step s o r
Emit ()) m a
m
{-# INLINE repeatMS #-}

repeatWhileMS :: Monad m
              => m a
              -> (a -> Bool)
              -> StreamProducer m a
repeatWhileMS :: forall (m :: * -> *) a.
Monad m =>
m a -> (a -> Bool) -> StreamProducer m a
repeatWhileMS m a
m a -> Bool
f Stream m i ()
_ =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream () -> m (Step () a ())
step (forall (m :: * -> *) a. Monad m => a -> m a
return ())
  where
    step :: () -> m (Step () a ())
step ()
_ = do
        a
x <- m a
m
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if a -> Bool
f a
x
            then forall s o r. s -> o -> Step s o r
Emit () a
x
            else forall s o r. r -> Step s o r
Stop ()
{-# INLINE repeatWhileMS #-}

foldl1S :: Monad m
        => (a -> a -> a)
        -> StreamConsumer a m (Maybe a)
foldl1S :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> a) -> StreamConsumer a m (Maybe a)
foldl1S a -> a -> a
f (Stream s -> m (Step s a ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Maybe a, s) -> m (Step (Maybe a, s) o (Maybe a))
step' (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (forall a. Maybe a
Nothing, ) m s
ms0)
  where
    step' :: (Maybe a, s) -> m (Step (Maybe a, s) o (Maybe a))
step' (Maybe a
mprev, s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> forall s o r. r -> Step s o r
Stop Maybe a
mprev
            Skip s
s' -> forall s o r. s -> Step s o r
Skip (Maybe a
mprev, s
s')
            Emit s
s' a
a -> forall s o r. s -> Step s o r
Skip (forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall b a. b -> (a -> b) -> Maybe a -> b
maybe a
a (a -> a -> a
`f` a
a) Maybe a
mprev, s
s')
{-# INLINE foldl1S #-}

allS :: Monad m
     => (a -> Bool)
     -> StreamConsumer a m Bool
allS :: forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> StreamConsumer a m Bool
allS a -> Bool
f = forall (m :: * -> *) a b i o.
Monad m =>
(a -> b) -> StreamConduitT i o m a -> StreamConduitT i o m b
fmapS forall a. Maybe a -> Bool
isNothing (forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> StreamConsumer a m (Maybe a)
findS (Bool -> Bool
Prelude.not forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Bool
f))
{-# INLINE allS #-}

anyS :: Monad m
     => (a -> Bool)
     -> StreamConsumer a m Bool
anyS :: forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> StreamConsumer a m Bool
anyS a -> Bool
f = forall (m :: * -> *) a b i o.
Monad m =>
(a -> b) -> StreamConduitT i o m a -> StreamConduitT i o m b
fmapS forall a. Maybe a -> Bool
isJust (forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> StreamConsumer a m (Maybe a)
findS a -> Bool
f)
{-# INLINE anyS #-}

--TODO: use a definition like
-- fmapS (fromChunks . ($ [])) <$> CL.fold (\front next -> front . (next:)) id

sinkLazyS :: (Monad m, LazySequence lazy strict)
          => StreamConsumer strict m lazy
sinkLazyS :: forall (m :: * -> *) lazy strict.
(Monad m, LazySequence lazy strict) =>
StreamConsumer strict m lazy
sinkLazyS = forall (m :: * -> *) a b i o.
Monad m =>
(a -> b) -> StreamConduitT i o m a -> StreamConduitT i o m b
fmapS (forall lazy strict. LazySequence lazy strict => [strict] -> lazy
fromChunks forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall a b. (a -> b) -> a -> b
$ [])) forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> StreamConsumer a m b
foldS (\[strict] -> [strict]
front strict
next -> [strict] -> [strict]
front forall b c a. (b -> c) -> (a -> b) -> a -> c
. (strict
nextforall a. a -> [a] -> [a]
:)) forall a. a -> a
id
{-# INLINE sinkLazyS #-}

sinkVectorS :: (V.Vector v a, PrimMonad m)
            => StreamConsumer a m (v a)
sinkVectorS :: forall (v :: * -> *) a (m :: * -> *).
(Vector v a, PrimMonad m) =>
StreamConsumer a m (v a)
sinkVectorS (Stream s -> m (Step s a ())
step m s
ms0) = do
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Int, Int, Mutable v (PrimState m) a, s)
-> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
step' forall a b. (a -> b) -> a -> b
$ do
        s
s0 <- m s
ms0
        Mutable v (PrimState m) a
mv0 <- forall (m :: * -> *) (v :: * -> * -> *) a.
(HasCallStack, PrimMonad m, MVector v a) =>
Int -> m (v (PrimState m) a)
VM.new Int
initSize
        forall (m :: * -> *) a. Monad m => a -> m a
return (Int
initSize, Int
0, Mutable v (PrimState m) a
mv0, s
s0)
  where
    initSize :: Int
initSize = Int
10
    step' :: (Int, Int, Mutable v (PrimState m) a, s)
-> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
step' (Int
maxSize, Int
i, Mutable v (PrimState m) a
mv, s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        case Step s a ()
res of
            Stop () -> forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (forall s o r. r -> Step s o r
Stop forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (v :: * -> *) a.
(HasCallStack, Vector v a) =>
Int -> Int -> v a -> v a
V.slice Int
0 Int
i) forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (v :: * -> *) a.
(PrimMonad m, Vector v a) =>
Mutable v (PrimState m) a -> m (v a)
V.unsafeFreeze Mutable v (PrimState m) a
mv
            Skip s
s' -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> Step s o r
Skip (Int
maxSize, Int
i, Mutable v (PrimState m) a
mv, s
s')
            Emit s
s' a
x -> do
                forall (m :: * -> *) (v :: * -> * -> *) a.
(HasCallStack, PrimMonad m, MVector v a) =>
v (PrimState m) a -> Int -> a -> m ()
VM.write Mutable v (PrimState m) a
mv Int
i a
x
                let i' :: Int
i' = Int
i forall a. Num a => a -> a -> a
+ Int
1
                if Int
i' forall a. Ord a => a -> a -> Bool
>= Int
maxSize
                    then do
                        let newMax :: Int
newMax = Int
maxSize forall a. Num a => a -> a -> a
* Int
2
                        Mutable v (PrimState m) a
mv' <- forall (m :: * -> *) (v :: * -> * -> *) a.
(HasCallStack, PrimMonad m, MVector v a) =>
v (PrimState m) a -> Int -> m (v (PrimState m) a)
VM.grow Mutable v (PrimState m) a
mv Int
maxSize
                        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> Step s o r
Skip (Int
newMax, Int
i', Mutable v (PrimState m) a
mv', s
s')
                    else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> Step s o r
Skip (Int
maxSize, Int
i', Mutable v (PrimState m) a
mv, s
s')
{-# INLINE sinkVectorS #-}

sinkVectorNS :: (V.Vector v a, PrimMonad m)
             => Int -- ^ maximum allowed size
             -> StreamConsumer a m (v a)
sinkVectorNS :: forall (v :: * -> *) a (m :: * -> *).
(Vector v a, PrimMonad m) =>
Int -> StreamConsumer a m (v a)
sinkVectorNS Int
maxSize (Stream s -> m (Step s a ())
step m s
ms0) = do
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Int, Mutable v (PrimState m) a, s)
-> m (Step (Int, Mutable v (PrimState m) a, s) o (v a))
step' forall a b. (a -> b) -> a -> b
$ do
        s
s0 <- m s
ms0
        Mutable v (PrimState m) a
mv0 <- forall (m :: * -> *) (v :: * -> * -> *) a.
(HasCallStack, PrimMonad m, MVector v a) =>
Int -> m (v (PrimState m) a)
VM.new Int
maxSize
        forall (m :: * -> *) a. Monad m => a -> m a
return (Int
0, Mutable v (PrimState m) a
mv0, s
s0)
  where
    step' :: (Int, Mutable v (PrimState m) a, s)
-> m (Step (Int, Mutable v (PrimState m) a, s) o (v a))
step' (Int
i, Mutable v (PrimState m) a
mv, s
_) | Int
i forall a. Ord a => a -> a -> Bool
>= Int
maxSize = forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM forall s o r. r -> Step s o r
Stop forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (v :: * -> *) a.
(PrimMonad m, Vector v a) =>
Mutable v (PrimState m) a -> m (v a)
V.unsafeFreeze Mutable v (PrimState m) a
mv
    step' (Int
i, Mutable v (PrimState m) a
mv, s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        case Step s a ()
res of
            Stop () -> forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (forall s o r. r -> Step s o r
Stop forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (v :: * -> *) a.
(HasCallStack, Vector v a) =>
Int -> Int -> v a -> v a
V.slice Int
0 Int
i) forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (v :: * -> *) a.
(PrimMonad m, Vector v a) =>
Mutable v (PrimState m) a -> m (v a)
V.unsafeFreeze Mutable v (PrimState m) a
mv
            Skip s
s' -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> Step s o r
Skip (Int
i, Mutable v (PrimState m) a
mv, s
s')
            Emit s
s' a
x -> do
                forall (m :: * -> *) (v :: * -> * -> *) a.
(HasCallStack, PrimMonad m, MVector v a) =>
v (PrimState m) a -> Int -> a -> m ()
VM.write Mutable v (PrimState m) a
mv Int
i a
x
                let i' :: Int
i' = Int
i forall a. Num a => a -> a -> a
+ Int
1
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> Step s o r
Skip (Int
i', Mutable v (PrimState m) a
mv, s
s')
{-# INLINE sinkVectorNS #-}

sinkLazyBuilderS :: Monad m => StreamConsumer Builder m BL.ByteString
sinkLazyBuilderS :: forall (m :: * -> *).
Monad m =>
StreamConsumer Builder m ByteString
sinkLazyBuilderS = forall (m :: * -> *) a b i o.
Monad m =>
(a -> b) -> StreamConduitT i o m a -> StreamConduitT i o m b
fmapS Builder -> ByteString
toLazyByteString (forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> StreamConsumer a m b
foldS forall a. Monoid a => a -> a -> a
mappend forall a. Monoid a => a
mempty)
{-# INLINE sinkLazyBuilderS #-}

lastS :: Monad m
      => StreamConsumer a m (Maybe a)
lastS :: forall (m :: * -> *) a. Monad m => StreamConsumer a m (Maybe a)
lastS (Stream s -> m (Step s a ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Maybe a, s) -> m (Step (Maybe a, s) o (Maybe a))
step' (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (forall a. Maybe a
Nothing,) m s
ms0)
  where
    step' :: (Maybe a, s) -> m (Step (Maybe a, s) o (Maybe a))
step' (Maybe a
mlast, s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> forall s o r. r -> Step s o r
Stop Maybe a
mlast
            Skip s
s' -> forall s o r. s -> Step s o r
Skip (Maybe a
mlast, s
s')
            Emit s
s' a
x -> forall s o r. s -> Step s o r
Skip (forall a. a -> Maybe a
Just a
x, s
s')
{-# INLINE lastS #-}

lastES :: (Monad m, Seq.IsSequence seq)
       => StreamConsumer seq m (Maybe (Element seq))
lastES :: forall (m :: * -> *) seq.
(Monad m, IsSequence seq) =>
StreamConsumer seq m (Maybe (Element seq))
lastES (Stream s -> m (Step s seq ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Maybe (NonNull seq), s)
-> m (Step (Maybe (NonNull seq), s) o (Maybe (Element seq)))
step' (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (forall a. Maybe a
Nothing, ) m s
ms0)
  where
    step' :: (Maybe (NonNull seq), s)
-> m (Step (Maybe (NonNull seq), s) o (Maybe (Element seq)))
step' (Maybe (NonNull seq)
mlast, s
s) = do
        Step s seq ()
res <- s -> m (Step s seq ())
step s
s
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s seq ()
res of
            Stop () -> forall s o r. r -> Step s o r
Stop (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall mono. MonoFoldable mono => NonNull mono -> Element mono
NonNull.last Maybe (NonNull seq)
mlast)
            Skip s
s' -> forall s o r. s -> Step s o r
Skip (Maybe (NonNull seq)
mlast, s
s')
            Emit s
s' (forall mono. MonoFoldable mono => mono -> Maybe (NonNull mono)
NonNull.fromNullable -> mlast' :: Maybe (NonNull seq)
mlast'@(Just NonNull seq
_)) -> forall s o r. s -> Step s o r
Skip (Maybe (NonNull seq)
mlast', s
s')
            Emit s
s' seq
_ -> forall s o r. s -> Step s o r
Skip (Maybe (NonNull seq)
mlast, s
s')
{-# INLINE lastES #-}

findS :: Monad m
      => (a -> Bool) -> StreamConsumer a m (Maybe a)
findS :: forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> StreamConsumer a m (Maybe a)
findS a -> Bool
f (Stream s -> m (Step s a ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream s -> m (Step s o (Maybe a))
step' m s
ms0
  where
    step' :: s -> m (Step s o (Maybe a))
step' s
s = do
      Step s a ()
res <- s -> m (Step s a ())
step s
s
      forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
          Stop () -> forall s o r. r -> Step s o r
Stop forall a. Maybe a
Nothing
          Skip s
s' -> forall s o r. s -> Step s o r
Skip s
s'
          Emit s
s' a
x ->
              if a -> Bool
f a
x
                  then forall s o r. r -> Step s o r
Stop (forall a. a -> Maybe a
Just a
x)
                  else forall s o r. s -> Step s o r
Skip s
s'
{-# INLINE findS #-}

concatMapS :: (Monad m, MonoFoldable mono)
           => (a -> mono)
           -> StreamConduit a m (Element mono)
concatMapS :: forall (m :: * -> *) mono a.
(Monad m, MonoFoldable mono) =>
(a -> mono) -> StreamConduit a m (Element mono)
concatMapS a -> mono
f (Stream s -> m (Step s a ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream ([Element mono], s)
-> m (Step ([Element mono], s) (Element mono) ())
step' (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM ([], ) m s
ms0)
  where
    step' :: ([Element mono], s)
-> m (Step ([Element mono], s) (Element mono) ())
step' ([], s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> forall s o r. s -> Step s o r
Skip ([], s
s')
            Emit s
s' a
x -> forall s o r. s -> Step s o r
Skip (forall mono. MonoFoldable mono => mono -> [Element mono]
otoList (a -> mono
f a
x), s
s')
    step' ((Element mono
x:[Element mono]
xs), s
s) = forall (m :: * -> *) a. Monad m => a -> m a
return (forall s o r. s -> o -> Step s o r
Emit ([Element mono]
xs, s
s) Element mono
x)
{-# INLINE concatMapS #-}

concatMapMS :: (Monad m, MonoFoldable mono)
             => (a -> m mono)
             -> StreamConduit a m (Element mono)
concatMapMS :: forall (m :: * -> *) mono a.
(Monad m, MonoFoldable mono) =>
(a -> m mono) -> StreamConduit a m (Element mono)
concatMapMS a -> m mono
f (Stream s -> m (Step s a ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream ([Element mono], s)
-> m (Step ([Element mono], s) (Element mono) ())
step' (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM ([], ) m s
ms0)
  where
    step' :: ([Element mono], s)
-> m (Step ([Element mono], s) (Element mono) ())
step' ([], s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        case Step s a ()
res of
            Stop () -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> Step s o r
Skip ([], s
s')
            Emit s
s' a
x -> do
                mono
o <- a -> m mono
f a
x
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> Step s o r
Skip (forall mono. MonoFoldable mono => mono -> [Element mono]
otoList mono
o, s
s')
    step' ((Element mono
x:[Element mono]
xs), s
s) = forall (m :: * -> *) a. Monad m => a -> m a
return (forall s o r. s -> o -> Step s o r
Emit ([Element mono]
xs, s
s) Element mono
x)
{-# INLINE concatMapMS #-}

concatS :: (Monad m, MonoFoldable mono)
         => StreamConduit mono m (Element mono)
concatS :: forall (m :: * -> *) mono.
(Monad m, MonoFoldable mono) =>
StreamConduit mono m (Element mono)
concatS = forall (m :: * -> *) mono a.
(Monad m, MonoFoldable mono) =>
(a -> mono) -> StreamConduit a m (Element mono)
concatMapS forall a. a -> a
id
{-# INLINE concatS #-}

data ScanState a s
    = ScanEnded
    | ScanContinues a s

scanlS :: Monad m => (a -> b -> a) -> a -> StreamConduit b m a
scanlS :: forall (m :: * -> *) a b.
Monad m =>
(a -> b -> a) -> a -> StreamConduit b m a
scanlS a -> b -> a
f a
seed0 (Stream s -> m (Step s b ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream ScanState a s -> m (Step (ScanState a s) a ())
step' (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (forall a s. a -> s -> ScanState a s
ScanContinues a
seed0) m s
ms0)
  where
    step' :: ScanState a s -> m (Step (ScanState a s) a ())
step' ScanState a s
ScanEnded = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. r -> Step s o r
Stop ()
    step' (ScanContinues a
seed s
s) = do
        Step s b ()
res <- s -> m (Step s b ())
step s
s
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s b ()
res of
            Stop () -> forall s o r. s -> o -> Step s o r
Emit forall a s. ScanState a s
ScanEnded a
seed
            Skip s
s' -> forall s o r. s -> Step s o r
Skip (forall a s. a -> s -> ScanState a s
ScanContinues a
seed s
s')
            Emit s
s' b
x -> forall s o r. s -> o -> Step s o r
Emit (forall a s. a -> s -> ScanState a s
ScanContinues a
seed' s
s') a
seed
              where
                !seed' :: a
seed' = a -> b -> a
f a
seed b
x
{-# INLINE scanlS #-}

scanlMS :: Monad m => (a -> b -> m a) -> a -> StreamConduit b m a
scanlMS :: forall (m :: * -> *) a b.
Monad m =>
(a -> b -> m a) -> a -> StreamConduit b m a
scanlMS a -> b -> m a
f a
seed0 (Stream s -> m (Step s b ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream ScanState a s -> m (Step (ScanState a s) a ())
step' (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (forall a s. a -> s -> ScanState a s
ScanContinues a
seed0) m s
ms0)
  where
    step' :: ScanState a s -> m (Step (ScanState a s) a ())
step' ScanState a s
ScanEnded = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. r -> Step s o r
Stop ()
    step' (ScanContinues a
seed s
s) = do
        Step s b ()
res <- s -> m (Step s b ())
step s
s
        case Step s b ()
res of
            Stop () -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> o -> Step s o r
Emit forall a s. ScanState a s
ScanEnded a
seed
            Skip s
s' -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> Step s o r
Skip (forall a s. a -> s -> ScanState a s
ScanContinues a
seed s
s')
            Emit s
s' b
x -> do
                !a
seed' <- a -> b -> m a
f a
seed b
x
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> o -> Step s o r
Emit (forall a s. a -> s -> ScanState a s
ScanContinues a
seed' s
s') a
seed
{-# INLINE scanlMS #-}

mapAccumWhileS :: Monad m =>
    (a -> s -> Either s (s, b)) -> s -> StreamConduitT a b m s
mapAccumWhileS :: forall (m :: * -> *) a s b.
Monad m =>
(a -> s -> Either s (s, b)) -> s -> StreamConduitT a b m s
mapAccumWhileS a -> s -> Either s (s, b)
f s
initial (Stream s -> m (Step s a ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (s, s) -> m (Step (s, s) b s)
step' (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (s
initial, ) m s
ms0)
  where
    step' :: (s, s) -> m (Step (s, s) b s)
step' (!s
accum, s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> forall s o r. r -> Step s o r
Stop s
accum
            Skip s
s' -> forall s o r. s -> Step s o r
Skip (s
accum, s
s')
            Emit s
s' a
x -> case a -> s -> Either s (s, b)
f a
x s
accum of
                Right (!s
accum', b
r) -> forall s o r. s -> o -> Step s o r
Emit (s
accum', s
s') b
r
                Left   !s
accum'     -> forall s o r. r -> Step s o r
Stop s
accum'
{-# INLINE mapAccumWhileS #-}

mapAccumWhileMS :: Monad m =>
    (a -> s -> m (Either s (s, b))) -> s -> StreamConduitT a b m s
mapAccumWhileMS :: forall (m :: * -> *) a s b.
Monad m =>
(a -> s -> m (Either s (s, b))) -> s -> StreamConduitT a b m s
mapAccumWhileMS a -> s -> m (Either s (s, b))
f s
initial (Stream s -> m (Step s a ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (s, s) -> m (Step (s, s) b s)
step' (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (s
initial, ) m s
ms0)
  where
    step' :: (s, s) -> m (Step (s, s) b s)
step' (!s
accum, s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        case Step s a ()
res of
            Stop () -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. r -> Step s o r
Stop s
accum
            Skip s
s' -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> Step s o r
Skip (s
accum, s
s')
            Emit s
s' a
x -> do
                Either s (s, b)
lr <- a -> s -> m (Either s (s, b))
f a
x s
accum
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Either s (s, b)
lr of
                    Right (!s
accum', b
r) -> forall s o r. s -> o -> Step s o r
Emit (s
accum', s
s') b
r
                    Left   !s
accum'     -> forall s o r. r -> Step s o r
Stop s
accum'
{-# INLINE mapAccumWhileMS #-}

data IntersperseState a s
    = IFirstValue s
    | IGotValue s a
    | IEmitValue s a

intersperseS :: Monad m => a -> StreamConduit a m a
intersperseS :: forall (m :: * -> *) a. Monad m => a -> StreamConduit a m a
intersperseS a
sep (Stream s -> m (Step s a ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream IntersperseState a s -> m (Step (IntersperseState a s) a ())
step' (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM forall a s. s -> IntersperseState a s
IFirstValue m s
ms0)
  where
    step' :: IntersperseState a s -> m (Step (IntersperseState a s) a ())
step' (IFirstValue s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> forall s o r. s -> Step s o r
Skip (forall a s. s -> IntersperseState a s
IFirstValue s
s')
            Emit s
s' a
x -> forall s o r. s -> o -> Step s o r
Emit (forall a s. s -> a -> IntersperseState a s
IGotValue s
s' a
x) a
x
    -- Emit the separator once we know it's not the end of the list.
    step' (IGotValue s
s a
x) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> forall s o r. s -> Step s o r
Skip (forall a s. s -> a -> IntersperseState a s
IGotValue s
s' a
x)
            Emit s
s' a
x' -> forall s o r. s -> o -> Step s o r
Emit (forall a s. s -> a -> IntersperseState a s
IEmitValue s
s' a
x') a
sep
    -- We emitted a separator, now emit the value that comes after.
    step' (IEmitValue s
s a
x) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> o -> Step s o r
Emit (forall a s. s -> a -> IntersperseState a s
IGotValue s
s a
x) a
x
{-# INLINE intersperseS #-}

data SlidingWindowState seq s
    = SWInitial Int seq s
    | SWSliding seq s
    | SWEarlyExit

slidingWindowS :: (Monad m, Seq.IsSequence seq, Element seq ~ a) => Int -> StreamConduit a m seq
slidingWindowS :: forall (m :: * -> *) seq a.
(Monad m, IsSequence seq, Element seq ~ a) =>
Int -> StreamConduit a m seq
slidingWindowS Int
sz (Stream s -> m (Step s a ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream SlidingWindowState seq s
-> m (Step (SlidingWindowState seq s) seq ())
step' (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (forall seq s. Int -> seq -> s -> SlidingWindowState seq s
SWInitial (forall a. Ord a => a -> a -> a
max Int
1 Int
sz) forall a. Monoid a => a
mempty) m s
ms0)
  where
    step' :: SlidingWindowState seq s
-> m (Step (SlidingWindowState seq s) seq ())
step' (SWInitial Int
n seq
st s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> forall s o r. s -> o -> Step s o r
Emit forall seq s. SlidingWindowState seq s
SWEarlyExit seq
st
            Skip s
s' -> forall s o r. s -> Step s o r
Skip (forall seq s. Int -> seq -> s -> SlidingWindowState seq s
SWInitial Int
n seq
st s
s')
            Emit s
s' a
x ->
                if Int
n forall a. Eq a => a -> a -> Bool
== Int
1
                    then forall s o r. s -> o -> Step s o r
Emit (forall seq s. seq -> s -> SlidingWindowState seq s
SWSliding (forall seq. IsSequence seq => seq -> seq
Seq.unsafeTail seq
st') s
s') seq
st'
                    else forall s o r. s -> Step s o r
Skip (forall seq s. Int -> seq -> s -> SlidingWindowState seq s
SWInitial (Int
n forall a. Num a => a -> a -> a
- Int
1) seq
st' s
s')
              where
                st' :: seq
st' = forall seq. SemiSequence seq => seq -> Element seq -> seq
Seq.snoc seq
st a
x
    -- After collecting the initial window, each upstream element
    -- causes an additional window to be yielded.
    step' (SWSliding seq
st s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> forall s o r. s -> Step s o r
Skip (forall seq s. seq -> s -> SlidingWindowState seq s
SWSliding seq
st s
s')
            Emit s
s' a
x -> forall s o r. s -> o -> Step s o r
Emit (forall seq s. seq -> s -> SlidingWindowState seq s
SWSliding (forall seq. IsSequence seq => seq -> seq
Seq.unsafeTail seq
st') s
s') seq
st'
              where
                st' :: seq
st' = forall seq. SemiSequence seq => seq -> Element seq -> seq
Seq.snoc seq
st a
x
    step' SlidingWindowState seq s
SWEarlyExit = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. r -> Step s o r
Stop ()

{-# INLINE slidingWindowS #-}

filterMS :: Monad m
         => (a -> m Bool)
         -> StreamConduit a m a
filterMS :: forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> StreamConduit a m a
filterMS a -> m Bool
f (Stream s -> m (Step s a ())
step m s
ms0) = do
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream s -> m (Step s a ())
step' m s
ms0
  where
    step' :: s -> m (Step s a ())
step' s
s = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        case Step s a ()
res of
            Stop () -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> Step s o r
Skip s
s'
            Emit s
s' a
x -> do
                Bool
r <- a -> m Bool
f a
x
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$
                    if Bool
r
                        then forall s o r. s -> o -> Step s o r
Emit s
s' a
x
                        else forall s o r. s -> Step s o r
Skip s
s'
{-# INLINE filterMS #-}

data SplitState seq s
    = SplitDone
    -- When no element of seq passes the predicate.  This allows
    -- 'splitOnUnboundedES' to not run 'Seq.break' multiple times due
    -- to 'Skip's being sent by the upstream.
    | SplitNoSep seq s
    | SplitState seq s

splitOnUnboundedES :: (Monad m, Seq.IsSequence seq)
                   => (Element seq -> Bool) -> StreamConduit seq m seq
splitOnUnboundedES :: forall (m :: * -> *) seq.
(Monad m, IsSequence seq) =>
(Element seq -> Bool) -> StreamConduit seq m seq
splitOnUnboundedES Element seq -> Bool
f (Stream s -> m (Step s seq ())
step m s
ms0) =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream SplitState seq s -> m (Step (SplitState seq s) seq ())
step' (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (forall seq s. seq -> s -> SplitState seq s
SplitState forall a. Monoid a => a
mempty) m s
ms0)
  where
    step' :: SplitState seq s -> m (Step (SplitState seq s) seq ())
step' SplitState seq s
SplitDone = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. r -> Step s o r
Stop ()
    step' (SplitNoSep seq
t s
s) = do
        Step s seq ()
res <- s -> m (Step s seq ())
step s
s
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s seq ()
res of
            Stop () | Bool -> Bool
not (forall mono. MonoFoldable mono => mono -> Bool
onull seq
t) -> forall s o r. s -> o -> Step s o r
Emit forall seq s. SplitState seq s
SplitDone seq
t
                    | Bool
otherwise -> forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> forall s o r. s -> Step s o r
Skip (forall seq s. seq -> s -> SplitState seq s
SplitNoSep seq
t s
s')
            Emit s
s' seq
t' -> forall s o r. s -> Step s o r
Skip (forall seq s. seq -> s -> SplitState seq s
SplitState (seq
t forall a. Monoid a => a -> a -> a
`mappend` seq
t') s
s')
    step' (SplitState seq
t s
s) = do
        if forall mono. MonoFoldable mono => mono -> Bool
onull seq
y
            then do
                Step s seq ()
res <- s -> m (Step s seq ())
step s
s
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s seq ()
res of
                    Stop () | Bool -> Bool
not (forall mono. MonoFoldable mono => mono -> Bool
onull seq
t) -> forall s o r. s -> o -> Step s o r
Emit forall seq s. SplitState seq s
SplitDone seq
t
                            | Bool
otherwise -> forall s o r. r -> Step s o r
Stop ()
                    Skip s
s' -> forall s o r. s -> Step s o r
Skip (forall seq s. seq -> s -> SplitState seq s
SplitNoSep seq
t s
s')
                    Emit s
s' seq
t' -> forall s o r. s -> Step s o r
Skip (forall seq s. seq -> s -> SplitState seq s
SplitState (seq
t forall a. Monoid a => a -> a -> a
`mappend` seq
t') s
s')
            else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> o -> Step s o r
Emit (forall seq s. seq -> s -> SplitState seq s
SplitState (forall seq. IsSequence seq => Index seq -> seq -> seq
Seq.drop Index seq
1 seq
y) s
s) seq
x
      where
        (seq
x, seq
y) = forall seq.
IsSequence seq =>
(Element seq -> Bool) -> seq -> (seq, seq)
Seq.break Element seq -> Bool
f seq
t
{-# INLINE splitOnUnboundedES #-}

-- | Streaming versions of @Data.Conduit.Combinators.Internal.initReplicate@
initReplicateS :: Monad m => m seed -> (seed -> m a) -> Int -> StreamProducer m a
initReplicateS :: forall (m :: * -> *) seed a.
Monad m =>
m seed -> (seed -> m a) -> Int -> StreamProducer m a
initReplicateS m seed
mseed seed -> m a
f Int
cnt Stream m i ()
_ =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Int, seed) -> m (Step (Int, seed) a ())
step (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (Int
cnt, ) m seed
mseed)
  where
    step :: (Int, seed) -> m (Step (Int, seed) a ())
step (Int
ix, seed
_) | Int
ix forall a. Ord a => a -> a -> Bool
<= Int
0 = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. r -> Step s o r
Stop ()
    step (Int
ix, seed
seed) = do
        a
x <- seed -> m a
f seed
seed
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> o -> Step s o r
Emit (Int
ix forall a. Num a => a -> a -> a
- Int
1, seed
seed) a
x
{-# INLINE initReplicateS #-}

-- | Streaming versions of @Data.Conduit.Combinators.Internal.initRepeat@
initRepeatS :: Monad m => m seed -> (seed -> m a) -> StreamProducer m a
initRepeatS :: forall (m :: * -> *) seed a.
Monad m =>
m seed -> (seed -> m a) -> StreamProducer m a
initRepeatS m seed
mseed seed -> m a
f Stream m i ()
_ =
    forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream seed -> m (Step seed a ())
step m seed
mseed
  where
    step :: seed -> m (Step seed a ())
step seed
seed = do
        a
x <- seed -> m a
f seed
seed
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s o r. s -> o -> Step s o r
Emit seed
seed a
x
{-# INLINE initRepeatS #-}

-- | Utility function
fmapS :: Monad m
      => (a -> b)
      -> StreamConduitT i o m a
      -> StreamConduitT i o m b
fmapS :: forall (m :: * -> *) a b i o.
Monad m =>
(a -> b) -> StreamConduitT i o m a -> StreamConduitT i o m b
fmapS a -> b
f StreamConduitT i o m a
s Stream m i ()
inp =
    case StreamConduitT i o m a
s Stream m i ()
inp of
        Stream s -> m (Step s o a)
step m s
ms0 -> forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f)) s -> m (Step s o a)
step) m s
ms0
{-# INLINE fmapS #-}