conduino-0.2.3.0: Lightweight composable continuation-based stream processors
Copyright(c) Justin Le 2019
LicenseBSD3
Maintainerjustin@jle.im
Stabilityexperimental
Portabilitynon-portable
Safe HaskellSafe-Inferred
LanguageHaskell2010

Data.Conduino

Description

Base API for Pipe. See documentation for Pipe, .|, and runPipe for information on usage.

A "prelude" of useful pipes can be found in Data.Conduino.Combinators.

Why a stream processing library?

A stream processing library is a way to stream processors in a composable way: instead of defining your entire stream processing function as a single recursive loop with some global state, instead think about each "stage" of the process, and isolate each state to its own segment. Each component can contain its own isolated state:

>>> runPipePure $ sourceList [1..10]
      .| scan (+) 0
      .| sinkList
[1,3,6,10,15,21,28,36,45,55]

All of these components have internal "state":

  • sourceList keeps track of "which" item in the list to yield next
  • scan keeps track of the current running sum
  • sinkList keeps track of all items that have been seen so far, as a list

They all work together without knowing any other component's internal state, so you can write your total streaming function without concerning yourself, at each stage, with the entire part.

In addition, there are useful functions to "combine" stream processors:

  • zipSink combines sinks in an "and" sort of way: combine two sinks in parallel and finish when all finish.
  • altSink combines sinks in an "or" sort of way: combine two sinks in parallel and finish when any of them finish
  • zipSource combines sources in parallel and collate their outputs.

Stream processing libraries are also useful for streaming composition of monadic effects (like IO or State), as well.

Synopsis

Documentation

data Pipe i o u m a Source #

Similar to a conduit from the conduit package.

For a Pipe i o u m a, you have:

  • i: Type of input stream (the things you can await)
  • o: Type of output stream (the things you yield)
  • u: Type of the result of the upstream pipe (Outputted when upstream pipe terminates)
  • m: Underlying monad (the things you can lift)
  • a: Result type when pipe terminates (outputted when finished, with pure or return)

Some specializations:

  • If i is (), the pipe is a source --- it doesn't need anything to produce items. It will pump out items on its own, for pipes downstream to receive and process.
  • If o is Void, the pipe is a sink --- it will never yield anything downstream. It will consume items from things upstream, and produce a result (a) if and when it terminates.
  • If u is Void, then the pipe's upstream is limitless, and never terminates. This means that you can use awaitSurely instead of await, to get await a value that is guaranteed to come. You'll get an i instead of a Maybe i.
  • If a is Void, then the pipe never terminates --- it will keep on consuming and/or producing values forever. If this is a sink, it means that the sink will never terminate, and so runPipe will also never terminate. If it is a source, it means that if you chain something downstream with .|, that downstream pipe can use awaitSurely to guarantee something being passed down.

Applicative and Monadic sequencing of pipes chains by exhaustion.

do pipeX
   pipeY
   pipeZ

is a pipe itself, that behaves like pipeX until it terminates, then pipeY until it terminates, then pipeZ until it terminates. The Monad instance allows you to choose "which pipe to behave like next" based on the terminating result of a previous pipe.

do x <- pipeX
   pipeBasedOn x

Usually you would use it by chaining together pipes with .| and then running the result with runPipe.

runPipe $ someSource
       .| somePipe
       .| someOtherPipe
       .| someSink

See .| and runPipe for more information on usage.

For a "prelude" of commonly used Pipes, see Data.Conduino.Combinators.

Instances

Instances details
(MonadReader r m, MonadWriter w m, MonadState s m) => MonadRWS r w s (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

MonadError e m => MonadError e (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

throwError :: e -> Pipe i o u m a #

catchError :: Pipe i o u m a -> (e -> Pipe i o u m a) -> Pipe i o u m a #

MonadReader r m => MonadReader r (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

ask :: Pipe i o u m r #

local :: (r -> r) -> Pipe i o u m a -> Pipe i o u m a #

reader :: (r -> a) -> Pipe i o u m a #

MonadState s m => MonadState s (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

get :: Pipe i o u m s #

put :: s -> Pipe i o u m () #

state :: (s -> (a, s)) -> Pipe i o u m a #

MonadWriter w m => MonadWriter w (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

writer :: (a, w) -> Pipe i o u m a #

tell :: w -> Pipe i o u m () #

listen :: Pipe i o u m a -> Pipe i o u m (a, w) #

pass :: Pipe i o u m (a, w -> w) -> Pipe i o u m a #

MonadTrans (Pipe i o u) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

lift :: Monad m => m a -> Pipe i o u m a #

MonadFree (PipeF i o u) (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

wrap :: PipeF i o u (Pipe i o u m a) -> Pipe i o u m a #

MonadFail m => MonadFail (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

fail :: String -> Pipe i o u m a #

MonadIO m => MonadIO (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

liftIO :: IO a -> Pipe i o u m a #

Alternative m => Alternative (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

empty :: Pipe i o u m a #

(<|>) :: Pipe i o u m a -> Pipe i o u m a -> Pipe i o u m a #

some :: Pipe i o u m a -> Pipe i o u m [a] #

many :: Pipe i o u m a -> Pipe i o u m [a] #

Applicative (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

pure :: a -> Pipe i o u m a #

(<*>) :: Pipe i o u m (a -> b) -> Pipe i o u m a -> Pipe i o u m b #

liftA2 :: (a -> b -> c) -> Pipe i o u m a -> Pipe i o u m b -> Pipe i o u m c #

(*>) :: Pipe i o u m a -> Pipe i o u m b -> Pipe i o u m b #

(<*) :: Pipe i o u m a -> Pipe i o u m b -> Pipe i o u m a #

Functor (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

fmap :: (a -> b) -> Pipe i o u m a -> Pipe i o u m b #

(<$) :: a -> Pipe i o u m b -> Pipe i o u m a #

Monad (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

(>>=) :: Pipe i o u m a -> (a -> Pipe i o u m b) -> Pipe i o u m b #

(>>) :: Pipe i o u m a -> Pipe i o u m b -> Pipe i o u m b #

return :: a -> Pipe i o u m a #

MonadPlus m => MonadPlus (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

mzero :: Pipe i o u m a #

mplus :: Pipe i o u m a -> Pipe i o u m a -> Pipe i o u m a #

MonadCatch m => MonadCatch (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

catch :: Exception e => Pipe i o u m a -> (e -> Pipe i o u m a) -> Pipe i o u m a #

MonadThrow m => MonadThrow (Pipe i o u m) Source # 
Instance details

Defined in Data.Conduino.Internal

Methods

throwM :: Exception e => e -> Pipe i o u m a #

(.|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r infixr 2 Source #

The main operator for chaining pipes together. pipe1 .| pipe2 will connect the output of pipe1 to the input of pipe2.

Running a pipe will draw from pipe2, and if pipe2 ever asks for input (with await or something similar), it will block until pipe1 outputs something (or signals termination).

The structure of a full pipeline usually looks like:

runPipe $ someSource
       .| somePipe
       .| someOtherPipe
       .| someSink

Where you route a source into a series of pipes, which eventually ends up at a sink. runPipe will then produce the result of that sink.

runPipe :: Monad m => Pipe () Void u m a -> m a Source #

Run a pipe that is both a source and a sink (an "effect") into the effect that it represents.

Usually you wouild construct this using something like:

runPipe $ someSource
       .| somePipe
       .| someOtherPipe
       .| someSink

runPipe will produce the result of that final sink.

Some common errors you might receive:

  • i is not (): If you give a pipe where the first parameter ("input") is not (), it means that your pipe is not a producer. Pre-compose it (using .|) with a producer of the type you need.

For example, if you have a myPipe :: Pipe Int o u m a, this is a pipe that is awaiting Ints from upstream. Pre-compose with a producer of Ints, like sourceList [1,2,3] .| myPipe, in order to be able to run it.

  • o is not Void: If you give a pipe where the second parameter ("output") is not Void, it means that your pipe is not a consumer. Post-compose it (using .|) with a consumer of the type you need.

For example, if you have myPipe :: Pipe i Int u m a, this is a pipe that is yielding Ints downstream that are going unhandled. Post-compose it a consumer of Ints, like myPipe .| foldl (+) 0, in order to be able to run it.

If you just want to ignore all downstream yields, post-compose with sinkNull.

runPipePure :: Pipe () Void Void Identity a -> a Source #

runPipe when the underlying monad is Identity, and so has no effects.

Primitives

awaitEither :: Pipe i o u m (Either u i) Source #

Await on upstream output. Will block until it receives an i (expected input type) or a u if the upstream pipe terminates.

await :: Pipe i o u m (Maybe i) Source #

Await input from upstream. Will block until upstream yields.

Will return Nothing if the upstream pipe finishes and terminates.

If the upstream pipe never terminates, then you can use awaitSurely to guarantee a result.

Will always return Just if u is Void.

awaitWith :: (i -> Pipe i o u m u) -> Pipe i o u m u Source #

await, but directly chaining a continuation if the await was succesful.

The await will always be succesful if u is Void.

This is a way of writing code in a way that is agnostic to how the upstream pipe terminates.

awaitSurely :: Pipe i o Void m i Source #

Await input from upstream where the upstream pipe is guaranteed to never terminate.

A common type error will occur if u (upstream pipe result type) is not Void -- it might be () or some non-Void type. This means that the upstream pipe terminates, so awaiting cannot be assured.

In that case, either change your upstream pipe to be one that never terminates (which is most likely not possible), or use await instead of awaitSurely.

awaitForever :: (i -> Pipe i o u m a) -> Pipe i o u m u Source #

A useful utility function over repeated awaits. Will repeatedly await and then continue with the given pipe whenever the upstream pipe yields.

Can be used to implement many pipe combinators:

map f = awaitForever $ x -> yield (f x)

yield :: o -> Pipe i o u m () Source #

Send output downstream.

Since v0.2.3.0, is strict. See yieldLazy for the original behavior.

yieldLazy :: o -> Pipe i o u m () Source #

Send output downstream without forcing its argument.

Since: 0.2.3.0

Special chaining

(&|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r) infixr 2 Source #

Like .|, but get the result of both pipes on termination, instead of just the second. This means that p &| q will only terminate with a result when both p and q terminate. (Typically, p .| q would terminate as soon as q terminates.)

Since: 0.2.1.0

(|.) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v infixr 2 Source #

Like .|, but keep the result of the first pipe, instead of the second. This means that p |. q will only terminate with a result when both p and q terminate. (Typically, p .| q would terminate as soon as q terminates.)

Since: 0.2.1.0

fuseBoth :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r) Source #

Useful prefix version of &|.

Since: 0.2.1.0

fuseUpstream :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v Source #

Useful prefix version of |..

Since: 0.2.1.0

fuseBothMaybe :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (Maybe v, r) Source #

Like fuseBoth and &|, except does not wait for the upstream pipe to terminate. Return Nothing in the first field if the upstream pipe hasn't terminated, and Just if it has, with the terminating value.

Since: 0.2.1.0

Incremental running

squeezePipe :: Monad m => Pipe i o u m a -> m ([o], Either (i -> Pipe i o u m a) a) Source #

Squeeze a pipe by extracting all output that can be extracted before any input is requested. Returns a Left if the pipe eventually does request input (as a continuation on the new input), or a Right if the pipe terminates with a value before ever asking for input.

Since: 0.2.1.0

squeezePipeEither :: Monad m => Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) a) Source #

Squeeze a pipe by extracting all output that can be extracted before any input is requested. Returns a Left if the pipe eventually does request input (as a continuation on the new input, or a terminating u value), or a Right if the pipe terminates with a value before ever asking for input.

Since: 0.2.1.0

feedPipe Source #

Arguments

:: Monad m 
=> [i]

input to feed in

-> Pipe i o u m a 
-> m ([o], Either (i -> Pipe i o u m a) ([i], a)) 

Repeatedly run squeezePipe by giving it items from an input list. Returns the outputs observed, and Left if the input list was exhausted with more input expected, or Right if the pipe terminated, with the leftover inputs and output result.

Since: 0.2.1.0

feedPipeEither Source #

Arguments

:: Monad m 
=> [i]

input to feed in

-> Pipe i o u m a 
-> m ([o], Either (Either u i -> Pipe i o u m a) ([i], a)) 

Repeatedly run squeezePipeEither by giving it items from an input list. Returns the outputs observed, and Left if the input list was exhausted with more input expected (or a u terminating upstream value), or Right if the pipe terminated, with the leftover inputs and output result.

Since: 0.2.1.0

Pipe transformers

mapInput :: (i -> j) -> Pipe j o u m a -> Pipe i o u m a Source #

(Contravariantly) map over the expected input type.

mapOutput :: (p -> o) -> Pipe i p u m a -> Pipe i o u m a Source #

Map over the downstream output type.

If you want to map over the result type, use fmap.

mapUpRes :: (u -> v) -> Pipe i o v m a -> Pipe i o u m a Source #

(Contravariantly) map over the upstream result type.

trimapPipe :: (i -> j) -> (p -> o) -> (u -> v) -> Pipe j p v m a -> Pipe i o u m a Source #

Map over the input type, output type, and upstream result type.

If you want to map over the result type, use fmap.

passthrough :: Monad m => Pipe i o u m a -> Pipe i (Maybe i, o) u m a Source #

Passthrough and pair each output with the last input that triggered it. Nothing will occur initially if the pipe outputs anything without consuming any values, but after the first Just, should only output Justs forever.

Since: 0.2.3.0

hoistPipe :: (Monad m, Monad n) => (forall x. m x -> n x) -> Pipe i o u m a -> Pipe i o u n a Source #

Transform the underlying monad of a pipe.

Note that if you are trying to work with monad transformers, this is probably not what you want. See Data.Conduino.Lift for tools for working with underlying monad transformers.

feedbackPipe :: Monad m => Pipe x x u m a -> Pipe x x u m a Source #

Loop a pipe into itself.

  • Will feed all output back to the input
  • Will only ask for input upstream if output is stalled.
  • Yields all outputted values downstream, effectively duplicating them.

Since: 0.2.1.0

feedbackPipeEither :: Monad m => Pipe (Either i o) o u m a -> Pipe i o u m a Source #

A version of feedbackPipe that distinguishes upstream input from downstream output fed back. Gets Left from upstream, and Right from its own output.

  • Will feed all output back to the input
  • Will only ask for input upstream if output is stalled.
  • Yields all outputted values downstream, effectively duplicating them.

Since: 0.2.2.0

Wrappers

newtype ZipSource m a Source #

A newtype wrapper over a source (Pipe () o Void) that gives it an alternative Applicative and Alternative instance, matching "ListT done right".

<*> will pair up each output that the sources produce: if you await a value from downstream, it will wait until both paired sources yield before passing them on together.

<|> will completely exhaust the first source before moving on to the next source.

ZipSource is effectively equivalent to "ListT done right", the true List Monad transformer. <|> is concatentation. You can use this type with lift to lift a yielding action and <|> to sequence yields to implement the pattern described in http://www.haskellforall.com/2014/11/how-to-build-library-agnostic-streaming.html, where you can write streaming producers in a polymorphic way, and have it run with pipes, conduit, etc.

The main difference is that its Applicative instance ("zipping") is different from the traditional Applicative instance for ListT ("all combinations"). Effectively this becomes like a "zipping" Applicative instance for ListT.

If you want a Monad (or MonadIO) instance, use ListT instead, and convert using toListT/fromListT or the PipeList pattern/constructor.

Constructors

ZipSource 

Fields

Instances

Instances details
MonadTrans ZipSource Source # 
Instance details

Defined in Data.Conduino

Methods

lift :: Monad m => m a -> ZipSource m a #

Monad m => Alternative (ZipSource m) Source # 
Instance details

Defined in Data.Conduino

Methods

empty :: ZipSource m a #

(<|>) :: ZipSource m a -> ZipSource m a -> ZipSource m a #

some :: ZipSource m a -> ZipSource m [a] #

many :: ZipSource m a -> ZipSource m [a] #

Monad m => Applicative (ZipSource m) Source # 
Instance details

Defined in Data.Conduino

Methods

pure :: a -> ZipSource m a #

(<*>) :: ZipSource m (a -> b) -> ZipSource m a -> ZipSource m b #

liftA2 :: (a -> b -> c) -> ZipSource m a -> ZipSource m b -> ZipSource m c #

(*>) :: ZipSource m a -> ZipSource m b -> ZipSource m b #

(<*) :: ZipSource m a -> ZipSource m b -> ZipSource m a #

Functor (ZipSource m) Source # 
Instance details

Defined in Data.Conduino

Methods

fmap :: (a -> b) -> ZipSource m a -> ZipSource m b #

(<$) :: a -> ZipSource m b -> ZipSource m a #

unconsZipSource :: Monad m => ZipSource m a -> m (Maybe (Maybe a, ZipSource m a)) Source #

ZipSource is effectively ListT returning a Maybe. As such, you can use unconsZipSource to "peel off" the first yielded item, if it exists, and return the "rest of the list".

zipSource :: Monad m => Pipe () (a -> b) u m () -> Pipe () a v m () -> Pipe () b w m () Source #

Takes two sources and runs them in parallel, collating their outputs.

Since: 0.2.1.0

newtype ZipSink i u m a Source #

A newtype wrapper over a sink (Pipe i Void) that gives it an alternative Applicative and Alternative instance.

<*> will distribute input over both sinks, and output a final result once both sinks finish.

<|> will distribute input over both sinks, and output a final result as soon as one or the other finishes.

Constructors

ZipSink 

Fields

Instances

Instances details
MonadTrans (ZipSink i u) Source # 
Instance details

Defined in Data.Conduino

Methods

lift :: Monad m => m a -> ZipSink i u m a #

Monad m => Alternative (ZipSink i u m) Source #

<|> = distribute input to all, and return the first result that finishes

empty = never finish

Instance details

Defined in Data.Conduino

Methods

empty :: ZipSink i u m a #

(<|>) :: ZipSink i u m a -> ZipSink i u m a -> ZipSink i u m a #

some :: ZipSink i u m a -> ZipSink i u m [a] #

many :: ZipSink i u m a -> ZipSink i u m [a] #

Monad m => Applicative (ZipSink i u m) Source #

<*> = distribute input to all, and return result when they finish

pure = immediately finish

Instance details

Defined in Data.Conduino

Methods

pure :: a -> ZipSink i u m a #

(<*>) :: ZipSink i u m (a -> b) -> ZipSink i u m a -> ZipSink i u m b #

liftA2 :: (a -> b -> c) -> ZipSink i u m a -> ZipSink i u m b -> ZipSink i u m c #

(*>) :: ZipSink i u m a -> ZipSink i u m b -> ZipSink i u m b #

(<*) :: ZipSink i u m a -> ZipSink i u m b -> ZipSink i u m a #

Functor (ZipSink i u m) Source # 
Instance details

Defined in Data.Conduino

Methods

fmap :: (a -> b) -> ZipSink i u m a -> ZipSink i u m b #

(<$) :: a -> ZipSink i u m b -> ZipSink i u m a #

zipSink :: Monad m => Pipe i Void u m (a -> b) -> Pipe i Void u m a -> Pipe i Void u m b Source #

Distribute input to both sinks, and finishes with the final result once both finish.

Forms an identity with pure.

altSink :: Monad m => Pipe i Void u m a -> Pipe i Void u m a -> Pipe i Void u m a Source #

Distribute input to both sinks, and finishes with the result of the one that finishes first.

Generators

toListT :: Applicative m => Pipe () o u m () -> ListT m (Maybe o) Source #

A source is essentially equivalent to ListT producing a Maybe result. This converts it to the ListT it encodes.

See ZipSource for a wrapper over Pipe that gives the right Functor and Alternative instances.

fromListT :: Monad m => ListT m (Maybe o) -> Pipe i o u m () Source #

A source is essentially ListT producing a Maybe result. This converts a ListT to the source it encodes.

See ZipSource for a wrapper over Pipe that gives the right Functor and Alternative instances.

pattern PipeList :: Monad m => ListT m (Maybe a) -> Pipe () a u m () Source #

A source is equivalent to a ListT producing a Maybe; this pattern synonym lets you treat it as such. It essentialyl wraps over toListT and fromListT.

withSource Source #

Arguments

:: Pipe () o u m () 
-> (Maybe (o, m r) -> m r)

handler (Nothing = done, Just (x, next) = yielded value and next action

-> m r 

A source can be "run" by providing a continuation to handle and sequence each of its outputs. Is ths inverse of genSource.

This essentially turns a pipe into a church-encoded ListT.

genSource :: (forall r. (Maybe (o, m r) -> m r) -> m r) -> Pipe i o u m () Source #

Given a "generator" of o in m, return a source that that generator encodes. Is the inverse of withSource.

The generator is essentially a church-encoded ListT.