Safe Haskell | None |
---|---|
Language | Haskell2010 |
This module is very closely modeled on Pipes.Prelude, Pipes.Group and Pipes.Parse. It maybe said to give independent expression to the conception of Producer manipulation articulated in the latter two modules. Because we dispense with piping and conduiting, the distinction between all of these modules collapses. The leading type is chosen to permit an api that is as close as possible to that of Data.List and the Prelude.
Import qualified thus:
import Streaming import qualified Streaming.Prelude as S
For the examples below, one sometimes needs
import Streaming.Prelude (each, yield, next, mapped, stdoutLn, stdinLn) import Data.Function ((&))
Other libraries that come up in passing are
import qualified Control.Foldl as L -- cabal install foldl import qualified Pipes as P import qualified Pipes.Prelude as P import qualified System.IO as IO
Here are some correspondences between the types employed here and elsewhere:
streaming | pipes | conduit | io-streams ------------------------------------------------------------------------------------------------------------------- Stream (Of a) m () | Producer a m () | Source m a | InputStream a | ListT m a | ConduitM () o m () | Generator r () ------------------------------------------------------------------------------------------------------------------- Stream (Of a) m r | Producer a m r | ConduitM () o m r | Generator a r ------------------------------------------------------------------------------------------------------------------- Stream (Of a) m (Stream (Of a) m r) | Producer a m (Producer a m r) | -------------------------------------------------------------------------------------------------------------------- Stream (Stream (Of a) m) r | FreeT (Producer a m) m r | -------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------- ByteString m () | Producer ByteString m () | Source m ByteString | InputStream ByteString --------------------------------------------------------------------------------------------------------------------
- data Of a b = !a :> b
- yield :: Monad m => a -> Stream (Of a) m ()
- each :: (Monad m, Foldable f) => f a -> Stream (Of a) m ()
- each' :: (Monad m, Foldable f) => f a -> Stream (Of a) m ()
- unfoldr :: Monad m => (s -> m (Either r (a, s))) -> s -> Stream (Of a) m r
- stdinLn :: MonadIO m => Stream (Of String) m ()
- readLn :: (MonadIO m, Read a) => Stream (Of a) m ()
- fromHandle :: MonadIO m => Handle -> Stream (Of String) m ()
- readFile :: MonadResource m => FilePath -> Stream (Of String) m ()
- iterate :: Monad m => (a -> a) -> a -> Stream (Of a) m r
- iterateM :: Monad m => (a -> m a) -> m a -> Stream (Of a) m r
- repeat :: Monad m => a -> Stream (Of a) m r
- repeatM :: Monad m => m a -> Stream (Of a) m r
- replicate :: Monad m => Int -> a -> Stream (Of a) m ()
- untilRight :: Monad m => m (Either a r) -> Stream (Of a) m r
- cycle :: (Monad m, Functor f) => Stream f m r -> Stream f m s
- replicateM :: Monad m => Int -> m a -> Stream (Of a) m ()
- enumFrom :: (Monad m, Enum n) => n -> Stream (Of n) m r
- enumFromThen :: (Monad m, Enum a) => a -> a -> Stream (Of a) m r
- seconds :: Stream (Of Double) IO r
- stdoutLn :: MonadIO m => Stream (Of String) m () -> m ()
- stdoutLn' :: MonadIO m => Stream (Of String) m r -> m r
- mapM_ :: Monad m => (a -> m b) -> Stream (Of a) m r -> m r
- print :: (MonadIO m, Show a) => Stream (Of a) m r -> m r
- toHandle :: MonadIO m => Handle -> Stream (Of String) m r -> m r
- writeFile :: MonadResource m => FilePath -> Stream (Of String) m r -> m r
- first :: Monad m => Stream (Of r) m r -> m r
- effects :: Monad m => Stream (Of a) m r -> m r
- erase :: Monad m => Stream (Of a) m r -> Stream Identity m r
- drained :: (Monad m, Monad (t m), Functor (t m), MonadTrans t) => t m (Stream (Of a) m r) -> t m r
- map :: Monad m => (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r
- mapM :: Monad m => (a -> m b) -> Stream (Of a) m r -> Stream (Of b) m r
- maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
- mapped :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r
- for :: (Monad m, Functor f) => Stream (Of a) m r -> (a -> Stream f m x) -> Stream f m r
- with :: (Monad m, Functor f) => Stream (Of a) m r -> (a -> f x) -> Stream f m r
- subst :: (Monad m, Functor f) => (a -> f x) -> Stream (Of a) m r -> Stream f m r
- copy :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r
- copy' :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r
- store :: Monad m => (Stream (Of a) (Stream (Of a) m) r -> t) -> Stream (Of a) m r -> t
- chain :: Monad m => (a -> m ()) -> Stream (Of a) m r -> Stream (Of a) m r
- sequence :: Monad m => Stream (Of (m a)) m r -> Stream (Of a) m r
- filter :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m r
- filterM :: Monad m => (a -> m Bool) -> Stream (Of a) m r -> Stream (Of a) m r
- delay :: MonadIO m => Double -> Stream (Of a) m r -> Stream (Of a) m r
- intersperse :: Monad m => a -> Stream (Of a) m r -> Stream (Of a) m r
- take :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m ()
- takeWhile :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m ()
- drop :: Monad m => Int -> Stream (Of a) m r -> Stream (Of a) m r
- dropWhile :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m r
- concat :: (Monad m, Foldable f) => Stream (Of (f a)) m r -> Stream (Of a) m r
- scan :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> Stream (Of b) m r
- scanM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Of a) m r -> Stream (Of b) m r
- scanned :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> Stream (Of (a, b)) m r
- read :: (Monad m, Read a) => Stream (Of String) m r -> Stream (Of a) m r
- show :: (Monad m, Show a) => Stream (Of a) m r -> Stream (Of String) m r
- cons :: Monad m => a -> Stream (Of a) m r -> Stream (Of a) m r
- duplicate :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r
- duplicate' :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r
- next :: Monad m => Stream (Of a) m r -> m (Either r (a, Stream (Of a) m r))
- uncons :: Monad m => Stream (Of a) m () -> m (Maybe (a, Stream (Of a) m ()))
- splitAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r)
- split :: (Eq a, Monad m) => a -> Stream (Of a) m r -> Stream (Stream (Of a) m) m r
- break :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)
- breakWhen :: Monad m => (x -> a -> x) -> x -> (x -> b) -> (b -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)
- span :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)
- group :: (Monad m, Eq a) => Stream (Of a) m r -> Stream (Stream (Of a) m) m r
- groupBy :: Monad m => (a -> a -> Bool) -> Stream (Of a) m r -> Stream (Stream (Of a) m) m r
- distinguish :: (a -> Bool) -> Of a r -> Sum (Of a) (Of a) r
- switch :: Sum f g r -> Sum g f r
- separate :: (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream f (Stream g m) r
- unseparate :: (Monad m, Functor f, Functor g) => Stream f (Stream g m) r -> Stream (Sum f g) m r
- eitherToSum :: Of (Either a b) r -> Sum (Of a) (Of b) r
- sumToEither :: Sum (Of a) (Of b) r -> Of (Either a b) r
- sumToCompose :: Sum f f r -> Compose (Of Bool) f r
- composeToSum :: Compose (Of Bool) f r -> Sum f f r
- fold :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> m (Of b r)
- fold_ :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> m b
- foldM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Of a) m r -> m (Of b r)
- foldM_ :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Of a) m r -> m b
- all :: Monad m => (a -> Bool) -> Stream (Of a) m r -> m (Of Bool r)
- all_ :: Monad m => (a -> Bool) -> Stream (Of a) m r -> m Bool
- any :: Monad m => (a -> Bool) -> Stream (Of a) m r -> m (Of Bool r)
- any_ :: Monad m => (a -> Bool) -> Stream (Of a) m r -> m Bool
- sum :: (Monad m, Num a) => Stream (Of a) m r -> m (Of a r)
- sum_ :: (Monad m, Num a) => Stream (Of a) m () -> m a
- product :: (Monad m, Num a) => Stream (Of a) m r -> m (Of a r)
- product_ :: (Monad m, Num a) => Stream (Of a) m () -> m a
- head :: Monad m => Stream (Of a) m r -> m (Of (Maybe a) r)
- head_ :: Monad m => Stream (Of a) m r -> m (Maybe a)
- last :: Monad m => Stream (Of a) m r -> m (Of (Maybe a) r)
- last_ :: Monad m => Stream (Of a) m r -> m (Maybe a)
- elem :: (Monad m, Eq a) => a -> Stream (Of a) m r -> m (Of Bool r)
- elem_ :: (Monad m, Eq a) => a -> Stream (Of a) m r -> m Bool
- notElem :: (Monad m, Eq a) => a -> Stream (Of a) m r -> m (Of Bool r)
- notElem_ :: (Monad m, Eq a) => a -> Stream (Of a) m r -> m Bool
- length :: Monad m => Stream (Of a) m r -> m (Of Int r)
- length_ :: Monad m => Stream (Of a) m r -> m Int
- toList :: Monad m => Stream (Of a) m r -> m (Of [a] r)
- toList_ :: Monad m => Stream (Of a) m () -> m [a]
- mconcat :: (Monad m, Monoid w) => Stream (Of w) m r -> m (Of w r)
- mconcat_ :: (Monad m, Monoid w) => Stream (Of w) m r -> m w
- minimum :: (Monad m, Ord a) => Stream (Of a) m r -> m (Of (Maybe a) r)
- minimum_ :: (Monad m, Ord a) => Stream (Of a) m r -> m (Maybe a)
- maximum :: (Monad m, Ord a) => Stream (Of a) m r -> m (Of (Maybe a) r)
- maximum_ :: (Monad m, Ord a) => Stream (Of a) m r -> m (Maybe a)
- foldrM :: Monad m => (a -> m r -> m r) -> Stream (Of a) m r -> m r
- foldrT :: (Monad m, MonadTrans t, Monad (t m)) => (a -> t m r -> t m r) -> Stream (Of a) m r -> t m r
- zip :: Monad m => Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of (a, b)) m r
- zipWith :: Monad m => (a -> b -> c) -> Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of c) m r
- zip3 :: Monad m => Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of c) m r -> Stream (Of (a, b, c)) m r
- zipWith3 :: Monad m => (a -> b -> c -> d) -> Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of c) m r -> Stream (Of d) m r
- unzip :: Monad m => Stream (Of (a, b)) m r -> Stream (Of a) (Stream (Of b) m) r
- partitionEithers :: Monad m => Stream (Of (Either a b)) m r -> Stream (Of a) (Stream (Of b) m) r
- partition :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r
- catMaybes :: Monad m => Stream (Of (Maybe a)) m r -> Stream (Of a) m r
- mapMaybe :: Monad m => (a -> Maybe b) -> Stream (Of a) m r -> Stream (Of b) m r
- lazily :: Of a b -> (a, b)
- strictly :: (a, b) -> Of a b
- fst' :: Of a b -> a
- snd' :: Of a b -> b
- reread :: Monad m => (s -> m (Maybe a)) -> s -> Stream (Of a) m ()
- data Stream f m r
Types
A left-strict pair; the base functor for streams of individual elements.
!a :> b infixr 5 |
Monoid a => Monad (Of a) Source | |
Functor (Of a) Source | |
Monoid a => Applicative (Of a) Source | |
Foldable (Of a) Source | |
Traversable (Of a) Source | |
(Eq a, Eq b) => Eq (Of a b) Source | |
(Data a, Data b) => Data (Of a b) Source | |
(Ord a, Ord b) => Ord (Of a b) Source | |
(Read a, Read b) => Read (Of a b) Source | |
(Show a, Show b) => Show (Of a b) Source | |
(Monoid a, Monoid b) => Monoid (Of a b) Source |
Introducing streams of elements
yield :: Monad m => a -> Stream (Of a) m () Source
A singleton stream
>>>
stdoutLn $ yield "hello"
hello
>>>
S.sum $ do {yield 1; yield 2}
3
>>>
let prompt = putStrLn "Enter a number:"
>>>
let number = lift (prompt >> readLn) >>= yield :: Stream (Of Int) IO ()
>>>
S.toList $ do {number; number; number}
Enter a number: 1 Enter a number: 2 Enter a number: 3 [1,2,3] :> ()
each :: (Monad m, Foldable f) => f a -> Stream (Of a) m () Source
Stream the elements of a pure, foldable container.
>>>
S.print $ each [1..3]
1 2 3>>>
S.print $ mapped S.toList $ chunksOf 3 $ S.replicateM 5 getLine
s<Enter> t<Enter> u<Enter> ["s","t","u"] v<Enter> w<Enter> ["v","w"]
unfoldr :: Monad m => (s -> m (Either r (a, s))) -> s -> Stream (Of a) m r Source
Build a Stream
by unfolding steps starting from a seed.
The seed can of course be anything, but this is one natural way
to consume a pipes
Producer
. Consider:
>>>
S.stdoutLn $ S.take 2 $ S.unfoldr P.next P.stdinLn
hello<Enter> hello goodbye<Enter> goodbye
>>>
S.stdoutLn $ S.unfoldr P.next (P.stdinLn P.>-> P.take 2)
hello<Enter> hello goodbye<Enter> goodbye
>>>
S.effects $ S.unfoldr P.next (P.stdinLn P.>-> P.take 2 P.>-> P.stdoutLn)
hello<Enter> hello goodbye<Enter> goodbye
stdinLn :: MonadIO m => Stream (Of String) m () Source
View standard input as a 'Stream (Of String) m r'. stdoutLn
, by
contrast, renders a 'Stream (Of String) m r' to standard output. The names
follow Pipes.Prelude
>>>
stdoutLn stdinLn
hello<Enter> hello world<Enter> world ^CInterrupted.
>>>
stdoutLn $ S.map reverse stdinLn
hello<Enter> olleh world<Enter> dlrow ^CInterrupted.
readLn :: (MonadIO m, Read a) => Stream (Of a) m () Source
Read values from stdin
, ignoring failed parses
>>>
S.sum_ $ S.take 2 S.readLn :: IO Int
10<Enter> 12<Enter> 22
>>>
S.toList $ S.take 3 (S.readLn :: Stream (Of Int) IO ())
1<Enter> 2<Enter> 1@#$%^&*\<Enter> 3<Enter> [1,2,3] :> ()
readFile :: MonadResource m => FilePath -> Stream (Of String) m () Source
Read the lines of a file as Haskell String
s
>>>
runResourceT $ S.writeFile "lines.txt" $ S.take 2 S.stdinLn
hello<Enter> world<Enter>>>>
runResourceT $ S.print $ S.readFile "lines.txt"
"hello" "world"
runResourceT
, as it is used here, means something like closing_all_handles
.
It makes it possible to write convenient, fairly sensible versions of
readFile
, writeFile
and appendFile
. IO.withFile IO.ReadMode ...
is more complicated but is generally to be preferred. Its use is explained
here.
iterate :: Monad m => (a -> a) -> a -> Stream (Of a) m r Source
Iterate a pure function from a seed value, streaming the results forever
iterateM :: Monad m => (a -> m a) -> m a -> Stream (Of a) m r Source
Iterate a monadic function from a seed value, streaming the results forever
repeat :: Monad m => a -> Stream (Of a) m r Source
Repeat an element ad inf. .
>>>
S.print $ S.take 3 $ S.repeat 1
1 1 1
repeatM :: Monad m => m a -> Stream (Of a) m r Source
Repeat a monadic action ad inf., streaming its results.
>>>
S.toList $ S.take 2 $ repeatM getLine
one<Enter> two<Enter> ["one","two"]
cycle :: (Monad m, Functor f) => Stream f m r -> Stream f m s Source
Cycle repeatedly through the layers of a stream, ad inf. This function is functor-general
cycle = forever
>>>
rest <- S.print $ S.splitAt 3 $ S.cycle (yield 0 >> yield 1)
True False True>>>
S.print $ S.take 3 rest
False True False
replicateM :: Monad m => Int -> m a -> Stream (Of a) m () Source
Repeat an action several times, streaming the results.
>>>
S.print $ S.replicateM 2 getCurrentTime
2015-08-18 00:57:36.124508 UTC 2015-08-18 00:57:36.124785 UTC
enumFrom :: (Monad m, Enum n) => n -> Stream (Of n) m r Source
An infinite stream of enumerable values, starting from a given value.
It is the same as `S.iterate succ`.
Because their return type is polymorphic, enumFrom
and enumFromThen
(and iterate
are useful for example with zip
and zipWith
, which require the same return type in the zipped streams.
With each [1..]
the following bit of connect-and-resume would be impossible:
>>>
rest <- S.print $ S.zip (S.enumFrom 'a') $ S.splitAt 3 $ S.enumFrom 1
('a',1) ('b',2) ('c',3)>>>
S.print $ S.take 3 rest
4 5 6
enumFromThen :: (Monad m, Enum a) => a -> a -> Stream (Of a) m r Source
An infinite sequence of enumerable values at a fixed distance, determined
by the first and second values. See the discussion of enumFrom
>>>
S.print $ S.take 3 $ S.enumFromThen 100 200
100 200 300
seconds :: Stream (Of Double) IO r Source
Streams the number of seconds from the beginning of action
Thus, to mark times of user input we might write something like:
>>>
S.toList $ S.take 3 $ S.zip S.seconds S.stdinLn
a<Enter> b<Enter> c<Enter> [(0.0,"a"),(1.088711,"b"),(3.7289649999999996,"c")] :> ()
To restrict user input to some number of seconds, we might write:
>>>
S.toList $ S.map fst $ S.zip S.stdinLn $ S.takeWhile (< 3) S.seconds
one<Enter> two<Enter> three<Enter> four<Enter> five<Enter> ["one","two","three","four","five"] :> ()
This is of course does not interrupt an action that has already begun.
Consuming streams of elements
mapM_ :: Monad m => (a -> m b) -> Stream (Of a) m r -> m r Source
Reduce a stream to its return value with a monadic action.
>>>
S.mapM_ Prelude.print $ each [1..5]
1 2 3 4 5>>>
rest <- S.mapM_ Prelude.print $ S.splitAt 3 $ each [1..10]
1 2 3>>>
S.sum rest
49 :> ()
print :: (MonadIO m, Show a) => Stream (Of a) m r -> m r Source
Print the elements of a stream as they arise.
>>>
S.print $ S.take 2 S.stdinLn
hello "hello" world "world">>>
toHandle :: MonadIO m => Handle -> Stream (Of String) m r -> m r Source
Write a succession of strings to a handle as separate lines.
>>>
S.toHandle IO.stdout $ each $ words "one two three"
one two three
writeFile :: MonadResource m => FilePath -> Stream (Of String) m r -> m r Source
Write a series of strings as lines to a file. The handle is crudely
managed with ResourceT
:
>>>
runResourceT $ S.writeFile "lines.txt" $ S.take 2 S.stdinLn
hello<Enter> world<Enter>>>>
runResourceT $ S.print $ S.readFile "lines.txt"
"hello" "world"
effects :: Monad m => Stream (Of a) m r -> m r Source
Reduce a stream, performing its actions but ignoring its elements.
>>>
rest <- S.effects $ S.splitAt 2 $ each [1..5]
>>>
S.print rest
3 4 5 'effects' should be understood together with 'copy' and is subject to the rules
S.effects . S.copy = id hoist S.effects . S.copy = id
The similar effects
and copy
operations in Data.ByteString.Streaming
obey the same rules.
erase :: Monad m => Stream (Of a) m r -> Stream Identity m r Source
Remove the elements from a stream of values, retaining the structure of layers.
drained :: (Monad m, Monad (t m), Functor (t m), MonadTrans t) => t m (Stream (Of a) m r) -> t m r Source
Where a transformer returns a stream, run the effects of the stream, keeping the return value. This is usually used at the type
drained :: Monad m => Stream (Of a) m (Stream (Of b) m r) -> Stream (Of a) m r drained = join . fmap (lift . effects)
Here, for example, we split a stream in two places and throw out the middle segment:
>>>
rest <- S.print $ S.drained $ S.splitAt 2 $ S.splitAt 5 $ each [1..7]
1 2>>>
S.print rest
6 7
In particular, we can define versions of take
and takeWhile
which
retrieve the return value of the rest of the stream - and which can
thus be used with maps
:
take' n = S.drained . S.splitAt n takeWhile' thus = S.drained . S.span thus
Stream transformers
map :: Monad m => (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r Source
Standard map on the elements of a stream.
>>>
S.stdoutLn $ S.map reverse $ each (words "alpha beta")
ahpla ateb
mapM :: Monad m => (a -> m b) -> Stream (Of a) m r -> Stream (Of b) m r Source
Replace each element of a stream with the result of a monadic action
>>>
S.print $ S.mapM readIORef $ S.chain (\ior -> modifyIORef ior (*100)) $ S.mapM newIORef $ each [1..6]
100 200 300 400 500 600
maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r Source
Map layers of one functor to another with a transformation. Compare
hoist, which has a similar effect on the monadic
parameter.
maps id = id maps f . maps g = maps (f . g)
mapped :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r Source
Map layers of one functor to another with a transformation involving the base monad. This could be trivial, e.g.
let noteBeginning text x = putStrLn text >> return text
this puts the is completely functor-general
maps
and mapped
obey these rules:
maps id = id mapped return = id maps f . maps g = maps (f . g) mapped f . mapped g = mapped (f <=< g) maps f . mapped g = mapped (liftM f . g) mapped f . maps g = mapped (f <=< liftM g)
maps
is more fundamental than mapped
, which is best understood as a convenience
for effecting this frequent composition:
mapped phi = decompose . maps (Compose . phi)
for :: (Monad m, Functor f) => Stream (Of a) m r -> (a -> Stream f m x) -> Stream f m r Source
for
replaces each element of a stream with an associated stream. Note that the
associated stream may layer any functor.
with :: (Monad m, Functor f) => Stream (Of a) m r -> (a -> f x) -> Stream f m r Source
Replace each element in a stream of individual Haskell values (a Stream (Of a) m r
) with an associated functorial
step.
for str f = concats (with str f) with str f = for str (yields . f) with str f = maps (\(a:>r) -> r <$ f a) str with = flip subst subst = flip with
>>>
with (each [1..3]) (yield . show) & intercalates (yield "--") & S.stdoutLn
1 -- 2 -- 3
subst :: (Monad m, Functor f) => (a -> f x) -> Stream (Of a) m r -> Stream f m r Source
Replace each element in a stream of individual values with a functorial
layer of any sort. subst = flip with
and is more convenient in
a sequence of compositions that transform a stream.
with = flip subst for str f = concats $ subst f str subst f = maps (\(a:>r) -> r <$ f a) S.concat = concats . subst each
copy :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r Source
Duplicate the content of stream, so that it can be acted on twice in different ways,
but without breaking streaming. Thus, with each [1,2]
I might do:
>>>
S.print $ each ["one","two"]
"one" "two">>>
S.stdoutLn $ each ["one","two"]
one two
With copy, I can as well do:
>>>
S.print $ S.stdoutLn $ S.copy $ each ["one","two"]
one "one" two "two"
copy
should be understood together with effects
and is subject to the rules
S.effects . S.copy = id hoist S.effects . S.copy = id
The similar operations in Streaming
obey the same rules.
Where the actions you are contemplating are each simple folds over
the elements, or a selection of elements, then the coupling of the
folds is often more straightforwardly effected with Foldl
,
e.g.
>>>
L.purely S.fold (liftA2 (,) L.sum L.product) $ each [1..10]
(55,3628800) :> ()
rather than
>>>
S.sum $ S.product . S.copy $ each [1..10]
55 :> (3628800 :> ())
A Control.Foldl
fold can be altered to act on a selection of elements by
using handles
on an appropriate lens. Some such
manipulations are simpler and more List
-like, using copy
:
>>>
L.purely S.fold (liftA2 (,) (L.handles (filtered odd) L.sum) (L.handles (filtered even) L.product)) $ each [1..10]
(25,3840) :> ()
becomes
>>>
S.sum $ S.filter odd $ S.product $ S.filter even $ S.copy $ each [1..10]
25 :> (3840 :> ())
or using store
>>>
S.sum $ S.filter odd $ S.store (S.product . S.filter even) $ each [1..10]
25 :> (3840 :> ())
But anything that fold of a Stream (Of a) m r
into e.g. an m (Of b r)
that has a constraint on m
that is carried over into Stream f m
-
e.g. Monad
, MonadIO
, MonadResource
, etc. can be used on the stream.
Thus, I can fold over different groupings of the original stream:
>>>
(S.toList . mapped S.toList . chunksOf 5) $ (S.toList . mapped S.toList . chunksOf 3) $ S.copy $ each [1..10]
[[1,2,3,4,5],[6,7,8,9,10]] :> ([[1,2,3],[4,5,6],[7,8,9],[10]] :> ())
The procedure can be iterated as one pleases, as one can see from this (otherwise unadvisable!) example:
>>>
(S.toList . mapped S.toList . chunksOf 4) $ (S.toList . mapped S.toList . chunksOf 3) $ S.copy $ (S.toList . mapped S.toList . chunksOf 2) $ S.copy $ each [1..12]
[[1,2,3,4],[5,6,7,8],[9,10,11,12]] :> ([[1,2,3],[4,5,6],[7,8,9],[10,11,12]] :> ([[1,2],[3,4],[5,6],[7,8],[9,10],[11,12]] :> ()))
copy' :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r Source
copy'
is the same as copy
but reverses the order of interleaved effects.
The difference should not be observable at all for pure folds over the data.
store :: Monad m => (Stream (Of a) (Stream (Of a) m) r -> t) -> Stream (Of a) m r -> t Source
Store the result of any suitable fold over a stream, keeping the stream for
further manipulation. store f = f . copy
:
>>>
S.print $ S.store S.product $ each [1..4]
1 2 3 4 24 :> ()
>>>
S.print $ S.store S.sum $ S.store S.product $ each [1..4]
1 2 3 4 10 :> (24 :> ())
Here the sum (10) and the product (24) have been 'stored' for use when
finally we have traversed the stream with print
. Needless to say,
a second pass
is excluded conceptually, so the
folds that you apply successively with store
are performed
simultaneously, and in constant memory -- as they would be if,
say, you linked them together with Control.Fold
:
>>>
L.impurely S.foldM (liftA3 (\a b c -> (b,c)) (L.sink print) (L.generalize L.sum) (L.generalize L.product)) $ each [1..4]
1 2 3 4 (10,24) :> ()
Fusing folds after the fashion of Control.Foldl
will generally be a bit faster
than the corresponding succession of uses of store
, but by
constant factor that will be completely dwarfed when any IO is at issue.
But store
copy
is much/ more powerful, as you can see by reflecting on
uses like this:
>>>
S.sum $ S.store (S.sum . mapped S.product . chunksOf 2) $ S.store (S.product . mapped S.sum . chunksOf 2 )$ each [1..6]
21 :> (44 :> (231 :> ()))
It will be clear that this cannot be reproduced with any combination of lenses,
Control.Fold
folds, or the like. (See also the discussion of copy
.)
It would conceivable be clearer to import a series of specializations of store
.
It is intended to be used at types like these:
storeM :: (forall s m . Monad m => Stream (Of a) m s -> m (Of b s)) -> (Monad n => Stream (Of a) n r -> Stream (Of a) n (Of b r)) storeM = store storeMIO :: (forall s m . MonadIO m => Stream (Of a) m s -> m (Of b s)) -> ( MonadIO n => Stream (Of a) n r -> Stream (Of a) n (Of b r) storeMIO = store
It is clear from these types that we are just using the general instances:
instance (Functor f, Monad m ) => Monad (Stream f m) instance (Functor f, MonadIO m) => MonadIO (Stream f m)
We thus can't be touching the elements of the stream, or the final return value.
It it is the same with other constraints that Stream (Of a)
inherits,
like MonadResource
. Thus I can filter and write to one file, but
nub and write to another, or to a database or the like:
>>>
runResourceT $ (S.writeFile "hello2.txt" . S.nub) $ store (S.writeFile "hello.txt" . S.filter (/= "world")) $ each ["hello", "world", "goodbye", "world"]
>>>
:! cat hello.txt
hello goodbye>>>
:! cat hello2.txt
hello world goodbye
chain :: Monad m => (a -> m ()) -> Stream (Of a) m r -> Stream (Of a) m r Source
Apply an action to all values, re-yielding each
>>>
S.product $ S.chain Prelude.print $ S.each [1..5]
1 2 3 4 5 120 :> ()
sequence :: Monad m => Stream (Of (m a)) m r -> Stream (Of a) m r Source
Like the sequence
but streaming. The result type is a
stream of a's, but is not accumulated; the effects of the elements
of the original stream are interleaved in the resulting stream. Compare:
sequence :: Monad m => [m a] -> m [a] sequence :: Monad m => Stream (Of (m a)) m r -> Stream (Of a) m r
This obeys the rule
filter :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m r Source
Skip elements of a stream that fail a predicate
filterM :: Monad m => (a -> m Bool) -> Stream (Of a) m r -> Stream (Of a) m r Source
Skip elements of a stream that fail a monadic test
delay :: MonadIO m => Double -> Stream (Of a) m r -> Stream (Of a) m r Source
Interpolate a delay of n seconds between yields.
take :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m () Source
End a stream after n elements; the original return value is thus lost.
splitAt
preserves this information. Note that, like splitAt
, this
function is functor-general, so that, for example, you can take
not
just a number of items from a stream of elements, but a number
of substreams and the like.
>>>
S.toList $ S.take 3 $ each "with"
"wit" :> ()
>>>
runResourceT $ S.stdoutLn $ S.take 3 $ S.readFile "stream.hs"
import Streaming import qualified Streaming.Prelude as S import Streaming.Prelude (each, next, yield)
takeWhile :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m () Source
End stream when an element fails a condition; the original return value is lost.
By contrast span
preserves this information, and is generally more desirable.
S.takeWhile thus = void . S.span thus
To preserve the information - but thus also force the rest of the stream to be developed - write
S.drained . S.span thus
as dropWhile thus
is
S.effects . S.span thus
drop :: Monad m => Int -> Stream (Of a) m r -> Stream (Of a) m r Source
Ignore the first n elements of a stream, but carry out the actions
>>>
S.toList $ S.drop 2 $ S.replicateM 5 getLine
a<Enter> b<Enter> c<Enter> d<Enter> e<Enter> ["c","d","e"] :> ()
Because it retains the final return value, drop n
is a suitable argument
for maps
:
>>>
S.toList $ concats $ maps (S.drop 4) $ chunksOf 5 $ each [1..20]
[5,10,15,20] :> ()
dropWhile :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m r Source
Ignore elements of a stream until a test succeeds, retaining the rest.
>>>
S.print $ S.dropWhile ((< 5) . length) S.stdinLn
one<Enter> two<Enter> three<Enter> "three" four<Enter> "four" ^CInterrupted.
concat :: (Monad m, Foldable f) => Stream (Of (f a)) m r -> Stream (Of a) m r Source
Make a stream of traversable containers into a stream of their separate elements. This is just
concat str = for str each
>>>
S.print $ S.concat (each ["xy","z"])
'x' 'y' 'z'
Note that it also has the effect of catMaybes
, rights
'map snd' and such-like operations.
>>>
S.print $ S.concat $ S.each [Just 1, Nothing, Just 2]
1 2>>>
S.print $ S.concat $ S.each [Right 1, Left "Error!", Right 2]
1 2>>>
S.print $ S.concat $ S.each [('A',1), ('B',2)]
1 2
scan :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> Stream (Of b) m r Source
Strict left scan, streaming, e.g. successive partial results.
>>>
S.print $ S.scan (++) "" id $ each (words "a b c d")
"" "a" "ab" "abc" "abcd"
scan
is fitted for use with Control.Foldl
, thus:
>>>
S.print $ L.purely S.scan L.list $ each [3..5]
[] [3] [3,4] [3,4,5]
scanM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Of a) m r -> Stream (Of b) m r Source
Strict left scan, accepting a monadic function. It can be used with
FoldM
s from Control.Foldl
using impurely
. Here we yield
a succession of vectors each recording
>>>
let v = L.impurely scanM L.vector $ each [1..4::Int] :: Stream (Of (U.Vector Int)) IO ()
>>>
S.print v
fromList [] fromList [1] fromList [1,2] fromList [1,2,3] fromList [1,2,3,4]
scanned :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> Stream (Of (a, b)) m r Source
read :: (Monad m, Read a) => Stream (Of String) m r -> Stream (Of a) m r Source
Make a stream of strings into a stream of parsed values, skipping bad cases
>>>
S.sum_ $ S.read $ S.takeWhile (/= "total") S.stdinLn :: IO Int
1000<Enter> 2000<Enter> total<Enter> 3000
cons :: Monad m => a -> Stream (Of a) m r -> Stream (Of a) m r Source
The natural cons
for a Stream (Of a)
.
cons a stream = yield a >> stream
Useful for interoperation:
Data.Text.foldr S.cons (return ()) :: Text -> Stream (Of Char) m () Lazy.foldrChunks S.cons (return ()) :: Lazy.ByteString -> Stream (Of Strict.ByteString) m ()
and so on.
Splitting and inspecting streams of elements
next :: Monad m => Stream (Of a) m r -> m (Either r (a, Stream (Of a) m r)) Source
The standard way of inspecting the first item in a stream of elements, if the
stream is still 'running'. The Right
case contains a
Haskell pair, where the more general inspect
would return a left-strict pair.
There is no reason to prefer inspect
since, if the Right
case is exposed,
the first element in the pair will have been evaluated to whnf.
next :: Monad m => Stream (Of a) m r -> m (Either r (a, Stream (Of a) m r)) inspect :: Monad m => Stream (Of a) m r -> m (Either r (Of a (Stream (Of a) m r)))
Interoperate with pipes
producers thus:
Pipes.unfoldr Stream.next :: Stream (Of a) m r -> Producer a m r Stream.unfoldr Pipes.next :: Producer a m r -> Stream (Of a) m r
Similarly:
IOStreams.unfoldM (liftM (either (const Nothing) Just) . next) :: Stream (Of a) IO b -> IO (InputStream a) Conduit.unfoldM (liftM (either (const Nothing) Just) . next) :: Stream (Of a) m r -> Source a m r
But see uncons
, which is better fitted to these unfoldM
s
uncons :: Monad m => Stream (Of a) m () -> m (Maybe (a, Stream (Of a) m ())) Source
Inspect the first item in a stream of elements, without a return value.
uncons
provides convenient exit into another streaming type:
IOStreams.unfoldM uncons :: Stream (Of a) IO b -> IO (InputStream a) Conduit.unfoldM uncons :: Stream (Of a) m r -> Conduit.Source m a
splitAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r) Source
Split a succession of layers after some number, returning a streaming or
-- effectful pair. This function is the same as the splitsAt
exported by the
-- Streaming
module, but since this module is imported qualified, it can
-- usurp a Prelude name. It specializes to:
splitAt :: (Monad m, Functor f) => Int -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)
split :: (Eq a, Monad m) => a -> Stream (Of a) m r -> Stream (Stream (Of a) m) m r Source
Split a stream of elements wherever a given element arises.
The action is like that of words
.
>>>
S.stdoutLn $ mapped S.toList $ S.split ' ' $ each "hello world "
hello world
break :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r) Source
Break a sequence upon meeting element falls under a predicate, keeping it and the rest of the stream as the return value.
>>>
rest <- S.print $ S.break even $ each [1,1,2,3]
1 1>>>
S.print rest
2 3
breakWhen :: Monad m => (x -> a -> x) -> x -> (x -> b) -> (b -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r) Source
Yield elements, using a fold to maintain state, until the accumulated
value satifies the supplied predicate. The fold will then be short-circuited
and the element that breaks it will be put after the break.
This function is easiest to use with purely
>>>
rest <- each [1..10] & L.purely S.breakWhen L.sum (>10) & S.print
1 2 3 4>>>
S.print rest
5 6 7 8 9 10
span :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r) Source
Stream elements until one fails the condition, return the rest.
group :: (Monad m, Eq a) => Stream (Of a) m r -> Stream (Stream (Of a) m) m r Source
Group successive equal items together
>>>
S.toList $ mapped S.toList $ S.group $ each "baaaaad"
["b","aaaaa","d"] :> ()
>>>
S.toList $ concats $ maps (S.drained . S.splitAt 1) $ S.group $ each "baaaaaaad"
"bad" :> ()
groupBy :: Monad m => (a -> a -> Bool) -> Stream (Of a) m r -> Stream (Stream (Of a) m) m r Source
Group elements of a stream in accordance with the supplied comparison.
>>>
S.print $ mapped S.toList $ S.groupBy (>=) $ each [1,2,3,1,2,3,4,3,2,4,5,6,7,6,5]
[1] [2] [3,1,2,3] [4,3,2,4] [5] [6] [7,6,5]
Sum and Compose manipulation
switch :: Sum f g r -> Sum g f r Source
Swap the order of functors in a sum of functors.
>>>
S.toListM' $ S.print $ separate $ maps S.switch $ maps (S.distinguish (=='a')) $ S.each "banana"
'a' 'a' 'a' "bnn" :> ()>>>
S.toListM' $ S.print $ separate $ maps (S.distinguish (=='a')) $ S.each "banana"
'b' 'n' 'n' "aaa" :> ()
separate :: (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream f (Stream g m) r Source
Given a stream on a sum of functors, make it a stream on the left functor,
with the streaming on the other functor as the governing monad. This is
useful for acting on one or the other functor with a fold. It generalizes
partitionEithers
massively, but actually streams properly.
>>>
let odd_even = S.maps (S.distinguish even) $ S.each [1..10::Int]
>>>
:t separate odd_even
separate odd_even :: Monad m => Stream (Of Int) (Stream (Of Int) m) ()
Now, for example, it is convenient to fold on the left and right values separately:
>>>
S.toList $ S.toList $ separate odd_even
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())
Or we can write them to separate files or whatever:
>>>
runResourceT $ S.writeFile "even.txt" . S.show $ S.writeFile "odd.txt" . S.show $ S.separate odd_even
>>>
:! cat even.txt
2 4 6 8 10>>>
:! cat odd.txt
1 3 5 7 9
Of course, in the special case of Stream (Of a) m r
, we can achieve the above
effects more simply by using copy
>>>
S.toList . S.filter even $ S.toList . S.filter odd $ S.copy $ each [1..10::Int]
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())
But separate
and unseparate
are functor-general.
unseparate :: (Monad m, Functor f, Functor g) => Stream f (Stream g m) r -> Stream (Sum f g) m r Source
Folds
Use these to fold the elements of a Stream
.
>>>
S.fold_ (+) 0 id $ S.each [1..0]
50
The general folds fold
, fold_', foldM
and foldM_
are arranged
for use with Control.Foldl
purely
and impurely
>>>
L.purely fold_ L.sum $ each [1..10]
55>>>
L.purely fold_ (liftA3 (,,) L.sum L.product L.list) $ each [1..10]
(55,3628800,[1,2,3,4,5,6,7,8,9,10])
All functions marked with an underscore omit
(e.g. fold_
, sum_
) the stream's return value in a left-strict pair.
They are good for exiting streaming completely,
but when you are, e.g. mapped
-ing over a Stream (Stream (Of a) m) m r
,
which is to be compared with [[a]]
. Specializing, we have e.g.
mapped sum :: (Monad m, Num n) => Stream (Stream (Of Int)) IO () -> Stream (Of n) IO () mapped (fold mappend mempty id) :: Stream (Stream (Of Int)) IO () -> Stream (Of Int) IO ()
>>>
S.print $ mapped S.sum $ chunksOf 3 $ S.each [1..10]
6 15 24 10
>>>
let three_folds = L.purely S.fold (liftA3 (,,) L.sum L.product L.list)
>>>
S.print $ mapped three_folds $ chunksOf 3 (each [1..10])
(6,6,[1,2,3]) (15,120,[4,5,6]) (24,504,[7,8,9]) (10,10,[10])
fold :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> m (Of b r) Source
Strict fold of a Stream
of elements that preserves the return value.
The third parameter will often be id
where a fold is written by hand:
>>>
S.fold (+) 0 id $ each [1..10]
55 :> ()
>>>
S.fold (*) 1 id $ S.fold (+) 0 id $ S.copy $ each [1..10]
3628800 :> (55 :> ())
It can be used to replace a standard Haskell type with one more suited to
writing a strict accumulation function. It is also crucial to the
Applicative instance for Control.Foldl.Fold
We can apply such a fold
purely
Control.Foldl.purely S.fold :: Monad m => Fold a b -> Stream (Of a) m r -> m (Of b r)
Thus, specializing a bit:
L.purely S.fold L.sum :: Stream (Of Int) Int r -> m (Of Int r) mapped (L.purely S.fold L.sum) :: Stream (Stream (Of Int)) IO r -> Stream (Of Int) IO r
Here we use the Applicative instance for Control.Foldl.Fold
to
stream three-item segments of a stream together with their sums and products.
>>>
S.print $ mapped (L.purely S.fold (liftA3 (,,) L.list L.product L.sum)) $ chunksOf 3 $ each [1..10]
([1,2,3],6,6) ([4,5,6],120,15) ([7,8,9],504,24) ([10],10,10)
fold_ :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> m b Source
Strict fold of a Stream
of elements, preserving only the result of the fold, not
the return value of the stream. The third parameter will often be id
where a fold
is written by hand:
>>>
S.fold_ (+) 0 id $ each [1..10]
55
It can be used to replace a standard Haskell type with one more suited to
writing a strict accumulation function. It is also crucial to the
Applicative instance for Control.Foldl.Fold
Control.Foldl.purely fold :: Monad m => Fold a b -> Stream (Of a) m () -> m b
foldM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Of a) m r -> m (Of b r) Source
Strict, monadic fold of the elements of a 'Stream (Of a)'
Control.Foldl.impurely foldM' :: Monad m => FoldM a b -> Stream (Of a) m r -> m (b, r)
Thus to accumulate the elements of a stream as a vector, together with a random element we might write:
>>>
L.impurely S.foldM (liftA2 (,) L.vector L.random) $ each [1..10::Int] :: IO (Of (U.Vector Int,Maybe Int) ())
([1,2,3,4,5,6,7,8,9,10],Just 9) :> ()
foldM_ :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Of a) m r -> m b Source
Strict, monadic fold of the elements of a 'Stream (Of a)'
Control.Foldl.impurely foldM :: Monad m => FoldM a b -> Stream (Of a) m () -> m b
sum :: (Monad m, Num a) => Stream (Of a) m r -> m (Of a r) Source
Fold a Stream
of numbers into their sum with the return value
mapped S.sum :: Stream (Stream (Of Int)) m r -> Stream (Of Int) m r
>>>
S.sum $ each [1..10]
55 :> ()
>>>
(n :> rest) <- S.sum $ S.splitAt 3 $ each [1..10]
>>>
print n
6>>>
(m :> rest') <- S.sum $ S.splitAt 3 rest
>>>
print m
15>>>
S.print rest'
7 8 9
product :: (Monad m, Num a) => Stream (Of a) m r -> m (Of a r) Source
Fold a Stream
of numbers into their product with the return value
maps' product' :: Stream (Stream (Of Int)) m r -> Stream (Of Int) m r
product_ :: (Monad m, Num a) => Stream (Of a) m () -> m a Source
Fold a Stream
of numbers into their product
elem :: (Monad m, Eq a) => a -> Stream (Of a) m r -> m (Of Bool r) Source
Exhaust a stream remembering only whether a
was an element.
notElem :: (Monad m, Eq a) => a -> Stream (Of a) m r -> m (Of Bool r) Source
Exhaust a stream deciding whether a
was an element.
length :: Monad m => Stream (Of a) m r -> m (Of Int r) Source
Run a stream, keeping its length and its return value.
>>>
S.print $ mapped S.length $ chunksOf 3 $ S.each [1..10]
3 3 3 1
length_ :: Monad m => Stream (Of a) m r -> m Int Source
Run a stream, remembering only its length:
>>>
S.length $ S.each [1..10]
10
toList :: Monad m => Stream (Of a) m r -> m (Of [a] r) Source
Convert an effectful Stream
into a list alongside the return value
mapped toList :: Stream (Stream (Of a)) m r -> Stream (Of [a]) m
Like toList_
, it breaks streaming; unlike toList_
it preserves
the return value and thus is frequently useful with e.g. mapped
>>>
S.print $ mapped S.toList $ chunksOf 3 $ each [1..9]
[1,2,3] [4,5,6] [7,8,9]
toList_ :: Monad m => Stream (Of a) m () -> m [a] Source
Convert an effectful 'Stream (Of a)' into a list of as
Note: Needless to say, this function does not stream properly.
It is basically the same as Prelude mapM
which, like replicateM
,
sequence
and similar operations on traversable containers
is a leading cause of space leaks.
mconcat :: (Monad m, Monoid w) => Stream (Of w) m r -> m (Of w r) Source
Fold streamed items into their monoidal sum
>>>
S.mconcat $ S.take 2 $ S.map (Data.Monoid.Last . Just) (S.stdinLn)
first<Enter> last<Enter> Last {getLast = Just "last"} :> ()
foldrT :: (Monad m, MonadTrans t, Monad (t m)) => (a -> t m r -> t m r) -> Stream (Of a) m r -> t m r Source
A natural right fold for consuming a stream of elements.
See also the more general iterTM
in the Streaming
module
and the still more general destroy
foldrT (\a p -> Streaming.yield a >> p) = id foldrT (\a p -> Pipes.yield a >> p) :: Monad m => Stream (Of a) m r -> Producer a m r foldrT (\a p -> Conduit.yield a >> p) :: Monad m => Stream (Of a) m r -> Conduit a m r
Zips and unzips
zip :: Monad m => Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of (a, b)) m r Source
Zip two Streams
s
zipWith :: Monad m => (a -> b -> c) -> Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of c) m r Source
Zip two Streams
s using the provided combining function
zip3 :: Monad m => Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of c) m r -> Stream (Of (a, b, c)) m r Source
Zip three streams together
zipWith3 :: Monad m => (a -> b -> c -> d) -> Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of c) m r -> Stream (Of d) m r Source
Zip three Stream
s with a combining function
unzip :: Monad m => Stream (Of (a, b)) m r -> Stream (Of a) (Stream (Of b) m) r Source
The type
Data.List.unzip :: [(a,b)] -> ([a],[b])
might lead us to expect
Streaming.unzip :: Stream (Of (a,b)) m r -> Stream (Of a) m (Stream (Of b) m r)
which would not stream, since it would have to accumulate the second stream (of b
s).
Of course, Data.List
unzip
doesn't stream either.
This unzip
does
stream, though of course you can spoil this by using e.g. toList
:
>>>
let xs = map (\x-> (x,show x)) [1..5::Int]
>>>
S.toList $ S.toList $ S.unzip (S.each xs)
["1","2","3","4","5"] :> ([1,2,3,4,5] :> ())
>>>
Prelude.unzip xs
([1,2,3,4,5],["1","2","3","4","5"])
Note the difference of order in the results. It may be of some use to think why.
The first application of toList
was applied to a stream of integers:
>>>
:t S.unzip $ S.each xs
S.unzip $ S.each xs :: Monad m => Stream (Of Int) (Stream (Of String) m) ()
Like any fold, toList
takes no notice of the monad of effects.
toList :: Monad m => Stream (Of a) m r -> m (Of [a] r)
In the case at hand (since I am in ghci
) m = Stream (Of String) IO
.
So when I apply toList
, I exhaust that stream of integers, folding
it into a list:
>>>
:t S.toList $ S.unzip $ S.each xs
S.toList $ S.unzip $ S.each xs :: Monad m => Stream (Of String) m (Of [Int] ())
When I apply toList
to this, I reduce everything to an ordinary action in IO
,
and return a list of strings:
>>>
S.toList $ S.toList $ S.unzip (S.each xs)
["1","2","3","4","5"] :> ([1,2,3,4,5] :> ())
partitionEithers :: Monad m => Stream (Of (Either a b)) m r -> Stream (Of a) (Stream (Of b) m) r Source
Separate left and right values in distinct streams. (separate
is
a more powerful, functor-general, equivalent using Sum
in place of Either
).
So, for example, to permit unlimited user
input of Int
s on condition of only two errors, we might write:
>>>
S.toList $ S.print $ S.take 2 $ partitionEithers $ S.map readEither $ S.stdinLn :: IO (Of [Int] ())
1<Enter> 2<Enter> qqqqqqqqqq<Enter> "Prelude.read: no parse" 3<Enter> rrrrrrrrrr<Enter> "Prelude.read: no parse" [1,2,3] :> ()
partitionEithers = separate . maps S.eitherToSum lefts = hoist S.effects . partitionEithers rights = S.effects . partitionEithers rights = S.concat
partition :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r Source
filter p = hoist effects (partition p)
Maybes
These functions discard the Nothing
s that they encounter. They are analogous
to the functions from Data.Maybe
that share their names.
Pair manipulation
lazily :: Of a b -> (a, b) Source
Note that lazily
, strictly
, fst'
, and mapOf
are all so-called natural transformations on the primitive Of a
functor
If we write
type f ~~> g = forall x . f x -> g x
then we can restate some types as follows:
mapOf :: (a -> b) -> Of a ~~> Of b -- bifunctor lmap lazily :: Of a ~~> (,) a Identity . fst' :: Of a ~~> Identity a
Manipulation of a Stream f m r
by mapping often turns on recognizing natural transformations of f
.
Thus maps
is far more general the the map
of the Streaming.Prelude
, which can be
defined thus:
S.map :: (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r S.map f = maps (mapOf f)
i.e.
S.map f = maps (\(a :> x) -> (f a :> x))
This rests on recognizing that mapOf
is a natural transformation; note though
that it results in such a transformation as well:
S.map :: (a -> b) -> Stream (Of a) m ~> Stream (Of b) m
Thus we can maps
it in turn
Interoperation
reread :: Monad m => (s -> m (Maybe a)) -> s -> Stream (Of a) m () Source
Read an IORef (Maybe a)
or a similar device until it reads Nothing
.
reread
provides convenient exit from the io-streams
library
reread readIORef :: IORef (Maybe a) -> Stream (Of a) IO () reread Streams.read :: System.IO.Streams.InputStream a -> Stream (Of a) IO ()