module Data.Conduit.List
(
sourceList
, sourceNull
, unfold
, unfoldEither
, unfoldM
, unfoldEitherM
, enumFromTo
, iterate
, replicate
, replicateM
, fold
, foldMap
, take
, drop
, head
, peek
, consume
, sinkNull
, foldMapM
, foldM
, mapM_
, map
, mapMaybe
, mapFoldable
, catMaybes
, concat
, concatMap
, concatMapAccum
, scanl
, scan
, mapAccum
, chunksOf
, groupBy
, groupOn1
, isolate
, filter
, mapM
, iterM
, scanlM
, scanM
, mapAccumM
, mapMaybeM
, mapFoldableM
, concatMapM
, concatMapAccumM
, sequence
) where
import qualified Prelude
import Prelude
    ( ($), return, (==), (-), Int
    , (.), id, Maybe (..), Monad
    , Either (..)
    , Bool (..)
    , (>>)
    , (>>=)
    , seq
    , otherwise
    , Enum, Eq
    , maybe
    , (<=)
    , (>)
    )
import Data.Monoid (Monoid, mempty, mappend)
import qualified Data.Foldable as F
import Data.Conduit
import Data.Conduit.Internal.Fusion
import Data.Conduit.Internal.List.Stream
import qualified Data.Conduit.Internal as CI
import Control.Monad (when, (<=<), liftM, void)
import Control.Monad.Trans.Class (lift)
#include "fusion-macros.h"
-- | Generate a stream from a seed value.
--
-- The step function is applied to the current seed. A @Just@ result carries
-- the value to emit together with the next seed; @Nothing@ ends the stream.
unfold, unfoldC :: Monad m
                => (b -> Maybe (a, b))
                -> b
                -> Producer m a
unfoldC step = loop
  where
    -- Ask the step function what to do with the current seed.
    loop s = maybe (return ()) emit (step s)
    -- Emit the produced value, then continue from the new seed.
    emit (x, next) = yield x >> loop next
STREAMING(unfold, unfoldC, unfoldS, f x)
-- | Generate a stream from a seed value, finishing with a result.
--
-- A @Right@ from the step function carries the value to emit and the next
-- seed; a @Left@ ends the stream and its payload becomes the return value of
-- the conduit.
unfoldEither, unfoldEitherC :: Monad m
                            => (b -> Either r (a, b))
                            -> b
                            -> ConduitM i a m r
unfoldEitherC step = loop
  where
    loop s = case step s of
        Left done -> return done
        Right (x, next) -> yield x >> loop next
STREAMING(unfoldEither, unfoldEitherC, unfoldEitherS, f x)
-- | Monadic variant of unfolding: the step function runs in the base monad.
--
-- A @Just@ result carries the value to emit and the next seed; @Nothing@
-- ends the stream.
unfoldM, unfoldMC :: Monad m
                  => (b -> m (Maybe (a, b)))
                  -> b
                  -> Producer m a
unfoldMC step = loop
  where
    -- Run the step action in the base monad, then dispatch on its result.
    loop s = lift (step s) >>= maybe (return ()) emit
    emit (x, next) = yield x >> loop next
STREAMING(unfoldM, unfoldMC, unfoldMS, f seed)
-- | Monadic unfolding that finishes with a result.
--
-- The step action runs in the base monad. A @Right@ carries the value to
-- emit and the next seed; a @Left@ ends the stream and its payload is
-- returned by the conduit.
unfoldEitherM, unfoldEitherMC :: Monad m
                              => (b -> m (Either r (a, b)))
                              -> b
                              -> ConduitM i a m r
unfoldEitherMC step = loop
  where
    loop s = lift (step s) >>= continue
    continue (Left done) = return done
    continue (Right (x, next)) = yield x >> loop next
STREAMING(unfoldEitherM, unfoldEitherMC, unfoldEitherMS, f seed)
-- | Yield every element of the given list, in order.
sourceList, sourceListC :: Monad m => [a] -> Producer m a
sourceListC xs = Prelude.mapM_ yield xs
STREAMING(sourceList, sourceListC, sourceListS, xs)
-- | Enumerate from the first argument up to and including the second.
--
-- Values are produced with @succ@; the stream stops as soon as the current
-- value is greater than the upper bound, so nothing is yielded when the
-- start is already greater than the bound.
enumFromTo, enumFromToC :: (Enum a, Prelude.Ord a, Monad m)
                        => a
                        -> a
                        -> Producer m a
enumFromToC x0 y = loop x0
  where
    loop cur =
        if cur Prelude.> y
            then return ()
            else yield cur >> loop (Prelude.succ cur)
STREAMING(enumFromTo, enumFromToC, enumFromToS, x0 y)
-- | Produce an endless stream by repeatedly applying a function: the start
-- value is yielded first, then the function applied to it, and so on.
iterate, iterateC :: Monad m => (a -> a) -> a -> Producer m a
iterateC f = loop
  where
    loop cur = do
        yield cur
        loop (f cur)
STREAMING(iterate, iterateC, iterateS, f a)
-- | Yield the given value the given number of times.
--
-- A count of zero or less produces an empty stream.
replicate, replicateC :: Monad m => Int -> a -> Producer m a
replicateC cnt0 a =
    loop cnt0
  where
    loop i
        | i <= 0 = return ()
        -- Count down by one per emitted value.
        | otherwise = yield a >> loop (i - 1)
STREAMING(replicate, replicateC, replicateS, cnt0 a)
-- | Run the given action the given number of times, yielding each result.
--
-- The action is re-run for every emitted value. A count of zero or less
-- produces an empty stream without running the action.
replicateM, replicateMC :: Monad m => Int -> m a -> Producer m a
replicateMC cnt0 ma =
    loop cnt0
  where
    loop i
        | i <= 0 = return ()
        -- Count down by one per emitted value.
        | otherwise = lift ma >>= yield >> loop (i - 1)
STREAMING(replicateM, replicateMC, replicateMS, cnt0 ma)
-- | Strict left fold over all incoming values.
--
-- The accumulator is forced on every iteration (bang pattern) and returned
-- once upstream has no more input.
fold, foldC :: Monad m
            => (b -> a -> b)
            -> b
            -> Consumer a m b
foldC f = loop
  where
    loop !acc = do
        mx <- await
        case mx of
            Nothing -> return acc
            Just x -> loop (f acc x)
STREAMING(fold, foldC, foldS, f accum)
-- | Monadic left fold over all incoming values.
--
-- The step action runs in the base monad; each new accumulator is forced
-- with @seq@ before the loop continues. The final accumulator is returned
-- once upstream has no more input.
foldM, foldMC :: Monad m
              => (b -> a -> m b)
              -> b
              -> Consumer a m b
foldMC f = loop
  where
    loop acc = do
        mx <- await
        case mx of
            Nothing -> return acc
            Just x -> do
                next <- lift (f acc x)
                next `seq` loop next
STREAMING(foldM, foldMC, foldMS, f accum)
-- | Run a source to completion, folding its outputs directly in the base
-- monad.
--
-- The underlying pipe is walked constructor by constructor; the accumulator
-- is forced (via @$!@) each time an output is folded in.
connectFold :: Monad m => Source m a -> (b -> a -> b) -> b -> m b
connectFold (CI.ConduitM src0) f =
    go (src0 CI.Done)
  where
    -- Fold the output in strictly; the second field is ignored.
    go (CI.HaveOutput next _ o) acc = go next Prelude.$! f acc o
    -- Run the monadic step and keep going.
    go (CI.PipeM mnext) acc = mnext >>= \next -> go next acc
    -- No real input exists; the second continuation is resumed with ().
    go (CI.NeedInput _ k) acc = go (k ()) acc
    go (CI.Leftover next ()) acc = go next acc
    go (CI.Done ()) acc = return acc
-- | Run a source to completion, folding its outputs with a monadic step
-- directly in the base monad.
--
-- Each new accumulator is forced (bang pattern on the bind) before the walk
-- over the underlying pipe continues.
connectFoldM :: Monad m => Source m a -> (b -> a -> m b) -> b -> m b
connectFoldM (CI.ConduitM src0) f =
    go (src0 CI.Done)
  where
    -- Fold the output in; the second field is ignored.
    go (CI.HaveOutput next _ o) acc = do
        !acc' <- f acc o
        go next acc'
    go (CI.PipeM mnext) acc = mnext >>= \next -> go next acc
    -- No real input exists; the second continuation is resumed with ().
    go (CI.NeedInput _ k) acc = go (k ()) acc
    go (CI.Leftover next ()) acc = go next acc
    go (CI.Done ()) acc = return acc
-- | Map every incoming value into a monoid and combine the results.
--
-- Starts from @mempty@ and appends each mapped value on the right of the
-- running accumulator with @mappend@.
foldMap :: (Monad m, Monoid b)
        => (a -> b)
        -> Consumer a m b
INLINE_RULE(foldMap, f, let step acc = mappend acc . f in fold step mempty)
-- | Monadic variant of the monoidal fold: each incoming value is mapped by
-- an action in the base monad, and the results are combined.
--
-- Starts from @mempty@ and appends each mapped value on the right of the
-- running accumulator with @mappend@.
foldMapM :: (Monad m, Monoid b)
         => (a -> m b)
         -> Consumer a m b
INLINE_RULE(foldMapM, f, let step acc = liftM (mappend acc) . f in foldM step mempty)
-- | Run the given action on every incoming value, discarding the results.
mapM_, mapM_C :: Monad m
              => (a -> m ())
              -> Consumer a m ()
mapM_C f = awaitForever (\x -> lift (f x))
STREAMING(mapM_, mapM_C, mapM_S, f)
-- | Run a source to completion in the base monad, applying the given action
-- to every value it outputs.
srcMapM_ :: Monad m => Source m a -> (a -> m ()) -> m ()
srcMapM_ (CI.ConduitM src) f =
    go (src CI.Done)
  where
    -- Act on the output, then continue; the second field is ignored.
    go (CI.HaveOutput next _ o) = f o >> go next
    -- No real input exists; the second continuation is resumed with ().
    go (CI.NeedInput _ k) = go (k ())
    go (CI.PipeM act) = act >>= go
    go (CI.Leftover next ()) = go next
    go (CI.Done ()) = return ()
-- | Discard up to the given number of incoming values.
--
-- Stops early if upstream runs out of input. A count of zero or less
-- consumes nothing.
drop, dropC :: Monad m
            => Int
            -> Consumer a m ()
dropC =
    loop
  where
    loop i | i <= 0 = return ()
    -- Throw the awaited value away and count down by one.
    loop count = await >>= maybe (return ()) (\_ -> loop (count - 1))
STREAMING(drop, dropC, dropS, i)
-- | Collect up to the given number of incoming values into a list, in
-- arrival order.
--
-- Fewer values are returned if upstream runs out of input first. A count of
-- zero or less returns the empty list without consuming anything.
take, takeC :: Monad m
            => Int
            -> Consumer a m [a]
takeC =
    loop id
  where
    -- @front@ is a difference list holding the values collected so far.
    loop front count | count <= 0 = return $ front []
    loop front count = await >>= maybe
        (return $ front [])
        (\x -> loop (front . (x:)) (count - 1))
STREAMING(take, takeC, takeS, i)
-- | Take a single value from the stream, if one is available.
--
-- This is exactly @await@: the value is consumed, and @Nothing@ is returned
-- when there is no more input.
head, headC :: Monad m => Consumer a m (Maybe a)
headC = await
STREAMING0(head, headC, headS)
-- | Look at the next value in the stream without consuming it.
--
-- The value is awaited and then pushed back with @leftover@, so the next
-- consumer sees it again. Returns @Nothing@ when there is no more input.
peek :: Monad m => Consumer a m (Maybe a)
peek = do
    mx <- await
    case mx of
        Nothing -> return Nothing
        Just x -> do
            leftover x
            return (Just x)
-- | Apply a pure function to every value passing through the stream.
map, mapC :: Monad m => (a -> b) -> Conduit a m b
mapC f = awaitForever (\x -> yield (f x))
STREAMING(map, mapC, mapS, f)
-- | Apply a monadic function to every value passing through the stream,
-- yielding each result.
mapM, mapMC :: Monad m => (a -> m b) -> Conduit a m b
mapMC f = awaitForever $ \x -> do
    y <- lift (f x)
    yield y
STREAMING(mapM, mapMC, mapMS, f)
-- | Run a monadic action on every value passing through, then forward the
-- value unchanged.
iterM, iterMC :: Monad m => (a -> m ()) -> Conduit a m a
iterMC f = awaitForever $ \x -> do
    lift (f x)
    yield x
STREAMING(iterM, iterMC, iterMS, f)
-- | Apply a function to every incoming value, keeping only the @Just@
-- results; @Nothing@ results are dropped from the stream.
mapMaybe, mapMaybeC :: Monad m => (a -> Maybe b) -> Conduit a m b
mapMaybeC f = awaitForever $ \x ->
    case f x of
        Nothing -> return ()
        Just y -> yield y
STREAMING(mapMaybe, mapMaybeC, mapMaybeS, f)
-- | Apply a monadic function to every incoming value, keeping only the
-- @Just@ results; @Nothing@ results are dropped from the stream.
mapMaybeM, mapMaybeMC :: Monad m => (a -> m (Maybe b)) -> Conduit a m b
mapMaybeMC f = awaitForever $ \x ->
    lift (f x) >>= maybe (return ()) yield
STREAMING(mapMaybeM, mapMaybeMC, mapMaybeMS, f)
-- | Keep the payload of every @Just@ in the stream and drop every
-- @Nothing@.
catMaybes, catMaybesC :: Monad m => Conduit (Maybe a) m a
catMaybesC = awaitForever $ \mx ->
    case mx of
        Nothing -> return ()
        Just x -> yield x
STREAMING0(catMaybes, catMaybesC, catMaybesS)
-- | Flatten a stream of foldable containers: every element of each
-- incoming container is yielded individually, in fold order.
concat, concatC :: (Monad m, F.Foldable f) => Conduit (f a) m a
concatC = awaitForever (\xs -> F.mapM_ yield xs)
STREAMING0(concat, concatC, concatS)
-- | Apply a list-producing function to every incoming value and yield each
-- element of the resulting lists.
concatMap, concatMapC :: Monad m => (a -> [b]) -> Conduit a m b
concatMapC f = awaitForever (\x -> sourceList (f x))
STREAMING(concatMap, concatMapC, concatMapS, f)
-- | Apply a monadic list-producing function to every incoming value and
-- yield each element of the resulting lists.
concatMapM, concatMapMC :: Monad m => (a -> m [b]) -> Conduit a m b
concatMapMC f = awaitForever (sourceList <=< (\x -> lift (f x)))
STREAMING(concatMapM, concatMapMC, concatMapMS, f)
-- | Like a stateful list-producing map: each incoming value and the current
-- accumulator give a new accumulator and a list of outputs, whose elements
-- are yielded individually. The final accumulator is discarded.
concatMapAccum, concatMapAccumC :: Monad m => (a -> accum -> (accum, [b])) -> accum -> Conduit a m b
concatMapAccumC f x0 =
    -- Produce the per-input lists first, then flatten them downstream.
    void (mapAccum f x0)
        =$= concat
STREAMING(concatMapAccum, concatMapAccumC, concatMapAccumS, f x0)
-- | Stateful map over the stream that throws away the final state: this is
-- the accumulating map with its return value replaced by @()@.
scanl :: Monad m => (a -> s -> (s, b)) -> s -> Conduit a m b
scanl f s = void (mapAccum f s)
-- | Monadic stateful map over the stream that throws away the final state:
-- this is the monadic accumulating map with its return value replaced by
-- @()@.
scanlM :: Monad m => (a -> s -> m (s, b)) -> s -> Conduit a m b
scanlM f s = void (mapAccumM f s)
-- | Map over the stream while threading a state value.
--
-- For each incoming value the function receives the value and the current
-- state and returns the new state together with the output to yield. The
-- state is forced on every iteration (bang pattern) and returned once
-- upstream has no more input.
mapAccum, mapAccumC :: Monad m => (a -> s -> (s, b)) -> s -> ConduitM a b m s
mapAccumC f = loop
  where
    loop !s = do
        mx <- await
        case mx of
            Nothing -> return s
            Just x ->
                -- Match with a case so the result pair is forced before
                -- anything is yielded.
                case f x s of
                    (s', y) -> yield y >> loop s'
STREAMING(mapAccum, mapAccumC, mapAccumS, f s)
-- | Monadic map over the stream while threading a state value.
--
-- For each incoming value the action receives the value and the current
-- state and produces the new state together with the output to yield. The
-- state is forced on every iteration (bang pattern) and returned once
-- upstream has no more input.
mapAccumM, mapAccumMC :: Monad m => (a -> s -> m (s, b)) -> s -> ConduitM a b m s
mapAccumMC f = loop
  where
    loop !s = do
        mx <- await
        case mx of
            Nothing -> return s
            Just x -> do
                (s', y) <- lift (f x s)
                yield y
                loop s'
STREAMING(mapAccumM, mapAccumMC, mapAccumMS, f s)
-- | Running fold over the stream: every intermediate accumulator is
-- yielded downstream, and the final accumulator is the return value.
--
-- Built on the accumulating map, with the step result used both as the new
-- state and as the output.
scan :: Monad m => (a -> b -> b) -> b -> ConduitM a b m b
INLINE_RULE(scan, f, mapAccum (\x acc -> let next = f x acc in (next, next)))
-- | Monadic running fold over the stream: every intermediate accumulator
-- is yielded downstream, and the final accumulator is the return value.
--
-- Built on the monadic accumulating map, with the step result used both as
-- the new state and as the output.
scanM :: Monad m => (a -> b -> m b) -> b -> ConduitM a b m b
INLINE_RULE(scanM, f, mapAccumM (\x acc -> f x acc >>= \next -> return (next, next)))
-- | Monadic stateful list-producing map: each incoming value and the
-- current accumulator give, via an action in the base monad, a new
-- accumulator and a list of outputs, whose elements are yielded
-- individually. The final accumulator is discarded.
concatMapAccumM, concatMapAccumMC :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> Conduit a m b
concatMapAccumMC f x0 =
    -- Produce the per-input lists first, then flatten them downstream.
    void (mapAccumM f x0)
        =$= concat
STREAMING(concatMapAccumM, concatMapAccumMC, concatMapAccumMS, f x0)
-- | Apply a function returning a foldable container to every incoming
-- value and yield each element of the results, in fold order.
mapFoldable, mapFoldableC :: (Monad m, F.Foldable f) => (a -> f b) -> Conduit a m b
mapFoldableC f = awaitForever (\x -> F.mapM_ yield (f x))
STREAMING(mapFoldable, mapFoldableC, mapFoldableS, f)
-- | Apply a monadic function returning a foldable container to every
-- incoming value and yield each element of the results, in fold order.
mapFoldableM, mapFoldableMC :: (Monad m, F.Foldable f) => (a -> m (f b)) -> Conduit a m b
mapFoldableMC f = awaitForever (F.mapM_ yield <=< (\x -> lift (f x)))
STREAMING(mapFoldableM, mapFoldableMC, mapFoldableMS, f)
-- | Consume every remaining value from the stream and return them as a
-- list, in arrival order.
consume, consumeC :: Monad m => Consumer a m [a]
consumeC = loop id
  where
    -- @front@ is a difference list holding the values collected so far.
    loop front = do
        mx <- await
        case mx of
            Nothing -> return (front [])
            Just x -> loop (front . (x:))
STREAMING0(consume, consumeC, consumeS)
-- | Group the stream into lists of the given length.
--
-- The last chunk may be shorter when upstream ends part-way through it, and
-- nothing is yielded for an empty stream. As written, a length of one or
-- less yields single-element chunks, because the @count > 1@ test never
-- holds.
chunksOf :: Monad m => Int -> Conduit a m [a]
chunksOf n =
    start
  where
    -- Wait for the first element of the first chunk.
    start = await >>= maybe (return ()) (\x -> loop n (x:))

    -- @count@ is the room left in the current chunk (including the slot of
    -- the most recently added element); @rest@ is a difference list of the
    -- elements gathered so far.
    loop !count rest =
        await >>= maybe (yield (rest [])) go
      where
        go y
            | count > 1 = loop (count - 1) (rest . (y:))
            -- Chunk is full: emit it and start a new one with @y@.
            | otherwise = yield (rest []) >> loop n (y:)
-- | Group consecutive values of the stream using the given predicate.
--
-- Each incoming value is compared against the first element of the current
-- group (as the first argument of the predicate). When the predicate fails,
-- the current group is yielded and the value starts a new group. The last
-- group is yielded when upstream ends; an empty stream yields nothing.
groupBy, groupByC :: Monad m => (a -> a -> Bool) -> Conduit a m [a]
groupByC f = start
  where
    start = do
        mx <- await
        case mx of
            Nothing -> return ()
            Just x -> collect id x

    -- @acc@ is a difference list of the group members after the head @x@.
    collect acc x = do
        my <- await
        case my of
            Nothing -> flush
            Just y
                | f x y -> collect (acc . (y:)) x
                | otherwise -> flush >> collect id y
      where
        flush = yield (x : acc [])
STREAMING(groupBy, groupByC, groupByS, f)
-- | Group consecutive values of the stream that share the same key.
--
-- The key function is applied to the first element of the current group and
-- to each incoming value; while the keys are equal the value joins the
-- group. Each group is yielded as a pair of its first element and the list
-- of the remaining members. The last group is yielded when upstream ends;
-- an empty stream yields nothing.
groupOn1, groupOn1C :: (Monad m, Eq b)
                    => (a -> b)
                    -> Conduit a m (a, [a])
groupOn1C f = start
  where
    start = do
        mx <- await
        case mx of
            Nothing -> return ()
            Just x -> collect id x

    -- @acc@ is a difference list of the group members after the head @x@.
    collect acc x = do
        my <- await
        case my of
            Nothing -> flush
            Just y
                | f x == f y -> collect (acc . (y:)) x
                | otherwise -> flush >> collect id y
      where
        flush = yield (x, acc [])
STREAMING(groupOn1, groupOn1C, groupOn1S, f)
-- | Pass through at most the given number of values, then stop.
--
-- Stops early if upstream runs out of input. A count of zero or less passes
-- nothing through and consumes nothing.
isolate, isolateC :: Monad m => Int -> Conduit a m a
isolateC =
    loop
  where
    loop count | count <= 0 = return ()
    -- Forward the awaited value and count down by one.
    loop count = await >>= maybe (return ()) (\x -> yield x >> loop (count - 1))
STREAMING(isolate, isolateC, isolateS, count)
-- | Keep only the values for which the predicate holds.
filter, filterC :: Monad m => (a -> Bool) -> Conduit a m a
filterC f = awaitForever keep
  where
    keep x = when (f x) (yield x)
STREAMING(filter, filterC, filterS, f)
-- | Filter the outputs of a source by rewriting its underlying pipe.
--
-- Outputs that fail the predicate are removed from the pipe structure;
-- every other constructor is rebuilt around the recursively filtered
-- remainder.
filterFuseRight :: Monad m => Source m a -> (a -> Bool) -> Source m a
filterFuseRight (CI.ConduitM src) f = CI.ConduitM $ \rest ->
    let go (CI.HaveOutput p c o)
            | f o = CI.HaveOutput (go p) c o
            -- Drop the rejected output and continue with the remainder.
            | otherwise = go p
        go (CI.NeedInput p c) = CI.NeedInput (go . p) (go . c)
        go (CI.PipeM mp) = CI.PipeM (liftM go mp)
        go (CI.Leftover p i) = CI.Leftover (go p) i
        -- Hand over to the continuation supplied to the new conduit.
        go (CI.Done ()) = rest ()
    in go (src CI.Done)
-- | Consume and discard every remaining value in the stream.
sinkNull, sinkNullC :: Monad m => Consumer a m ()
sinkNullC = awaitForever discard
  where
    discard _ = return ()
STREAMING0(sinkNull, sinkNullC, sinkNullS)
-- | Run a source to completion in the base monad, discarding everything it
-- outputs.
srcSinkNull :: Monad m => Source m a -> m ()
srcSinkNull (CI.ConduitM src) =
    go (src CI.Done)
  where
    -- Skip the output entirely and continue with the remainder.
    go (CI.HaveOutput next _ _) = go next
    -- No real input exists; the second continuation is resumed with ().
    go (CI.NeedInput _ k) = go (k ())
    go (CI.PipeM act) = act >>= go
    go (CI.Leftover next ()) = go next
    go (CI.Done ()) = return ()
-- | A source that yields nothing: it returns immediately.
sourceNull, sourceNullC :: Monad m => Producer m a
sourceNullC = return ()
STREAMING0(sourceNull, sourceNullC, sourceNullS)
-- | Run the given consumer repeatedly, yielding each of its results, for
-- as long as upstream has input.
--
-- Before each run one value is awaited to check that input remains; it is
-- pushed back with @leftover@ so that the consumer sees it.
sequence :: Monad m
         => Consumer i m o
         -> Conduit i m o
sequence sink = awaitForever step
  where
    step i = do
        -- Give the probed value back before handing control to the sink.
        leftover i
        out <- sink
        yield out