Generic stream manipulations



inputFoldM Source


:: (a -> b -> IO a)

fold function

-> a

initial seed

-> InputStream b

input stream

-> IO (InputStream b, IO a)

returns a new stream as well as an IO action to fetch the updated seed value.

A side-effecting fold over an InputStream, as a stream transformer.

The IO action returned by inputFoldM can be used to fetch the updated seed value. Example:

ghci> is <- Streams.fromList [1, 2, 3::Int]
ghci> (is', getSeed) <- Streams.inputFoldM (\x y -> return (x+y)) 0 is
ghci> Streams.toList is'
ghci> getSeed

outputFoldM Source


:: (a -> b -> IO a)

fold function

-> a

initial seed

-> OutputStream b

output stream

-> IO (OutputStream b, IO a)

returns a new stream as well as an IO action to fetch the updated seed value.

A side-effecting fold over an OutputStream, as a stream transformer.

The IO action returned by outputFoldM can be used to fetch the updated seed value. Example:

ghci> is <- Streams.fromList [1, 2, 3::Int]
ghci> (os, getList) <- Streams.listOutputStream
ghci> (os', getSeed) <- Streams.outputFoldM (\x y -> return (x+y)) 0 os
ghci> Streams.connect is os'
ghci> getList
ghci> getSeed

fold Source


:: (s -> a -> s)

fold function

-> s

initial seed

-> InputStream a

input stream

-> IO s 

A left fold over an input stream. The input stream is fully consumed. See foldl.


ghci> Streams.fromList [1..10] >>= Streams.fold (+) 0

foldM Source


:: (s -> a -> IO s)

fold function

-> s

initial seed

-> InputStream a

input stream

-> IO s 

A side-effecting left fold over an input stream. The input stream is fully consumed. See foldl.


ghci> Streams.fromList [1..10] >>= Streams.foldM (x y -> return (x + y)) 0

any :: (a -> Bool) -> InputStream a -> IO Bool Source

any predicate stream returns True if any element in stream matches the predicate.

any consumes as few elements as possible, ending consumption if an element satisfies the predicate.

ghci> is <- Streams.fromList [1, 2, 3]
ghci> Streams.any (> 0) is    -- Consumes one element
ghci> is
Just 2
ghci> Streams.any even is     -- Only 3 remains

all :: (a -> Bool) -> InputStream a -> IO Bool Source

all predicate stream returns True if every element in stream matches the predicate.

all consumes as few elements as possible, ending consumption if any element fails the predicate.

ghci> is <- Streams.fromList [1, 2, 3]
ghci> Streams.all (< 0) is    -- Consumes one element
ghci> is
Just 2
ghci> Streams.all odd is      -- Only 3 remains

maximum :: Ord a => InputStream a -> IO (Maybe a) Source

maximum stream returns the greatest element in stream or Nothing if the stream is empty.

maximum consumes the entire stream.

ghci> is <- Streams.fromList [1, 2, 3]
ghci> Streams.maximum is
ghci> is     -- The stream is now empty

minimum :: Ord a => InputStream a -> IO (Maybe a) Source

minimum stream returns the greatest element in stream

minimum consumes the entire stream.

ghci> is <- Streams.fromList [1, 2, 3]
ghci> Streams.minimum is
ghci> is    -- The stream is now empty


unfoldM :: (b -> IO (Maybe (a, b))) -> b -> IO (InputStream a) Source

unfoldM f seed builds an InputStream from successively applying f to the seed value, continuing if f produces Just and halting on Nothing.

ghci> is <- Streams.unfoldM (n -> return $ if n < 3 then Just (n, n + 1) else Nothing) 0
ghci> Streams.toList is


map :: (a -> b) -> InputStream a -> IO (InputStream b) Source

Maps a pure function over an InputStream.

map f s passes all output from s through the function f.

Satisfies the following laws: (g . f) === f >=> g id === Streams.makeInputStream .

mapM :: (a -> IO b) -> InputStream a -> IO (InputStream b) Source

Maps an impure function over an InputStream.

mapM f s passes all output from s through the IO action f.

Satisfies the following laws:

Streams.mapM (f >=> g) === Streams.mapM f >=> Streams.mapM g
Streams.mapM return === Streams.makeInputStream .

mapM_ :: (a -> IO b) -> InputStream a -> IO (InputStream a) Source

Maps a side effect over an InputStream.

mapM_ f s produces a new input stream that passes all output from s through the side-effecting IO action f.


ghci> Streams.fromList [1,2,3] >>=
      Streams.mapM_ (putStrLn . show . (*2)) >>=

mapMaybe :: (a -> Maybe b) -> InputStream a -> IO (InputStream b) Source

A version of map that discards elements

mapMaybe f s passes all output from s through the function f and discards elements for which f s evaluates to Nothing.


ghci> Streams.fromList [Just 1, None, Just 3] >>=
      Streams.mapMaybe id >>=


contramap :: (a -> b) -> OutputStream b -> IO (OutputStream a) Source

Contravariant counterpart to map.

contramap f s passes all input to s through the function f.

Satisfies the following laws:

Streams.contramap (g . f) === Streams.contramap g >=> Streams.contramap f
Streams.contramap id === return

contramapM :: (a -> IO b) -> OutputStream b -> IO (OutputStream a) Source

Contravariant counterpart to mapM.

contramapM f s passes all input to s through the IO action f

Satisfies the following laws:

Streams.contramapM (f >=> g) = Streams.contramapM g >=> Streams.contramapM f
Streams.contramapM return = return

contramapM_ :: (a -> IO b) -> OutputStream a -> IO (OutputStream a) Source

Equivalent to mapM_ for output.

contramapM f s passes all input to s through the side-effecting IO action f.

contramapMaybe :: (a -> Maybe b) -> OutputStream b -> IO (OutputStream a) Source

Contravariant counterpart to contramapMaybe.

contramap f s passes all input to s through the function f. Discards all the elements for which f returns Nothing.



filter :: (a -> Bool) -> InputStream a -> IO (InputStream a) Source

Drops chunks from an input stream if they fail to match a given filter predicate. See filter.

Items pushed back to the returned stream are propagated back upstream.


ghci> Streams.fromList ["the", "quick", "brown", "fox"] >>=
      Streams.filter (/= "brown") >>= Streams.toList

filterM :: (a -> IO Bool) -> InputStream a -> IO (InputStream a) Source

Drops chunks from an input stream if they fail to match a given filter predicate. See filter.

Items pushed back to the returned stream are propagated back upstream.


ghci> Streams.fromList ["the", "quick", "brown", "fox"] >>=
      Streams.filterM (return . (/= "brown")) >>= Streams.toList

filterOutput :: (a -> Bool) -> OutputStream a -> IO (OutputStream a) Source

Filters output to be sent to the given OutputStream using a pure function. See filter.


ghci> import qualified Data.ByteString.Char8 as S
ghci> os1 <- Streams.stdout >>= Streams.'System.IO.Streams.unlines
ghci> os2 <- os1 >>= Streams.contramap (S.pack . show) >>= Streams.filterOutput even
ghci> Streams.write (Just 3) os2
ghci> Streams.write (Just 4) os2

filterOutputM :: (a -> IO Bool) -> OutputStream a -> IO (OutputStream a) Source

Filters output to be sent to the given OutputStream using a predicate function in IO. See filterM.


ghci> let check a = putStrLn a ("Allow " ++ show a ++ "?") >> readLn :: IO Bool
ghci> import qualified Data.ByteString.Char8 as S
ghci> os1 <- Streams.unlines Streams.stdout
ghci> os2 <- os1 >>= Streams.contramap (S.pack . show) >>= Streams.filterOutputM check
ghci> Streams.write (Just 3) os2
Allow 3?
ghci> Streams.write (Just 4) os2
Allow 4?

Takes and drops

give :: Int64 -> OutputStream a -> IO (OutputStream a) Source

Wraps an OutputStream, producing a new OutputStream that will pass at most n items on to the wrapped stream, subsequently ignoring the rest of the input.

take :: Int64 -> InputStream a -> IO (InputStream a) Source

Wraps an InputStream, producing a new InputStream that will produce at most n items, subsequently yielding end-of-stream forever.

Items pushed back to the returned InputStream will be propagated upstream, modifying the count of taken items accordingly.


ghci> is <- Streams.fromList [1..9::Int]
ghci> is' <- Streams.take 1 is
ghci> is'
Just 1
ghci> is'
ghci> Streams.peek is
Just 2
ghci> Streams.unRead 11 is'
ghci> Streams.peek is
Just 11
ghci> Streams.peek is'
Just 11
ghci> is'
Just 11
ghci> is'
ghci> is
Just 2
ghci> Streams.toList is

drop :: Int64 -> InputStream a -> IO (InputStream a) Source

Wraps an InputStream, producing a new InputStream that will drop the first n items produced by the wrapped stream. See drop.

Items pushed back to the returned InputStream will be propagated upstream, modifying the count of dropped items accordingly.

ignore :: Int64 -> OutputStream a -> IO (OutputStream a) Source

Wraps an OutputStream, producing a new OutputStream that will ignore the first n items received, subsequently passing the rest of the input on to the wrapped stream.

Zip and unzip

zip :: InputStream a -> InputStream b -> IO (InputStream (a, b)) Source

Combines two input streams. Continues yielding elements from both input streams until one of them finishes.

zipWith :: (a -> b -> c) -> InputStream a -> InputStream b -> IO (InputStream c) Source

Combines two input streams using the supplied function. Continues yielding elements from both input streams until one of them finishes.

zipWithM :: (a -> b -> IO c) -> InputStream a -> InputStream b -> IO (InputStream c) Source

Combines two input streams using the supplied monadic function. Continues yielding elements from both input streams until one of them finishes.

unzip :: forall a b. InputStream (a, b) -> IO (InputStream a, InputStream b) Source

Takes apart a stream of pairs, producing a pair of input streams. Reading from either of the produced streams will cause a pair of values to be pulled from the original stream if necessary. Note that reading n values from one of the returned streams will cause n values to be buffered at the other stream.

Access to the original stream is thread safe, i.e. guarded by a lock.


intersperse :: a -> OutputStream a -> IO (OutputStream a) Source

The function intersperse v s wraps the OutputStream s, creating a new output stream that writes its input to s interspersed with the provided value v. See intersperse.


ghci> import Control.Monad ((>=>))
ghci> is <- Streams.fromList ["nom", "nom", "nom"::ByteString]
ghci> Streams.outputToList (Streams.intersperse "burp!" >=> Streams.connect is)

skipToEof :: InputStream a -> IO () Source

Drives an InputStream to end-of-stream, discarding all of the yielded values.

ignoreEof :: OutputStream a -> IO (OutputStream a) Source

Wraps an OutputStream, ignoring any end-of-stream Nothing values written to the returned stream.


atEndOfInput :: IO b -> InputStream a -> IO (InputStream a) Source

Wraps an InputStream, running the specified action when the stream yields end-of-file.


atEndOfOutput :: IO b -> OutputStream a -> IO (OutputStream a) Source

Wraps an OutputStream, running the specified action when the stream receives end-of-file.
