streaming-0.1.4.1: an elementary streaming prelude and general stream type.

Safe HaskellNone
LanguageHaskell2010

Streaming.Prelude

Contents

Description

This module is very closely modeled on Pipes.Prelude, Pipes.Group and Pipes.Parse. It may be said to give independent expression to the conception of Producer manipulation articulated in the latter two modules. Because we dispense with piping and conduiting, the distinction between all of these modules collapses. The leading type is chosen to permit an API that is as close as possible to that of Data.List and the Prelude.

Import qualified thus:

import Streaming
import qualified Streaming.Prelude as S

For the examples below, one sometimes needs

import Streaming.Prelude (each, yield, next, mapped, stdoutLn, stdinLn)
import Data.Function ((&)) 

Other libraries that come up in passing are

import qualified Control.Foldl as L -- cabal install foldl
import qualified Pipes as P
import qualified Pipes.Prelude as P
import qualified System.IO as IO

Here are some correspondences between the types employed here and elsewhere:

              streaming             |            pipes               |       conduit        |  io-streams
--------------------------------------------------------------------------------------------------------------------
Stream (Of a) m ()                  | Producer a m ()                | Source m a           | InputStream a
                                    | ListT m a                      | ConduitM () o m ()   | Generator a ()
--------------------------------------------------------------------------------------------------------------------
Stream (Of a) m r                   | Producer a m r                 | ConduitM () o m r    | Generator a r
--------------------------------------------------------------------------------------------------------------------
Stream (Of a) m (Stream (Of a) m r) | Producer a m (Producer a m r)  |                      |
--------------------------------------------------------------------------------------------------------------------
Stream (Stream (Of a) m) m r        | FreeT (Producer a m) m r       |                      |
--------------------------------------------------------------------------------------------------------------------
ByteString m ()                     | Producer ByteString m ()       | Source m ByteString  | InputStream ByteString
--------------------------------------------------------------------------------------------------------------------

Synopsis

Types

data Of a b Source

A left-strict pair; the base functor for streams of individual elements.

Constructors

!a :> b infixr 5 

Instances

Monoid a => Monad (Of a) Source 
Functor (Of a) Source 
Monoid a => Applicative (Of a) Source 
Foldable (Of a) Source 
Traversable (Of a) Source 
(Eq a, Eq b) => Eq (Of a b) Source 
(Data a, Data b) => Data (Of a b) Source 
(Ord a, Ord b) => Ord (Of a b) Source 
(Read a, Read b) => Read (Of a b) Source 
(Show a, Show b) => Show (Of a b) Source 
(Monoid a, Monoid b) => Monoid (Of a b) Source 
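
As a small illustration of how the left-strict pair shows up in practice, the sketch below unpacks the result of a fold by pattern matching on `:>`. It assumes a recent release of the streaming package in which `S.lazily` is exported; the names `total` and `asTuple` are purely illustrative.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Fold a pure stream and unpack the strict pair by pattern matching.
total :: Int
total = case runIdentity (S.sum (S.each [1, 2, 3])) of
  n :> () -> n

-- Convert the same kind of result into an ordinary lazy tuple.
-- (S.lazily is assumed to be available in the installed version.)
asTuple :: (Int, ())
asTuple = S.lazily (runIdentity (S.sum (S.each [1, 2, 3])))

main :: IO ()
main = do
  print total
  print asTuple
```

The second component of the pair is the stream's return value, which is `()` here because `each` builds a stream that returns unit.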

Introducing streams of elements

yield :: Monad m => a -> Stream (Of a) m () Source

A singleton stream

>>> stdoutLn $ yield "hello"
hello
>>> S.sum $ do {yield 1; yield 2}
3
>>> let prompt = putStrLn "Enter a number:"
>>> let number = lift (prompt >> readLn) >>= yield :: Stream (Of Int) IO ()
>>> S.toList $ do {number; number; number}
Enter a number:
1
Enter a number:
2
Enter a number:
3
[1,2,3] :> ()

each :: (Monad m, Foldable f) => f a -> Stream (Of a) m () Source

Stream the elements of a pure, foldable container.

>>> S.print $ each [1..3]
1
2
3
>>> S.print $ mapped S.toList $ chunksOf 3 $ S.replicateM 5 getLine
s<Enter>
t<Enter>
u<Enter>
["s","t","u"]
v<Enter>
w<Enter>
["v","w"]

each' :: (Monad m, Foldable f) => f a -> Stream (Of a) m () Source

unfoldr :: Monad m => (s -> m (Either r (a, s))) -> s -> Stream (Of a) m r Source

Build a Stream by unfolding steps starting from a seed.

The seed can of course be anything, but this is one natural way to consume a pipes Producer. Consider:

>>> S.stdoutLn $ S.take 2 $ S.unfoldr P.next P.stdinLn
hello<Enter>
hello
goodbye<Enter>
goodbye
>>> S.stdoutLn $ S.unfoldr P.next (P.stdinLn P.>-> P.take 2)
hello<Enter>
hello
goodbye<Enter>
goodbye
>>> S.effects $ S.unfoldr P.next (P.stdinLn P.>-> P.take 2 P.>-> P.stdoutLn)
hello<Enter>
hello
goodbye<Enter>
goodbye
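
The seed need not be a pipes Producer. The following sketch unfolds a plain Int seed and shows how the `Left` case supplies the stream's return value; `countdown` and the "liftoff" result are hypothetical names chosen for the example, and the code assumes a recent release of streaming.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Stream)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Count down from the seed; a Left ends the stream with a return value,
-- a Right yields an element together with the next seed.
countdown :: Monad m => Int -> Stream (Of Int) m String
countdown = S.unfoldr step
  where
    step n
      | n <= 0 = return (Left "liftoff")
      | otherwise = return (Right (n, n - 1))

-- Collect the elements, keeping the return value in the strict pair.
result :: Of [Int] String
result = runIdentity (S.toList (countdown 3))

main :: IO ()
main = print result
```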

stdinLn :: MonadIO m => Stream (Of String) m () Source

View standard input as a Stream (Of String) m (). stdoutLn, by contrast, renders a Stream (Of String) m () to standard output. The names follow Pipes.Prelude.

>>> stdoutLn stdinLn
hello<Enter>
hello
world<Enter>
world
^CInterrupted.
>>> stdoutLn $ S.map reverse stdinLn
hello<Enter>
olleh
world<Enter>
dlrow
^CInterrupted.

readLn :: (MonadIO m, Read a) => Stream (Of a) m () Source

Read values from stdin, ignoring failed parses

>>> S.sum_ $ S.take 2 S.readLn :: IO Int
10<Enter>
12<Enter>
22
>>> S.toList $ S.take 3 (S.readLn :: Stream (Of Int) IO ())
1<Enter>
2<Enter>
1@#$%^&*\<Enter>
3<Enter>
[1,2,3] :> ()

fromHandle :: MonadIO m => Handle -> Stream (Of String) m () Source

Read Strings from a Handle using hGetLine

Terminates on end of input

>>> IO.withFile "/usr/share/dict/words" IO.ReadMode $ S.stdoutLn . S.take 3 . S.drop 50000 .  S.fromHandle
deflagrator
deflate
deflation

readFile :: MonadResource m => FilePath -> Stream (Of String) m () Source

Read the lines of a file as Haskell Strings

>>> runResourceT $ S.writeFile "lines.txt" $ S.take 2 S.stdinLn
hello<Enter>
world<Enter>
>>> runResourceT $ S.print $ S.readFile "lines.txt"
"hello"
"world"

runResourceT, as it is used here, means something like closing_all_handles. It makes it possible to write convenient, fairly sensible versions of readFile, writeFile and appendFile. IO.withFile IO.ReadMode ... is more complicated but is generally to be preferred. Its use is explained here.

iterate :: Monad m => (a -> a) -> a -> Stream (Of a) m r Source

Iterate a pure function from a seed value, streaming the results forever

iterateM :: Monad m => (a -> m a) -> m a -> Stream (Of a) m r Source

Iterate a monadic function from a seed value, streaming the results forever

repeat :: Monad m => a -> Stream (Of a) m r Source

Repeat an element ad infinitum.

>>> S.print $ S.take 3 $ S.repeat 1
1
1
1

repeatM :: Monad m => m a -> Stream (Of a) m r Source

Repeat a monadic action ad infinitum, streaming its results.

>>> S.toList $ S.take 2 $ S.repeatM getLine
one<Enter>
two<Enter>
["one","two"] :> ()

replicate :: Monad m => Int -> a -> Stream (Of a) m () Source

Repeat an element several times

untilRight :: Monad m => m (Either a r) -> Stream (Of a) m r Source

cycle :: (Monad m, Functor f) => Stream f m r -> Stream f m s Source

Cycle repeatedly through the layers of a stream, ad infinitum. This function is functor-general.

cycle = forever

>>> rest <- S.print $ S.splitAt 3 $ S.cycle (yield True >> yield False)
True
False
True
>>> S.print $ S.take 3 rest
False
True
False

replicateM :: Monad m => Int -> m a -> Stream (Of a) m () Source

Repeat an action several times, streaming the results.

>>> S.print $ S.replicateM 2 getCurrentTime
2015-08-18 00:57:36.124508 UTC
2015-08-18 00:57:36.124785 UTC

enumFrom :: (Monad m, Enum n) => n -> Stream (Of n) m r Source

An infinite stream of enumerable values, starting from a given value. It is the same as `S.iterate succ`. Because their return type is polymorphic, enumFrom and enumFromThen (and iterate) are useful, for example, with zip and zipWith, which require the same return type in the zipped streams. With each [1..] the following bit of connect-and-resume would be impossible:

>>> rest <- S.print $ S.zip (S.enumFrom 'a') $ S.splitAt 3 $ S.enumFrom 1
('a',1)
('b',2)
('c',3)
>>> S.print $ S.take 3 rest
4
5
6
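
The same point about the polymorphic return type can be seen in a pure setting. In this sketch, `S.enumFrom 0` unifies with the `()`-returning stream built by `each`, so the two can be zipped directly; `indexed` is an illustrative name, and `S.toList_` is assumed to be available in the installed version.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streaming.Prelude as S

-- Pair each character with its position. The infinite index stream has a
-- polymorphic return type, so it can take the return type () of 'each'.
indexed :: [(Int, Char)]
indexed = runIdentity (S.toList_ (S.zip (S.enumFrom 0) (S.each "abc")))

main :: IO ()
main = print indexed
```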

enumFromThen :: (Monad m, Enum a) => a -> a -> Stream (Of a) m r Source

An infinite sequence of enumerable values at a fixed distance, determined by the first and second values. See the discussion of enumFrom

>>> S.print $ S.take 3 $ S.enumFromThen 100 200
100
200
300

seconds :: Stream (Of Double) IO r Source

Streams the number of seconds from the beginning of action

Thus, to mark times of user input we might write something like:

>>> S.toList $ S.take 3 $ S.zip S.seconds S.stdinLn
a<Enter>
b<Enter>
c<Enter>
[(0.0,"a"),(1.088711,"b"),(3.7289649999999996,"c")] :> ()

To restrict user input to some number of seconds, we might write:

>>> S.toList $ S.map fst $ S.zip S.stdinLn $ S.takeWhile (< 3) S.seconds
one<Enter>
two<Enter>
three<Enter>
four<Enter>
five<Enter>
["one","two","three","four","five"] :> ()

This of course does not interrupt an action that has already begun.

Consuming streams of elements

stdoutLn :: MonadIO m => Stream (Of String) m () -> m () Source

Write Strings to stdout using putStrLn; terminates on a broken output pipe. (This operation is modelled on Pipes.Prelude.stdoutLn.)

>>> S.stdoutLn $ S.take 3 $ S.each $ words "one two three four five"
one
two
three

stdoutLn' :: MonadIO m => Stream (Of String) m r -> m r Source

Write Strings to stdout using putStrLn

This does not handle a broken output pipe, but has a polymorphic return value, which makes this possible:

>>> rest <- S.stdoutLn' $ S.show $ S.splitAt 3 (each [1..5])
1
2
3
>>> S.print rest
4
5

mapM_ :: Monad m => (a -> m b) -> Stream (Of a) m r -> m r Source

Reduce a stream to its return value with a monadic action.

>>> S.mapM_ Prelude.print $ each [1..5]
1
2
3
4
5
>>> rest <- S.mapM_ Prelude.print $ S.splitAt 3 $ each [1..10]
1
2
3
>>> S.sum rest
49 :> ()

print :: (MonadIO m, Show a) => Stream (Of a) m r -> m r Source

Print the elements of a stream as they arise.

>>> S.print $ S.take 2 S.stdinLn
hello<Enter>
"hello"
world<Enter>
"world"

toHandle :: MonadIO m => Handle -> Stream (Of String) m r -> m r Source

Write a succession of strings to a handle as separate lines.

>>> S.toHandle IO.stdout $ each $ words "one two three"
one
two
three

writeFile :: MonadResource m => FilePath -> Stream (Of String) m r -> m r Source

Write a series of strings as lines to a file. The handle is crudely managed with ResourceT:

>>> runResourceT $ S.writeFile "lines.txt" $ S.take 2 S.stdinLn
hello<Enter>
world<Enter>
>>> runResourceT $ S.print $ S.readFile "lines.txt"
"hello"
"world"

first :: Monad m => Stream (Of r) m r -> m r Source

Take either the first item in a stream or the return value, if it is empty. The typical mark of an infinite stream is a polymorphic return value; in that case, first is a sort of safeHead

To iterate an action returning a Maybe, until it succeeds.

effects :: Monad m => Stream (Of a) m r -> m r Source

Reduce a stream, performing its actions but ignoring its elements.

>>> rest <- S.effects $ S.splitAt 2 $ each [1..5]
>>> S.print rest
3
4
5

effects should be understood together with copy and is subject to the rules

S.effects . S.copy       = id
hoist S.effects . S.copy = id

The similar effects and copy operations in Data.ByteString.Streaming obey the same rules.

erase :: Monad m => Stream (Of a) m r -> Stream Identity m r Source

Remove the elements from a stream of values, retaining the structure of layers.

drained :: (Monad m, Monad (t m), Functor (t m), MonadTrans t) => t m (Stream (Of a) m r) -> t m r Source

Where a transformer returns a stream, run the effects of the stream, keeping the return value. This is usually used at the type

drained :: Monad m => Stream (Of a) m (Stream (Of b) m r) -> Stream (Of a) m r
drained = join . fmap (lift . effects)

Here, for example, we split a stream in two places and throw out the middle segment:

>>> rest <- S.print $ S.drained $ S.splitAt 2 $ S.splitAt 5 $ each [1..7]
1
2
>>> S.print rest
6
7

In particular, we can define versions of take and takeWhile which retrieve the return value of the rest of the stream - and which can thus be used with maps:

take' n = S.drained . S.splitAt n
takeWhile' thus = S.drained . S.span thus
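
To make the remark about maps concrete, here is a minimal sketch that gives `take'` an explicit signature and applies it to every chunk of a stream. It assumes a recent release of streaming; `firstTwoOfEach` is an illustrative name.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream, chunksOf, concats, maps)
import qualified Streaming.Prelude as S

-- Keep the first n elements, drain the remainder, and preserve the return
-- value, so the function is polymorphic in r as 'maps' requires.
take' :: Monad m => Int -> Stream (Of a) m r -> Stream (Of a) m r
take' n = S.drained . S.splitAt n

-- Keep only the first two elements of each three-element chunk.
firstTwoOfEach :: [Int]
firstTwoOfEach =
  runIdentity (S.toList_ (concats (maps (take' 2) (chunksOf 3 (S.each [1 .. 7])))))

main :: IO ()
main = print firstTwoOfEach
```

The ordinary `S.take` could not be used here, since it forgets the return value of each chunk, and that return value is the rest of the chunked stream.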

Stream transformers

map :: Monad m => (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r Source

Standard map on the elements of a stream.

>>> S.stdoutLn $ S.map reverse $ each (words "alpha beta")
ahpla
ateb

mapM :: Monad m => (a -> m b) -> Stream (Of a) m r -> Stream (Of b) m r Source

Replace each element of a stream with the result of a monadic action

>>> S.print $ S.mapM readIORef $ S.chain (\ior -> modifyIORef ior (*100)) $ S.mapM newIORef $ each [1..6]
100
200
300
400
500
600

maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r Source

Map layers of one functor to another with a transformation. Compare hoist, which has a similar effect on the monadic parameter.

maps id = id
maps f . maps g = maps (f . g)

mapped :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r Source

Map layers of one functor to another with a transformation involving the base monad. This could be trivial, e.g.

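let noteBeginning text x = putStrLn text >> return x

Used as mapped (noteBeginning text), this prints the text at the beginning of each layer and leaves the layer itself unchanged; it is completely functor-general.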

maps and mapped obey these rules:

maps id              = id
mapped return        = id
maps f . maps g      = maps (f . g)
mapped f . mapped g  = mapped (f <=< g)
maps f . mapped g    = mapped (liftM f . g)
mapped f . maps g    = mapped (f . g)

maps is more fundamental than mapped, which is best understood as a convenience for effecting this frequent composition:

mapped phi = decompose . maps (Compose . phi)  
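
As a hedged illustration of a transformation that really does involve the base monad, the sketch below uses a State counter to tag every element with the index of the chunk it belongs to. `tagChunk` and `tagged` are hypothetical names; the State monad comes from the transformers package, and a recent release of streaming is assumed.

```haskell
import Control.Monad.Trans.State.Strict (State, evalState, get, put)
import Streaming (Of, Stream, chunksOf, concats, mapped)
import qualified Streaming.Prelude as S

-- Run an effect in the base monad once per layer (here: bump a counter),
-- then return a layer of a different functor that carries the result.
tagChunk :: Stream (Of a) (State Int) x -> State Int (Stream (Of (Int, a)) (State Int) x)
tagChunk chunk = do
  n <- get
  put (n + 1)
  return (S.map (\a -> (n, a)) chunk)

-- Chunk a stream in twos, tag each chunk, and flatten the result again.
tagged :: [(Int, Char)]
tagged =
  evalState (S.toList_ (concats (mapped tagChunk (chunksOf 2 (S.each "abcde"))))) 0

main :: IO ()
main = print tagged
```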

for :: (Monad m, Functor f) => Stream (Of a) m r -> (a -> Stream f m x) -> Stream f m r Source

for replaces each element of a stream with an associated stream. Note that the associated stream may layer any functor.

with :: (Monad m, Functor f) => Stream (Of a) m r -> (a -> f x) -> Stream f m r Source

Replace each element in a stream of individual Haskell values (a Stream (Of a) m r) with an associated functorial step.

for str f  = concats (with str f)  
with str f = for str (yields . f)
with str f = maps (\(a:>r) -> r <$ f a) str
with = flip subst
subst = flip with
>>> with (each [1..3]) (yield . show) & intercalates (yield "--") & S.stdoutLn
1
--
2
--
3

subst :: (Monad m, Functor f) => (a -> f x) -> Stream (Of a) m r -> Stream f m r Source

Replace each element in a stream of individual values with a functorial layer of any sort. subst = flip with and is more convenient in a sequence of compositions that transform a stream.

with = flip subst
for str f = concats $ subst f str
subst f = maps (\(a:>r) -> r <$ f a)
S.concat = concats . subst each
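
A small sketch of `for` may help fix the idea of replacing each element with an associated stream. Here every number n is replaced by n copies of itself; `expanded` is an illustrative name and a recent release of streaming is assumed.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streaming.Prelude as S

-- Replace each element n with a stream that yields n, n times over.
expanded :: [Int]
expanded = runIdentity (S.toList_ (S.for (S.each [1, 2, 3]) (\n -> S.replicate n n)))

main :: IO ()
main = print expanded
```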

copy :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r Source

Duplicate the content of a stream, so that it can be acted on twice in different ways, but without breaking streaming. Thus, with each ["one","two"] I might do:

>>> S.print $ each ["one","two"]
"one"
"two"
>>> S.stdoutLn $ each ["one","two"]
one
two

With copy, I can as well do:

>>> S.print $ S.stdoutLn $ S.copy $ each ["one","two"]
one
"one"
two
"two"

copy should be understood together with effects and is subject to the rules

S.effects . S.copy       = id
hoist S.effects . S.copy = id

The similar operations in Data.ByteString.Streaming obey the same rules.

Where the actions you are contemplating are each simple folds over the elements, or a selection of elements, then the coupling of the folds is often more straightforwardly effected with Foldl, e.g.

>>> L.purely S.fold (liftA2 (,) L.sum L.product) $ each [1..10]
(55,3628800) :> ()

rather than

>>> S.sum $ S.product . S.copy $ each [1..10]
55 :> (3628800 :> ())

A Control.Foldl fold can be altered to act on a selection of elements by using handles on an appropriate lens. Some such manipulations are simpler and more List-like, using copy:

>>> L.purely S.fold (liftA2 (,) (L.handles (filtered odd) L.sum) (L.handles (filtered even) L.product)) $ each [1..10]
(25,3840) :> ()

becomes

>>> S.sum $ S.filter odd $ S.product $ S.filter even $ S.copy $ each [1..10]
25 :> (3840 :> ())

or using store

>>> S.sum $ S.filter odd $ S.store (S.product . S.filter even) $ each [1..10]
25 :> (3840 :> ())

But any fold of a Stream (Of a) m r into e.g. an m (Of b r) whose constraint on m carries over to Stream f m (e.g. Monad, MonadIO, MonadResource) can be used on the stream. Thus, I can fold over different groupings of the original stream:

>>> (S.toList . mapped S.toList . chunksOf 5) $  (S.toList . mapped S.toList . chunksOf 3) $ S.copy $ each [1..10]
[[1,2,3,4,5],[6,7,8,9,10]] :> ([[1,2,3],[4,5,6],[7,8,9],[10]] :> ())

The procedure can be iterated as one pleases, as one can see from this (otherwise unadvisable!) example:

>>> (S.toList . mapped S.toList . chunksOf 4) $ (S.toList . mapped S.toList . chunksOf 3) $ S.copy $ (S.toList . mapped S.toList . chunksOf 2) $ S.copy $ each [1..12]
[[1,2,3,4],[5,6,7,8],[9,10,11,12]] :> ([[1,2,3],[4,5,6],[7,8,9],[10,11,12]] :> ([[1,2],[3,4],[5,6],[7,8],[9,10],[11,12]] :> ()))
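
For a compact view of how the nested results of copy are taken apart, the sketch below computes a sum and a count in a single pass and unpacks the nested strict pairs. `sumAndCount` is a hypothetical helper, and a recent release of streaming is assumed.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- S.length consumes the outer copy and S.sum the inner one; the results
-- nest from the outside in, as in the examples above.
sumAndCount :: [Double] -> (Double, Int)
sumAndCount xs =
  case runIdentity (S.sum (S.length (S.copy (S.each xs)))) of
    total :> (count :> ()) -> (total, count)

main :: IO ()
main = print (sumAndCount [1, 2, 3, 4])
```

The fold applied last (here `S.sum`) contributes the outermost component of the result.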

copy' :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r Source

copy' is the same as copy but reverses the order of interleaved effects. The difference should not be observable at all for pure folds over the data.

store :: Monad m => (Stream (Of a) (Stream (Of a) m) r -> t) -> Stream (Of a) m r -> t Source

Store the result of any suitable fold over a stream, keeping the stream for further manipulation. store f = f . copy :

>>> S.print $ S.store S.product $ each [1..4]
1
2
3
4
24 :> ()
>>> S.print $ S.store S.sum $ S.store S.product $ each [1..4]
1
2
3
4
10 :> (24 :> ())

Here the sum (10) and the product (24) have been 'stored' for use when finally we have traversed the stream with print. Needless to say, a second pass is excluded conceptually, so the folds that you apply successively with store are performed simultaneously, and in constant memory -- as they would be if, say, you linked them together with Control.Foldl:

>>> L.impurely S.foldM (liftA3 (\a b c -> (b,c)) (L.sink print) (L.generalize L.sum) (L.generalize L.product)) $ each [1..4]
1
2
3
4
(10,24) :> ()

Fusing folds after the fashion of Control.Foldl will generally be a bit faster than the corresponding succession of uses of store, but by a constant factor that will be completely dwarfed when any IO is at issue.

But store / copy is much more powerful, as you can see by reflecting on uses like this:

>>> S.sum $ S.store (S.sum . mapped S.product . chunksOf 2) $ S.store (S.product . mapped S.sum . chunksOf 2 )$ each [1..6]
21 :> (44 :> (231 :> ()))

It will be clear that this cannot be reproduced with any combination of lenses, Control.Foldl folds, or the like. (See also the discussion of copy.)

It would conceivably be clearer to import a series of specializations of store. It is intended to be used at types like these:

storeM ::  (forall s m . Monad m => Stream (Of a) m s -> m (Of b s))
        -> (Monad n => Stream (Of a) n r -> Stream (Of a) n (Of b r))
storeM = store

storeMIO :: (forall s m . MonadIO m => Stream (Of a) m s -> m (Of b s))
         -> (MonadIO n => Stream (Of a) n r -> Stream (Of a) n (Of b r))
storeMIO = store

It is clear from these types that we are just using the general instances:

instance (Functor f, Monad m )  => Monad (Stream f m)
instance (Functor f, MonadIO m) => MonadIO (Stream f m)

We thus can't be touching the elements of the stream, or the final return value. It is the same with other constraints that Stream (Of a) inherits, like MonadResource. Thus I can filter and write to one file, but nub and write to another, or to a database or the like:

>>> runResourceT $ (S.writeFile "hello2.txt" . S.nub) $ S.store (S.writeFile "hello.txt" . S.filter (/= "world")) $ each ["hello", "world", "goodbye", "world"]
>>> :! cat hello.txt
hello
goodbye
>>> :! cat hello2.txt
hello
world
goodbye

chain :: Monad m => (a -> m ()) -> Stream (Of a) m r -> Stream (Of a) m r Source

Apply an action to all values, re-yielding each

>>> S.product $ S.chain Prelude.print $ S.each [1..5]
1
2
3
4
5
120 :> ()

sequence :: Monad m => Stream (Of (m a)) m r -> Stream (Of a) m r Source

Like the Prelude's sequence, but streaming. The result is a stream of a's that is not accumulated; the effects of the elements of the original stream are interleaved in the resulting stream. Compare:

sequence :: Monad m =>       [m a]           -> m [a]
sequence :: Monad m => Stream (Of (m a)) m r -> Stream (Of a) m r

This obeys the rule

filter :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m r Source

Skip elements of a stream that fail a predicate

filterM :: Monad m => (a -> m Bool) -> Stream (Of a) m r -> Stream (Of a) m r Source

Skip elements of a stream that fail a monadic test

delay :: MonadIO m => Double -> Stream (Of a) m r -> Stream (Of a) m r Source

Interpolate a delay of n seconds between yields.

intersperse :: Monad m => a -> Stream (Of a) m r -> Stream (Of a) m r Source

take :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m () Source

End a stream after n elements; the original return value is thus lost. splitAt preserves this information. Note that, like splitAt, this function is functor-general, so that, for example, you can take not just a number of items from a stream of elements, but a number of substreams and the like.

>>> S.toList $ S.take 3 $ each "with"
"wit" :> ()
>>> runResourceT $ S.stdoutLn $ S.take 3 $ S.readFile "stream.hs"
import Streaming  
import qualified Streaming.Prelude as S
import Streaming.Prelude (each, next, yield)

takeWhile :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m () Source

End stream when an element fails a condition; the original return value is lost. By contrast span preserves this information, and is generally more desirable.

S.takeWhile thus = void . S.span thus

To preserve the information - but thus also force the rest of the stream to be developed - write

S.drained . S.span thus

as dropWhile thus is

S.effects . S.span thus

drop :: Monad m => Int -> Stream (Of a) m r -> Stream (Of a) m r Source

Ignore the first n elements of a stream, but carry out the actions

>>> S.toList $ S.drop 2 $  S.replicateM 5 getLine
a<Enter>
b<Enter>
c<Enter>
d<Enter>
e<Enter>
["c","d","e"] :> ()

Because it retains the final return value, drop n is a suitable argument for maps:

>>> S.toList $ concats $ maps (S.drop 4) $ chunksOf 5 $ each [1..20]
[5,10,15,20] :> ()

dropWhile :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m r Source

Ignore elements of a stream for as long as they satisfy the predicate, retaining everything from the first element that fails it.

>>> S.print $ S.dropWhile ((< 5) . length) S.stdinLn
one<Enter>
two<Enter>
three<Enter>
"three"
four<Enter>
"four"
^CInterrupted.

concat :: (Monad m, Foldable f) => Stream (Of (f a)) m r -> Stream (Of a) m r Source

Make a stream of foldable containers into a stream of their separate elements. This is just

concat str = for str each
>>> S.print $ S.concat (each ["xy","z"])
'x'
'y'
'z'

Note that it also has the effect of catMaybes, rights, map snd and such-like operations.

>>> S.print $ S.concat $ S.each [Just 1, Nothing, Just 2]
1
2
>>> S.print $  S.concat $ S.each [Right 1, Left "Error!", Right 2]
1
2
>>> S.print $ S.concat $ S.each [('A',1), ('B',2)]
1
2

scan :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> Stream (Of b) m r Source

Strict left scan, streaming, e.g. successive partial results.

>>> S.print $ S.scan (++) "" id $ each (words "a b c d")
""
"a"
"ab"
"abc"
"abcd"

scan is fitted for use with Control.Foldl, thus:

>>> S.print $ L.purely S.scan L.list $ each [3..5]
[]
[3]
[3,4]
[3,4,5]
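
A hand-written scan follows the same pattern as the examples above: a step function, an initial state, and an extraction function. This sketch computes running totals; note that the initial state is emitted first. `runningTotals` is an illustrative name and a recent release of streaming is assumed.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streaming.Prelude as S

-- Running totals: the initial accumulator 0 is yielded before any input.
runningTotals :: [Int]
runningTotals = runIdentity (S.toList_ (S.scan (+) 0 id (S.each [1, 2, 3, 4])))

main :: IO ()
main = print runningTotals
```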

scanM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Of a) m r -> Stream (Of b) m r Source

Strict left scan, accepting a monadic function. It can be used with FoldMs from Control.Foldl using impurely. Here we yield a succession of vectors each recording

>>> let v =  L.impurely scanM L.vector $ each [1..4::Int] :: Stream (Of (U.Vector Int)) IO ()
>>> S.print v
fromList []
fromList [1]
fromList [1,2]
fromList [1,2,3]
fromList [1,2,3,4]

scanned :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> Stream (Of (a, b)) m r Source

read :: (Monad m, Read a) => Stream (Of String) m r -> Stream (Of a) m r Source

Make a stream of strings into a stream of parsed values, skipping bad cases

>>> S.sum_ $ S.read $ S.takeWhile (/= "total") S.stdinLn :: IO Int
1000<Enter>
2000<Enter>
total<Enter>
3000

show :: (Monad m, Show a) => Stream (Of a) m r -> Stream (Of String) m r Source

cons :: Monad m => a -> Stream (Of a) m r -> Stream (Of a) m r Source

The natural cons for a Stream (Of a).

cons a stream = yield a >> stream

Useful for interoperation:

Data.Text.foldr S.cons (return ()) :: Text -> Stream (Of Char) m ()
Lazy.foldrChunks S.cons (return ()) :: Lazy.ByteString -> Stream (Of Strict.ByteString) m ()

and so on.

duplicate :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r Source

duplicate' :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r Source

Splitting and inspecting streams of elements

next :: Monad m => Stream (Of a) m r -> m (Either r (a, Stream (Of a) m r)) Source

The standard way of inspecting the first item in a stream of elements, if the stream is still 'running'. The Right case contains a Haskell pair, where the more general inspect would return a left-strict pair. There is no reason to prefer inspect since, if the Right case is exposed, the first element in the pair will have been evaluated to whnf.

next :: Monad m => Stream (Of a) m r -> m (Either r (a, Stream (Of a) m r))
inspect :: Monad m => Stream (Of a) m r -> m (Either r (Of a (Stream (Of a) m r)))

Interoperate with pipes producers thus:

Pipes.unfoldr Stream.next :: Stream (Of a) m r -> Producer a m r
Stream.unfoldr Pipes.next :: Producer a m r -> Stream (Of a) m r 

Similarly:

IOStreams.unfoldM (liftM (either (const Nothing) Just) . next) :: Stream (Of a) IO b -> IO (InputStream a)
Conduit.unfoldM (liftM (either (const Nothing) Just) . next)   :: Stream (Of a) m r -> Conduit.Source m a

But see uncons, which is better fitted to these unfoldMs
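
Besides interoperation, next is the natural tool for writing a consumer by explicit recursion. The following is only a sketch of that pattern, with `sumByHand` as a hypothetical helper; in practice `S.sum` or `S.fold` would usually be preferred.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Consume a stream step by step: Left carries the final return value,
-- Right carries the next element and the remainder of the stream.
sumByHand :: Monad m => Stream (Of Int) m r -> m (Int, r)
sumByHand = go 0
  where
    go acc stream = do
      step <- S.next stream
      case step of
        Left r -> return (acc, r)
        Right (a, rest) ->
          let acc' = acc + a
           in acc' `seq` go acc' rest -- force the accumulator at each step

main :: IO ()
main = print (runIdentity (sumByHand (S.each [1 .. 10])))
```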

uncons :: Monad m => Stream (Of a) m () -> m (Maybe (a, Stream (Of a) m ())) Source

Inspect the first item in a stream of elements, without a return value. uncons provides convenient exit into another streaming type:

IOStreams.unfoldM uncons :: Stream (Of a) IO () -> IO (InputStream a)
Conduit.unfoldM uncons   :: Stream (Of a) m () -> Conduit.Source m a

splitAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r) Source

Split a succession of layers after some number, returning a streaming or effectful pair. This function is the same as the splitsAt exported by the Streaming module, but since this module is imported qualified, it can usurp a Prelude name. It specializes to:

 splitAt :: Monad m => Int -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)

split :: (Eq a, Monad m) => a -> Stream (Of a) m r -> Stream (Stream (Of a) m) m r Source

Split a stream of elements wherever a given element arises. The action is like that of words.

>>> S.stdoutLn $ mapped S.toList $ S.split ' ' $ each "hello world  "
hello
world

break :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r) Source

Break a sequence upon meeting an element that falls under a predicate, keeping it and the rest of the stream as the return value.

>>> rest <- S.print $ S.break even $ each [1,1,2,3]
1
1
>>> S.print rest
2
3

breakWhen :: Monad m => (x -> a -> x) -> x -> (x -> b) -> (b -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r) Source

Yield elements, using a fold to maintain state, until the accumulated value satisfies the supplied predicate. The fold will then be short-circuited and the element that breaks it will be put after the break. This function is easiest to use with purely.

>>> rest <- each [1..10] & L.purely S.breakWhen L.sum (>10) & S.print
1
2
3
4
>>> S.print rest
5
6
7
8
9
10

span :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r) Source

Stream elements until one fails the condition, return the rest.
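
The following sketch shows how the rest of the stream is recovered from the return value of span. It runs in Identity so that the result is a plain pair of lists; `spanned` is an illustrative name and a recent release of streaming is assumed.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Collect the prefix that satisfies the predicate, then collect the
-- remainder, which span hands back as the return value of the prefix.
spanned :: ([Int], [Int])
spanned = runIdentity $ do
  small :> rest <- S.toList (S.span (< 3) (S.each [1 .. 6]))
  others <- S.toList_ rest
  return (small, others)

main :: IO ()
main = print spanned
```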

group :: (Monad m, Eq a) => Stream (Of a) m r -> Stream (Stream (Of a) m) m r Source

Group successive equal items together

>>> S.toList $ mapped S.toList $ S.group $ each "baaaaad"
["b","aaaaa","d"] :> ()
>>> S.toList $ concats $ maps (S.drained . S.splitAt 1) $ S.group $ each "baaaaaaad"
"bad" :> ()

groupBy :: Monad m => (a -> a -> Bool) -> Stream (Of a) m r -> Stream (Stream (Of a) m) m r Source

Group elements of a stream in accordance with the supplied comparison.

>>> S.print $ mapped S.toList $ S.groupBy (>=) $ each [1,2,3,1,2,3,4,3,2,4,5,6,7,6,5]
[1]
[2]
[3,1,2,3]
[4,3,2,4]
[5]
[6]
[7,6,5]

Sum and Compose manipulation

distinguish :: (a -> Bool) -> Of a r -> Sum (Of a) (Of a) r Source

switch :: Sum f g r -> Sum g f r Source

Swap the order of functors in a sum of functors.

>>> S.toListM' $ S.print $ separate $ maps S.switch $ maps (S.distinguish (=='a')) $ S.each "banana"
'a'
'a'
'a'
"bnn" :> ()
>>> S.toListM' $ S.print $ separate $ maps (S.distinguish (=='a')) $ S.each "banana"
'b'
'n'
'n'
"aaa" :> ()

separate :: (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream f (Stream g m) r Source

Given a stream on a sum of functors, make it a stream on the left functor, with the streaming on the other functor as the governing monad. This is useful for acting on one or the other functor with a fold. It generalizes partitionEithers massively, but actually streams properly.

>>> let odd_even = S.maps (S.distinguish even) $ S.each [1..10::Int]
>>> :t separate odd_even
separate odd_even
  :: Monad m => Stream (Of Int) (Stream (Of Int) m) ()

Now, for example, it is convenient to fold on the left and right values separately:

>>> S.toList $ S.toList $ separate odd_even
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())

Or we can write them to separate files or whatever:

>>> runResourceT $ S.writeFile "even.txt" . S.show $ S.writeFile "odd.txt" . S.show $ S.separate odd_even
>>> :! cat even.txt
2
4
6
8
10
>>> :! cat odd.txt
1
3
5
7
9

Of course, in the special case of Stream (Of a) m r, we can achieve the above effects more simply by using copy

>>> S.toList . S.filter even $ S.toList . S.filter odd $ S.copy $ each [1..10::Int]
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())

But separate and unseparate are functor-general.

unseparate :: (Monad m, Functor f, Functor g) => Stream f (Stream g m) r -> Stream (Sum f g) m r Source

eitherToSum :: Of (Either a b) r -> Sum (Of a) (Of b) r Source

sumToEither :: Sum (Of a) (Of b) r -> Of (Either a b) r Source

sumToCompose :: Sum f f r -> Compose (Of Bool) f r Source

composeToSum :: Compose (Of Bool) f r -> Sum f f r Source

Folds

Use these to fold the elements of a Stream.

>>> S.fold_ (+) 0 id $ S.each [1..10]
55

The general folds fold, fold_, foldM and foldM_ are arranged for use with Control.Foldl purely and impurely:

>>> L.purely S.fold_ L.sum $ each [1..10]
55
>>> L.purely S.fold_ (liftA3 (,,) L.sum L.product L.list) $ each [1..10]
(55,3628800,[1,2,3,4,5,6,7,8,9,10])

All functions marked with an underscore (e.g. fold_, sum_) omit the stream's return value, which the unmarked versions preserve in a left-strict pair. The underscored versions are good for exiting streaming completely, but the unmarked versions are what you need when you are, e.g., mapped-ing over a Stream (Stream (Of a) m) m r, which is to be compared with [[a]]. Specializing, we have e.g.

 mapped sum :: (Monad m, Num n) => Stream (Stream (Of n) m) m r -> Stream (Of n) m r
 mapped (fold mappend mempty id) :: (Monad m, Monoid w) => Stream (Stream (Of w) m) m r -> Stream (Of w) m r
>>> S.print $ mapped S.sum $ chunksOf 3 $ S.each [1..10]
6
15
24
10
>>> let three_folds = L.purely S.fold (liftA3 (,,) L.sum L.product L.list)
>>> S.print $ mapped three_folds $ chunksOf 3 (each [1..10])
(6,6,[1,2,3])
(15,120,[4,5,6])
(24,504,[7,8,9])
(10,10,[10])
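
The division of labour between the two families of folds can be sketched in a pure setting: the return-preserving `S.sum` is what mapped needs for each chunk, while `S.sum_` and `S.toList_` are used only at the very end to leave streaming. The names `chunkSums` and `grandTotal` are illustrative, and a recent release of streaming is assumed.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (chunksOf, mapped)
import qualified Streaming.Prelude as S

-- Inside 'mapped' the fold must keep each chunk's return value (the rest
-- of the stream), so the version without the underscore is required.
chunkSums :: [Int]
chunkSums = runIdentity (S.toList_ (mapped S.sum (chunksOf 3 (S.each [1 .. 10]))))

-- When leaving streaming altogether, the underscored version is enough.
grandTotal :: Int
grandTotal = runIdentity (S.sum_ (S.each [1 .. 10]))

main :: IO ()
main = do
  print chunkSums
  print grandTotal
```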

fold :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> m (Of b r) Source

Strict fold of a Stream of elements that preserves the return value. The third parameter will often be id where a fold is written by hand:

>>> S.fold (+) 0 id $ each [1..10]
55 :> ()
>>> S.fold (*) 1 id $ S.fold (+) 0 id $ S.copy $ each [1..10]
3628800 :> (55 :> ())

The third parameter lets the accumulator have a type better suited to writing a strict accumulation function than the standard Haskell type of the final result; it converts the one into the other at the end. It is also crucial to the Applicative instance for Control.Foldl.Fold. We can apply such a fold purely:

Control.Foldl.purely S.fold :: Monad m => Fold a b -> Stream (Of a) m r -> m (Of b r)

Thus, specializing a bit:

L.purely S.fold L.sum :: Monad m => Stream (Of Int) m r -> m (Of Int r)
mapped (L.purely S.fold L.sum) :: Stream (Stream (Of Int) IO) IO r -> Stream (Of Int) IO r

Here we use the Applicative instance for Control.Foldl.Fold to stream three-item segments of a stream together with their sums and products.

>>> S.print $ mapped (L.purely S.fold (liftA3 (,,) L.list L.product L.sum)) $ chunksOf 3 $ each [1..10]
([1,2,3],6,6)
([4,5,6],120,15)
([7,8,9],504,24)
([10],10,10)

fold_ :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> m b Source

Strict fold of a Stream of elements, preserving only the result of the fold, not the return value of the stream. The third parameter will often be id where a fold is written by hand:

>>> S.fold_ (+) 0 id $ each [1..10]
55 

The third parameter lets the accumulator have a type better suited to writing a strict accumulation function than the standard Haskell type of the final result; it converts the one into the other at the end. It is also crucial to the Applicative instance for Control.Foldl.Fold.

Control.Foldl.purely fold_ :: Monad m => Fold a b -> Stream (Of a) m () -> m b
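
To illustrate the role of the third parameter, here is a minimal sketch of a hand-written fold whose accumulator is a strict pair and whose final step converts it to an ordinary Maybe Double. The names Acc and mean are hypothetical and not part of the library.

```haskell
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical strict accumulator: running total and element count.
data Acc = Acc !Double !Int

-- Mean of a stream of Doubles; Nothing for an empty stream.
mean :: Monad m => Stream (Of Double) m r -> m (Maybe Double)
mean = S.fold_ step (Acc 0 0) done
  where
    step (Acc s n) x = Acc (s + x) (n + 1)
    -- The third parameter of fold_ turns the internal accumulator
    -- into the result type the caller actually wants.
    done (Acc _ 0) = Nothing
    done (Acc s n) = Just (s / fromIntegral n)

main :: IO ()
main = mean (S.each [1, 2, 3, 4]) >>= print  -- Just 2.5
```

The strict fields keep both components evaluated at every step, which is the point of choosing a dedicated accumulator type over a lazy tuple.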

foldM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Of a) m r -> m (Of b r) Source

Strict, monadic fold of the elements of a Stream (Of a), preserving the stream's return value.

Control.Foldl.impurely foldM :: Monad m => FoldM m a b -> Stream (Of a) m r -> m (Of b r)

Thus to accumulate the elements of a stream as a vector, together with a random element we might write:

>>> L.impurely S.foldM (liftA2 (,) L.vector L.random) $ each [1..10::Int] :: IO (Of (U.Vector Int,Maybe Int) ())
([1,2,3,4,5,6,7,8,9,10],Just 9) :> ()

foldM_ :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Of a) m r -> m b Source

Strict, monadic fold of the elements of a Stream (Of a), keeping only the result of the fold.

Control.Foldl.impurely foldM_ :: Monad m => FoldM m a b -> Stream (Of a) m r -> m b
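
A monadic fold can also be written by hand, without Control.Foldl. The sketch below assumes only the foldM_ signature given above; loggedSum is a hypothetical name. The step function logs each element in IO while it accumulates.

```haskell
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Sum a stream of Ints, logging every step as a side effect.
loggedSum :: Stream (Of Int) IO r -> IO Int
loggedSum = S.foldM_ step (return 0) return
  where
    step acc a = do
      putStrLn ("acc = " ++ show acc ++ ", next = " ++ show a)
      return (acc + a)

main :: IO ()
main = loggedSum (S.each [1, 2, 3]) >>= print  -- logs three steps, then prints 6
```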

all :: Monad m => (a -> Bool) -> Stream (Of a) m r -> m (Of Bool r) Source

all_ :: Monad m => (a -> Bool) -> Stream (Of a) m r -> m Bool Source

any :: Monad m => (a -> Bool) -> Stream (Of a) m r -> m (Of Bool r) Source

any_ :: Monad m => (a -> Bool) -> Stream (Of a) m r -> m Bool Source

sum :: (Monad m, Num a) => Stream (Of a) m r -> m (Of a r) Source

Fold a Stream of numbers into their sum, together with the return value.

 mapped S.sum :: Stream (Stream (Of Int) m) m r -> Stream (Of Int) m r
>>> S.sum $ each [1..10]
55 :> ()
>>> (n :> rest)  <- S.sum $ S.splitAt 3 $ each [1..10]
>>> print n
6
>>> (m :> rest') <- S.sum $ S.splitAt 3 rest
>>> print m
15
>>> S.print rest'
7
8
9
10

sum_ :: (Monad m, Num a) => Stream (Of a) m () -> m a Source

Fold a Stream of numbers into their sum

product :: (Monad m, Num a) => Stream (Of a) m r -> m (Of a r) Source

Fold a Stream of numbers into their product, together with the return value.

 mapped product :: Stream (Stream (Of Int) m) m r -> Stream (Of Int) m r

product_ :: (Monad m, Num a) => Stream (Of a) m () -> m a Source

Fold a Stream of numbers into their product

head :: Monad m => Stream (Of a) m r -> m (Of (Maybe a) r) Source

head_ :: Monad m => Stream (Of a) m r -> m (Maybe a) Source

last :: Monad m => Stream (Of a) m r -> m (Of (Maybe a) r) Source

last_ :: Monad m => Stream (Of a) m r -> m (Maybe a) Source

elem :: (Monad m, Eq a) => a -> Stream (Of a) m r -> m (Of Bool r) Source

Exhaust a stream remembering only whether a was an element.

elem_ :: (Monad m, Eq a) => a -> Stream (Of a) m r -> m Bool Source

notElem :: (Monad m, Eq a) => a -> Stream (Of a) m r -> m (Of Bool r) Source

Exhaust a stream deciding whether a was an element.

notElem_ :: (Monad m, Eq a) => a -> Stream (Of a) m r -> m Bool Source

length :: Monad m => Stream (Of a) m r -> m (Of Int r) Source

Run a stream, keeping its length and its return value.

>>> S.print $ mapped S.length $ chunksOf 3 $ S.each [1..10]
3
3
3
1

length_ :: Monad m => Stream (Of a) m r -> m Int Source

Run a stream, remembering only its length:

>>> S.length_ $ S.each [1..10]
10

toList :: Monad m => Stream (Of a) m r -> m (Of [a] r) Source

Convert an effectful Stream into a list alongside the return value.

 mapped toList :: Stream (Stream (Of a) m) m r -> Stream (Of [a]) m r

Like toList_, it breaks streaming; unlike toList_, it preserves the return value and thus is frequently useful with e.g. mapped:

>>> S.print $ mapped S.toList $ chunksOf 3 $ each [1..9]
[1,2,3]
[4,5,6]
[7,8,9]

toList_ :: Monad m => Stream (Of a) m () -> m [a] Source

Convert an effectful Stream (Of a) into a list of as.

Note: Needless to say, this function does not stream properly. It is basically the same as Prelude mapM which, like replicateM, sequence and similar operations on traversable containers, is a leading cause of space leaks.
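
The contrast can be sketched as follows, assuming nothing beyond the functions documented in this module. A fold such as length_ consumes elements one at a time, whereas toList_ has to build the whole list before it returns, so it is best applied to a stream that has already been bounded.

```haskell
import qualified Streaming.Prelude as S

main :: IO ()
main = do
  -- length_ inspects each element as it is produced and keeps only a counter.
  n <- S.length_ (S.each [1 .. 100000 :: Int])
  print n   -- 100000

  -- toList_ accumulates every element, so bound the (here infinite) stream first.
  xs <- S.toList_ (S.take 5 (S.each [1 :: Int ..]))
  print xs  -- [1,2,3,4,5]
```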

mconcat :: (Monad m, Monoid w) => Stream (Of w) m r -> m (Of w r) Source

Fold streamed items into their monoidal sum

>>> S.mconcat $ S.take 2 $ S.map (Data.Monoid.Last . Just) (S.stdinLn)
first<Enter>
last<Enter>
Last {getLast = Just "last"} :> ()
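
For a version that does not depend on keyboard input, here is a small sketch using two familiar monoids, Sum and lists. It assumes only mconcat, mconcat_, map and each from this module.

```haskell
import Data.Monoid (Sum (..))
import Streaming (Of (..))
import qualified Streaming.Prelude as S

main :: IO ()
main = do
  -- mconcat_ keeps only the monoidal result.
  total <- S.mconcat_ (S.map Sum (S.each [1 .. 10 :: Int]))
  print (getSum total)  -- 55

  -- mconcat also hands back the stream's return value, here ().
  joined :> () <- S.mconcat (S.each ["ab", "cd", "ef"])
  putStrLn joined       -- abcdef
```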

mconcat_ :: (Monad m, Monoid w) => Stream (Of w) m r -> m w Source

minimum :: (Monad m, Ord a) => Stream (Of a) m r -> m (Of (Maybe a) r) Source

minimum_ :: (Monad m, Ord a) => Stream (Of a) m r -> m (Maybe a) Source

maximum :: (Monad m, Ord a) => Stream (Of a) m r -> m (Of (Maybe a) r) Source

maximum_ :: (Monad m, Ord a) => Stream (Of a) m r -> m (Maybe a) Source
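
The query folds of this section (head_, last_, elem_, notElem_, all_, any_, minimum_, maximum_) can be tried side by side. This is a small sketch over one fixed stream; the expected results are noted in the comments.

```haskell
import qualified Streaming.Prelude as S

main :: IO ()
main = do
  let xs = S.each [3, 1, 4, 1, 5 :: Int]
  S.head_ xs >>= print       -- Just 3
  S.last_ xs >>= print       -- Just 5
  S.elem_ 4 xs >>= print     -- True
  S.notElem_ 9 xs >>= print  -- True
  S.all_ (> 0) xs >>= print  -- True
  S.any_ (> 4) xs >>= print  -- True
  S.minimum_ xs >>= print    -- Just 1
  S.maximum_ xs >>= print    -- Just 5
```

Because xs is a pure description of a stream, it can be run afresh by each fold; a stream that reads from a handle would be consumed by the first one.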

foldrM :: Monad m => (a -> m r -> m r) -> Stream (Of a) m r -> m r Source

A natural right fold for consuming a stream of elements. See also the more general iterT in the Streaming module and the still more general destroy
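
As a sketch of how foldrM is used, the step function receives each element together with the action that processes the rest of the stream. It may run that action, or drop it to stop early. The names announce and upTo are hypothetical.

```haskell
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Print every element, then finish with the stream's own return value.
announce :: Show a => Stream (Of a) IO r -> IO r
announce = S.foldrM (\a rest -> putStrLn ("saw " ++ show a) >> rest)

-- Stop at the first element above the limit. The continuation `rest` is
-- never run in that case, so this terminates even on an infinite stream.
upTo :: Int -> Stream (Of Int) IO () -> IO ()
upTo limit = S.foldrM step
  where
    step a rest
      | a > limit = return ()
      | otherwise = print a >> rest

main :: IO ()
main = do
  announce (S.each "ab")  -- saw 'a', then saw 'b'
  upTo 3 (S.each [1 ..])  -- 1, 2, 3
```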

foldrT :: (Monad m, MonadTrans t, Monad (t m)) => (a -> t m r -> t m r) -> Stream (Of a) m r -> t m r Source

A natural right fold for consuming a stream of elements. See also the more general iterTM in the Streaming module and the still more general destroy

foldrT (\a p -> Streaming.yield a >> p) = id
foldrT (\a p -> Pipes.yield a     >> p) :: Monad m => Stream (Of a) m r -> Producer a m r
foldrT (\a p -> Conduit.yield a   >> p) :: Monad m => Stream (Of a) m r -> Conduit a m r

Zips and unzips

zip :: Monad m => Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of (a, b)) m r Source

Zip two Streams

zipWith :: Monad m => (a -> b -> c) -> Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of c) m r Source

Zip two Streams using the provided combining function

zip3 :: Monad m => Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of c) m r -> Stream (Of (a, b, c)) m r Source

Zip three streams together

zipWith3 :: Monad m => (a -> b -> c -> d) -> Stream (Of a) m r -> Stream (Of b) m r -> Stream (Of c) m r -> Stream (Of d) m r Source

Zip three Streams with a combining function
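
A brief sketch of the zipping functions follows. As with lists, the result ends as soon as one of the inputs does, so zipping against an infinite stream is fine.

```haskell
import qualified Streaming.Prelude as S

main :: IO ()
main = do
  -- Pair letters with an infinite stream of indices.
  S.print $ S.zip (S.each "abc") (S.each [1 :: Int ..])
  -- ('a',1)
  -- ('b',2)
  -- ('c',3)

  -- Combine two streams elementwise.
  S.print $ S.zipWith (+) (S.each [1, 2, 3 :: Int]) (S.each [10, 20, 30])
  -- 11
  -- 22
  -- 33

  -- Three streams at once.
  S.print $ S.zip3 (S.each "xy") (S.each [True, False]) (S.each [1 :: Int ..])
  -- ('x',True,1)
  -- ('y',False,2)
```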

unzip :: Monad m => Stream (Of (a, b)) m r -> Stream (Of a) (Stream (Of b) m) r Source

The type

Data.List.unzip     :: [(a,b)] -> ([a],[b])

might lead us to expect

Streaming.unzip :: Stream (Of (a,b)) m r -> Stream (Of a) m (Stream (Of b) m r)

which would not stream, since it would have to accumulate the second stream (of bs). Of course, Data.List unzip doesn't stream either.

This unzip does stream, though of course you can spoil this by using e.g. toList:

>>> let xs =  map (\x-> (x,show x)) [1..5::Int]
>>> S.toList $ S.toList $ S.unzip (S.each xs)
["1","2","3","4","5"] :> ([1,2,3,4,5] :> ())
>>> Prelude.unzip xs
([1,2,3,4,5],["1","2","3","4","5"])

Note the difference of order in the results. It may be of some use to think why. The first application of toList was applied to a stream of integers:

>>> :t S.unzip $ S.each xs
S.unzip $ S.each xs :: Monad m => Stream (Of Int) (Stream (Of String) m) ()

Like any fold, toList takes no notice of the monad of effects.

toList :: Monad m => Stream (Of a) m r -> m (Of [a] r)

In the case at hand (since I am in ghci) m = Stream (Of String) IO. So when I apply toList, I exhaust that stream of integers, folding it into a list:

>>> :t S.toList $ S.unzip $ S.each xs
S.toList $ S.unzip $ S.each xs
  :: Monad m => Stream (Of String) m (Of [Int] ())

When I apply toList to this, I reduce everything to an ordinary action in IO, and return a list of strings:

>>> S.toList $ S.toList $ S.unzip (S.each xs)
["1","2","3","4","5"] :> ([1,2,3,4,5] :> ())
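
To keep things streaming instead, each layer can be given a consumer that handles elements as they arrive. In the sketch below (the pairs list is made up for illustration), print consumes the outer stream of Ints and stdoutLn consumes the inner stream of Strings, so no list is accumulated.

```haskell
import qualified Streaming.Prelude as S

-- Illustrative input only.
pairs :: [(Int, String)]
pairs = [(1, "one"), (2, "two"), (3, "three")]

main :: IO ()
main =
  S.stdoutLn      -- consumes the inner Stream (Of String) IO
    $ S.print     -- consumes the outer Stream (Of Int) (Stream (Of String) IO)
    $ S.unzip
    $ S.each pairs
```

Both halves of each pair are handled before the next pair is requested, so the numbers and their names should appear interleaved on standard output.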

partitionEithers :: Monad m => Stream (Of (Either a b)) m r -> Stream (Of a) (Stream (Of b) m) r Source

Separate left and right values in distinct streams. (separate is a more powerful, functor-general, equivalent using Sum in place of Either). So, for example, to permit unlimited user input of Ints on condition of only two errors, we might write:

>>> S.toList $ S.print $ S.take 2 $ partitionEithers $ S.map readEither $ S.stdinLn  :: IO (Of [Int] ())
1<Enter>
2<Enter>
qqqqqqqqqq<Enter>
"Prelude.read: no parse"
3<Enter>
rrrrrrrrrr<Enter>
"Prelude.read: no parse"
[1,2,3] :> ()

partitionEithers = separate . maps S.eitherToSum  
lefts  = hoist S.effects . partitionEithers
rights = S.effects . partitionEithers
rights = S.concat 

partition :: Monad m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r Source

filter p = hoist effects . partition p
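
A small sketch of partition in use: the outer layer carries the elements that satisfy the predicate and the inner layer carries the rest. Draining the inner layer with hoist effects therefore leaves exactly what filter would keep. It assumes that hoist is re-exported by the Streaming module.

```haskell
import Streaming (Of (..), hoist)
import qualified Streaming.Prelude as S

main :: IO ()
main = do
  -- The first toList collects the outer layer (evens), the second the inner (odds).
  odds :> (evens :> ()) <-
    S.toList $ S.toList $ S.partition even $ S.each [1 .. 10 :: Int]
  print evens  -- [2,4,6,8,10]
  print odds   -- [1,3,5,7,9]

  -- Discard the inner layer and keep the outer one: the same result as filter even.
  kept <- S.toList_ $ hoist S.effects $ S.partition even $ S.each [1 .. 10 :: Int]
  print kept   -- [2,4,6,8,10]
```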

Maybes

These functions discard the Nothings that they encounter. They are analogous to the functions from Data.Maybe that share their names.

catMaybes :: Monad m => Stream (Of (Maybe a)) m r -> Stream (Of a) m r Source

The catMaybes function takes a Stream of Maybes and returns a Stream of all of the Just values.

mapMaybe :: Monad m => (a -> Maybe b) -> Stream (Of a) m r -> Stream (Of b) m r Source

The mapMaybe function is a version of map which can throw out elements. In particular, the functional argument returns something of type Maybe b. If this is Nothing, no element is added on to the result Stream. If it is Just b, then b is included in the result Stream.
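
For instance, mapMaybe pairs naturally with a parser that may fail. The sketch below uses readMaybe from base to keep only the inputs that parse as Ints; the raw list is made up for illustration.

```haskell
import qualified Streaming.Prelude as S
import Text.Read (readMaybe)

main :: IO ()
main = do
  let raw = ["1", "two", "3", "", "5"]

  -- Elements for which readMaybe returns Nothing are dropped.
  nums <- S.toList_ $ S.mapMaybe readMaybe $ S.each raw
  print (nums :: [Int])  -- [1,3,5]

  -- catMaybes keeps the contents of the Just values.
  justs <- S.toList_ $ S.catMaybes $ S.each [Just 'a', Nothing, Just 'b']
  print justs            -- "ab"
```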

Pair manipulation

lazily :: Of a b -> (a, b) Source

Note that lazily, strictly, fst', and mapOf are all so-called natural transformations on the primitive Of a functor. If we write

 type f ~~> g = forall x . f x -> g x

then we can restate some types as follows:

 mapOf            :: (a -> b) -> Of a ~~> Of b   -- bifunctor lmap
 lazily           ::             Of a ~~> (,) a
 Identity . fst'  ::             Of a ~~> Identity a

Manipulation of a Stream f m r by mapping often turns on recognizing natural transformations of f. Thus maps is far more general than the map of the Streaming.Prelude, which can be defined thus:

 S.map :: (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r
 S.map f = maps (mapOf f)

i.e.

 S.map f = maps (\(a :> x) -> (f a :> x))

This rests on recognizing that mapOf is a natural transformation; note though that it results in such a transformation as well:

 S.map :: (a -> b) -> Stream (Of a) m ~~> Stream (Of b) m   

Thus we can maps it in turn
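
One way to read this is sketched below; it is an illustration and not necessarily the example the author had in mind. The hypothetical mapViaMaps rebuilds map from maps, and because the result is itself polymorphic in the return type, it can be passed to maps again to act on every segment of a chunked stream.

```haskell
import Streaming (Of (..), Stream, chunksOf, maps)
import qualified Streaming.Prelude as S

-- map expressed through maps and a natural transformation on Of a.
mapViaMaps :: Monad m => (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r
mapViaMaps f = maps (\(a :> x) -> f a :> x)

main :: IO ()
main = do
  S.print $ mapViaMaps (* 2) $ S.each [1, 2, 3 :: Int]
  -- 2
  -- 4
  -- 6

  -- maps applied to a transformation of whole streams: double inside each chunk.
  S.print $ S.mapped S.toList $ maps (mapViaMaps (* 2)) $ chunksOf 2 $ S.each [1 .. 5 :: Int]
  -- [2,4]
  -- [6,8]
  -- [10]
```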

 

strictly :: (a, b) -> Of a b Source

fst' :: Of a b -> a Source

snd' :: Of a b -> b Source

Interoperation

reread :: Monad m => (s -> m (Maybe a)) -> s -> Stream (Of a) m () Source

Read an IORef (Maybe a) or a similar device until it reads Nothing. reread provides a convenient exit from the io-streams library.

reread readIORef    :: IORef (Maybe a) -> Stream (Of a) IO ()
reread Streams.read :: System.IO.Streams.InputStream a -> Stream (Of a) IO ()
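
The "similar device" can be any action that eventually returns Nothing. As a sketch, the hypothetical pop below removes the head of a list stored in an IORef, and reread turns repeated calls to it into a stream.

```haskell
import Data.IORef (IORef, atomicModifyIORef', newIORef)
import qualified Streaming.Prelude as S

-- Pop the head of the stored list; Nothing once the list is empty.
pop :: IORef [a] -> IO (Maybe a)
pop ref = atomicModifyIORef' ref step
  where
    step []       = ([], Nothing)
    step (x : xs) = (xs, Just x)

main :: IO ()
main = do
  ref <- newIORef "abc"
  S.print (S.reread pop ref)
  -- 'a'
  -- 'b'
  -- 'c'
```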

Basic Type

data Stream f m r Source

Instances

(MonadBase b m, Functor f) => MonadBase b (Stream f m) Source 
(Functor f, MonadError e m) => MonadError e (Stream f m) Source 
(Functor f, MonadReader r m) => MonadReader r (Stream f m) Source 
(Functor f, MonadState s m) => MonadState s (Stream f m) Source 
Functor f => MFunctor (Stream f) Source 
Functor f => MMonad (Stream f) Source 
Functor f => MonadTrans (Stream f) Source 
(Functor f, Monad m) => Monad (Stream f m) Source 
(Functor f, Monad m) => Functor (Stream f m) Source 
(Functor f, Monad m) => Applicative (Stream f m) Source 
(Applicative f, Monad m) => Alternative (Stream f m) Source

The Alternative instance glues streams together stepwise.

empty = never
(<|>) = zipsWith (liftA2 (,))

See also never, untilJust and delays

(Applicative f, Monad m) => MonadPlus (Stream f m) Source 
(MonadThrow m, Functor f) => MonadThrow (Stream f m) Source 
(MonadCatch m, Functor f) => MonadCatch (Stream f m) Source 
(MonadIO m, Functor f) => MonadIO (Stream f m) Source 
(MonadResource m, Functor f) => MonadResource (Stream f m) Source 
(Eq r, Eq (m (Stream f m r)), Eq (f (Stream f m r))) => Eq (Stream f m r) Source 
(Typeable (* -> *) f, Typeable (* -> *) m, Data r, Data (m (Stream f m r)), Data (f (Stream f m r))) => Data (Stream f m r) Source 
(Show r, Show (m (Stream f m r)), Show (f (Stream f m r))) => Show (Stream f m r) Source