conduit-1.2.10: Streaming data processing library.

Safe Haskell: Safe
Language: Haskell98

Data.Conduit

Description

If this is your first time with conduit, you should probably start with the tutorial: https://github.com/snoyberg/conduit#readme.

Core interface

Types

type Source m o = ConduitM () o m ()

Provides a stream of output values, without consuming any input or producing a final result.

Since 0.5.0

type Conduit i m o = ConduitM i o m ()

Consumes a stream of input values and produces a stream of output values, without producing a final result.

Since 0.5.0

type Sink i = ConduitM i Void

Consumes a stream of input values and produces a final result, without producing any output.

type Sink i m r = ConduitM i Void m r

Since 0.5.0

data ConduitM i o m r

Core datatype of the conduit package. This type represents a general component which can consume a stream of input values i, produce a stream of output values o, perform actions in the m monad, and produce a final result r. The type synonyms provided here are simply wrappers around this type.

Since 1.0.0
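As a minimal sketch of how the three type synonyms line up with ConduitM, the following pipeline gives each stage an explicit synonym type. The names numbers, doubled, total and result are illustrative only, and the helper functions are assumed to come from Data.Conduit.List in the same package.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- A Source produces values and consumes nothing: ConduitM () Int m ().
numbers :: Monad m => Source m Int
numbers = CL.sourceList [1 .. 5]

-- A Conduit turns a stream of inputs into a stream of outputs.
doubled :: Monad m => Conduit Int m Int
doubled = CL.map (* 2)

-- A Sink consumes values and returns a final result.
total :: Monad m => Sink Int m Int
total = CL.fold (+) 0

-- Fusing all three yields a closed pipeline of type ConduitM () Void m Int.
result :: Int
result = runConduitPure (numbers .| doubled .| total)

main :: IO ()
main = print result
```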

Instances

MonadRWS r w s m => MonadRWS r w s (ConduitM i o m)
MonadBase base m => MonadBase base (ConduitM i o m)

Methods

liftBase :: base α -> ConduitM i o m α

MonadError e m => MonadError e (ConduitM i o m)

Methods

throwError :: e -> ConduitM i o m a

catchError :: ConduitM i o m a -> (e -> ConduitM i o m a) -> ConduitM i o m a

MonadReader r m => MonadReader r (ConduitM i o m)

Methods

ask :: ConduitM i o m r

local :: (r -> r) -> ConduitM i o m a -> ConduitM i o m a

reader :: (r -> a) -> ConduitM i o m a

MonadState s m => MonadState s (ConduitM i o m)

Methods

get :: ConduitM i o m s

put :: s -> ConduitM i o m ()

state :: (s -> (a, s)) -> ConduitM i o m a

MonadWriter w m => MonadWriter w (ConduitM i o m)

Methods

writer :: (a, w) -> ConduitM i o m a

tell :: w -> ConduitM i o m ()

listen :: ConduitM i o m a -> ConduitM i o m (a, w)

pass :: ConduitM i o m (a, w -> w) -> ConduitM i o m a

MFunctor (ConduitM i o)

Methods

hoist :: Monad m => (forall a. m a -> n a) -> ConduitM i o m b -> ConduitM i o n b

MonadTrans (ConduitM i o)

Methods

lift :: Monad m => m a -> ConduitM i o m a

Monad (ConduitM i o m)

Methods

(>>=) :: ConduitM i o m a -> (a -> ConduitM i o m b) -> ConduitM i o m b

(>>) :: ConduitM i o m a -> ConduitM i o m b -> ConduitM i o m b

return :: a -> ConduitM i o m a

fail :: String -> ConduitM i o m a

Functor (ConduitM i o m)

Methods

fmap :: (a -> b) -> ConduitM i o m a -> ConduitM i o m b

(<$) :: a -> ConduitM i o m b -> ConduitM i o m a

Applicative (ConduitM i o m)

Methods

pure :: a -> ConduitM i o m a

(<*>) :: ConduitM i o m (a -> b) -> ConduitM i o m a -> ConduitM i o m b

(*>) :: ConduitM i o m a -> ConduitM i o m b -> ConduitM i o m b

(<*) :: ConduitM i o m a -> ConduitM i o m b -> ConduitM i o m a

MonadIO m => MonadIO (ConduitM i o m)

Methods

liftIO :: IO a -> ConduitM i o m a

MonadThrow m => MonadThrow (ConduitM i o m)

Methods

throwM :: Exception e => e -> ConduitM i o m a

MonadCatch m => MonadCatch (ConduitM i o m)

Methods

catch :: Exception e => ConduitM i o m a -> (e -> ConduitM i o m a) -> ConduitM i o m a

PrimMonad m => PrimMonad (ConduitM i o m)

Associated Types

type PrimState (ConduitM i o m :: * -> *) :: *

Methods

primitive :: (State# (PrimState (ConduitM i o m)) -> (# State# (PrimState (ConduitM i o m)), a #)) -> ConduitM i o m a

MonadResource m => MonadResource (ConduitM i o m)

Methods

liftResourceT :: ResourceT IO a -> ConduitM i o m a

Monad m => Monoid (ConduitM i o m ())

Methods

mempty :: ConduitM i o m ()

mappend :: ConduitM i o m () -> ConduitM i o m () -> ConduitM i o m ()

mconcat :: [ConduitM i o m ()] -> ConduitM i o m ()

type PrimState (ConduitM i o m)
type PrimState (ConduitM i o m) = PrimState m

Connect/fuse operators

(.|) infixr 2

Arguments

:: Monad m 
=> ConduitM a b m ()

upstream

-> ConduitM b c m r

downstream

-> ConduitM a c m r 

Combine two Conduits together into a new Conduit (aka fuse).

Output from the upstream (left) conduit will be fed into the downstream (right) conduit. Processing will terminate when downstream (right) returns. Leftover data returned from the right Conduit will be discarded.

Since: 1.2.8
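The termination rule for .| can be illustrated with an unbounded upstream: because processing stops as soon as the downstream returns, the pipeline below finishes after three values. This is only a sketch; firstThreeDoubled is a made-up name, and CL.sourceList, CL.map and CL.take are assumed to come from Data.Conduit.List.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- The upstream list is infinite, but CL.take 3 returns after three
-- inputs, which ends the whole pipeline.
firstThreeDoubled :: [Int]
firstThreeDoubled =
  runConduitPure (CL.sourceList [1 ..] .| CL.map (* 2) .| CL.take 3)

main :: IO ()
main = print firstThreeDoubled
```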

($$) :: Monad m => Source m a -> Sink a m b -> m b infixr 0

The connect operator, which pulls data from a source and pushes to a sink. If you would like to keep the Source open to be used for other operations, use the connect-and-resume operator $$+.

Since 0.4.0

($=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r infixl 1

A synonym for =$= for backwards compatibility.

Since 0.4.0

(=$) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r infixr 2

A synonym for =$= for backwards compatibility.

Since 0.4.0

(=$=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r infixr 2

Fusion operator, combining two Conduits together into a new Conduit.

Both Conduits will be closed when the newly-created Conduit is closed.

Leftover data returned from the right Conduit will be discarded.

Since 0.4.0
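A small sketch of the operator-based style: the same pipeline written once with $$ and =$=, and once with $= and $$. The fixities listed above make both parse as intended without parentheses. The names viaRightFuse and viaLeftFuse are illustrative, and Data.Functor.Identity is assumed to be available from base.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

-- Parses as: source $$ (conduit =$= sink)
viaRightFuse :: [String]
viaRightFuse =
  runIdentity (CL.sourceList [1 .. 3 :: Int] $$ CL.map show =$= CL.consume)

-- Parses as: (source $= conduit) $$ sink
viaLeftFuse :: [String]
viaLeftFuse =
  runIdentity (CL.sourceList [1 .. 3 :: Int] $= CL.map show $$ CL.consume)

main :: IO ()
main = do
  print viaRightFuse
  print viaLeftFuse
```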

connect :: Monad m => Source m a -> Sink a m b -> m b

Named function synonym for $$.

Since 1.2.3

fuse :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r

Named function synonym for =$=.

Since 1.2.3

Fuse with upstream results

fuseBoth :: Monad m => ConduitM a b m r1 -> ConduitM b c m r2 -> ConduitM a c m (r1, r2)

Fuse two ConduitMs together, and provide the return value of both. Note that this will force the entire upstream ConduitM to be run to produce the result value, even if the downstream terminates early.

Since 1.1.5

fuseBothMaybe :: Monad m => ConduitM a b m r1 -> ConduitM b c m r2 -> ConduitM a c m (Maybe r1, r2)

Like fuseBoth, but does not force consumption of the Producer. If the Producer terminates, its result value is provided as a Just value. If it does not terminate, Nothing is returned.

Note that "termination" only occurs once the Producer actually signals the end of the stream, that is, when a downstream await receives Nothing. For example, with the Producer mapM_ yield [1..5], if only five values are requested, the Producer has not yet terminated. Termination occurs only when a sixth value is awaited and the Producer signals that it is finished.

Since 1.2.4
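The termination rule described above can be sketched with the same five-element Producer. The names upstream, stoppedEarly and ranToEnd are illustrative, and CL.take and CL.consume are assumed to come from Data.Conduit.List.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Upstream yields five values and then returns a result.
upstream :: Monad m => ConduitM () Int m String
upstream = mapM_ yield [1 .. 5] >> return "done"

-- CL.take 5 stops after five values and never awaits a sixth,
-- so it does not observe the end of upstream.
stoppedEarly :: (Maybe String, [Int])
stoppedEarly = runConduitPure (fuseBothMaybe upstream (CL.take 5))

-- CL.consume keeps awaiting until upstream signals termination.
ranToEnd :: (Maybe String, [Int])
ranToEnd = runConduitPure (fuseBothMaybe upstream CL.consume)

main :: IO ()
main = do
  print stoppedEarly
  print ranToEnd
```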

fuseUpstream :: Monad m => ConduitM a b m r -> Conduit b m c -> ConduitM a c m r

Same as fuseBoth, but ignore the return value from the downstream Conduit. Same caveats of forced consumption apply.

Since 1.1.5

Primitives

await :: Monad m => Consumer i m (Maybe i)

Wait for a single input value from upstream. If no data is available, returns Nothing. Once await returns Nothing, subsequent calls will also return Nothing.

Since 0.5.0

yield

Arguments

:: Monad m 
=> o

output value

-> ConduitM i o m () 

Send a value downstream to the next component to consume. If the downstream component terminates, this call will never return control. If you would like to register a cleanup function, please use yieldOr instead.

Since 0.5.0
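As a sketch of how await and yield combine into a hand-written component, the conduit below emits a running sum of its input. runningSum and sums are illustrative names, not part of the library.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Emit the running sum of the inputs seen so far.
runningSum :: Monad m => Conduit Int m Int
runningSum = go 0
  where
    go acc = do
      mx <- await
      case mx of
        Nothing -> return ()      -- upstream is exhausted
        Just x  -> do
          let acc' = acc + x
          yield acc'              -- send the new total downstream
          go acc'

sums :: [Int]
sums = runConduitPure (CL.sourceList [1, 2, 3, 4] .| runningSum .| CL.consume)

main :: IO ()
main = print sums
```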

yieldM :: Monad m => m o -> ConduitM i o m ()

Send a monadic value downstream for the next component to consume.

Since: 1.2.7

leftover :: i -> ConduitM i o m ()

Provide a single piece of leftover input to be consumed by the next component in the current monadic binding.

Note: it is highly encouraged to only return leftover values from input already consumed from upstream.

Since: 0.5.0
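A common use of leftover is to look at the next input without consuming it. The sketch below pushes the awaited value back, so the following component in the same monadic binding still sees it. peekFirst and firstAndAll are illustrative names.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Await one value and hand it back as a leftover.
peekFirst :: Monad m => ConduitM i o m (Maybe i)
peekFirst = do
  mx <- await
  case mx of
    Nothing -> return Nothing
    Just x  -> leftover x >> return (Just x)

-- CL.consume runs in the same monadic binding, so it receives the
-- leftover value first, followed by the rest of the stream.
firstAndAll :: Monad m => Sink Int m (Maybe Int, [Int])
firstAndAll = do
  first <- peekFirst
  rest  <- CL.consume
  return (first, rest)

peeked :: (Maybe Int, [Int])
peeked = runConduitPure (CL.sourceList [1, 2, 3] .| firstAndAll)

main :: IO ()
main = print peeked
```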

runConduit :: Monad m => ConduitM () Void m r -> m r

Run a pipeline until processing completes.

Since 1.2.1

runConduitPure :: ConduitM () Void Identity r -> r

Run a pure pipeline until processing completes, i.e. a pipeline with Identity as the base monad. This is equivalent to runIdentity . runConduit.

Since: 1.2.8

runConduitRes :: MonadBaseControl IO m => ConduitM () Void (ResourceT m) r -> m r

Run a pipeline which acquires resources with ResourceT, and then run the ResourceT transformer. This is equivalent to runResourceT . runConduit.

Since: 1.2.8

Finalization

bracketP

Arguments

:: MonadResource m 
=> IO a

computation to run first ("acquire resource")

-> (a -> IO ())

computation to run last ("release resource")

-> (a -> ConduitM i o m r)

computation to run in-between

-> ConduitM i o m r 

Bracket a conduit computation between allocation and release of a resource. Two guarantees are given about resource finalization:

  1. It will be prompt. The finalization will be run as early as possible.
  2. It is exception safe. Due to usage of resourcet, the finalization will be run in the event of any exceptions.

Since 0.5.0
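A minimal sketch of bracketP, using log entries in an IORef as a stand-in for a real resource. loggedSource and demo are illustrative names, and MonadResource is assumed to be imported from Control.Monad.Trans.Resource in the resourcet package.

```haskell
import Control.Monad.Trans.Resource (MonadResource)
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- "Acquire" and "release" only record that they ran.
loggedSource :: MonadResource m => IORef [String] -> Source m Int
loggedSource ref =
  bracketP
    (modifyIORef ref ("acquire" :) >> return [1, 2, 3])  -- runs first
    (\_ -> modifyIORef ref ("release" :))                -- runs last
    (mapM_ yield)                                        -- runs in between

-- runConduitRes supplies the ResourceT layer that bracketP needs.
demo :: IO ([Int], [String])
demo = do
  ref <- newIORef []
  xs <- runConduitRes (loggedSource ref .| CL.consume)
  events <- readIORef ref
  return (xs, reverse events)

main :: IO ()
main = demo >>= print
```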

addCleanup :: Monad m => (Bool -> m ()) -> ConduitM i o m r -> ConduitM i o m r

Add some code to be run when the given component cleans up.

The supplied cleanup function will be given a True if the component ran to completion, or False if it terminated early due to a downstream component terminating.

Note that this function is not exception safe. For that, please use bracketP.

Since 0.4.1
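The Bool passed to the cleanup function can be observed with a sketch like the following, which records the flag in an IORef. cleanupFlag is an illustrative helper, not a library function.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.IORef (newIORef, readIORef, writeIORef)

-- Run a ten-element source into the given sink and report the flag
-- that the cleanup function received.
cleanupFlag :: Sink Int IO r -> IO (Maybe Bool)
cleanupFlag sink = do
  ref <- newIORef Nothing
  let src = addCleanup (writeIORef ref . Just) (CL.sourceList [1 .. 10])
  _ <- runConduit (src .| sink)
  readIORef ref

main :: IO ()
main = do
  cleanupFlag (CL.take 3) >>= print  -- downstream stops early
  cleanupFlag CL.consume >>= print   -- source runs to completion
```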

yieldOr

Arguments

:: Monad m 
=> o 
-> m ()

finalizer

-> ConduitM i o m () 

Similar to yield, but additionally takes a finalizer to be run if the downstream component terminates.

Since 0.5.0

Exception handling

catchC :: (MonadBaseControl IO m, Exception e) => ConduitM i o m r -> (e -> ConduitM i o m r) -> ConduitM i o m r

Catch all exceptions thrown by the current component of the pipeline.

Note: this will not catch exceptions thrown by other components! For example, if an exception is thrown in a Source feeding to a Sink, and the Sink uses catchC, the exception will not be caught.

Due to this behavior (as well as lack of async exception safety), you should not try to implement combinators such as onException in terms of this primitive function.

Note also that the exception handling will not be applied to any finalizers generated by this conduit.

Since 1.0.11
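A sketch of catchC recovering from an exception thrown inside the component it wraps. failing, recovered and demo are illustrative names, and the exception type is chosen only for the example.

```haskell
import Control.Exception (ArithException (..), throwIO)
import Control.Monad.IO.Class (liftIO)
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Yields one value, then throws from within this component.
failing :: Source IO Int
failing = do
  yield 1
  _ <- liftIO (throwIO DivideByZero)
  yield 2

-- The handler replaces the remainder of the failed component.
recovered :: Source IO Int
recovered = catchC failing handler
  where
    handler :: ArithException -> Source IO Int
    handler _ = yield (-1)

demo :: IO [Int]
demo = runConduit (recovered .| CL.consume)

main :: IO ()
main = demo >>= print
```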

handleC :: (MonadBaseControl IO m, Exception e) => (e -> ConduitM i o m r) -> ConduitM i o m r -> ConduitM i o m r

The same as flip catchC.

Since 1.0.11

tryC :: (MonadBaseControl IO m, Exception e) => ConduitM i o m r -> ConduitM i o m (Either e r)

A version of try for use within a pipeline. See the comments in catchC for more details.

Since 1.0.11

Generalized conduit types

type Producer m o = forall i. ConduitM i o m ()

A component which produces a stream of output values, regardless of the input stream. A Producer is a generalization of a Source, and can be used as either a Source or a Conduit.

Since 1.0.0

type Consumer i m r = forall o. ConduitM i o m r

A component which consumes a stream of input values and produces a final result, regardless of the output stream. A Consumer is a generalization of a Sink, and can be used as either a Sink or a Conduit.

Since 1.0.0
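A sketch of what the generalization buys: one Producer definition used both at the head of a pipeline and in the middle of one. countdown, asSource and asConduit are illustrative names. Because the synonym contains a forall, writing it in a type signature is assumed to require the RankNTypes extension.

```haskell
{-# LANGUAGE RankNTypes #-}
import Data.Conduit
import qualified Data.Conduit.List as CL

-- A Producer never awaits, so it fits wherever a Source or a
-- Conduit is expected.
countdown :: Monad m => Producer m Int
countdown = mapM_ yield [3, 2, 1]

-- Used as a Source.
asSource :: [Int]
asSource = runConduitPure (countdown .| CL.consume)

-- Used as a Conduit: the upstream characters are simply ignored.
asConduit :: [Int]
asConduit =
  runConduitPure (CL.sourceList "ignored" .| countdown .| CL.consume)

main :: IO ()
main = do
  print asSource
  print asConduit
```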

toProducer :: Monad m => Source m a -> Producer m a

Generalize a Source to a Producer.

Since 1.0.0

toConsumer :: Monad m => Sink a m b -> Consumer a m b

Generalize a Sink to a Consumer.

Since 1.0.0

Utility functions

awaitForever :: Monad m => (i -> ConduitM i o m r) -> ConduitM i o m ()

Wait for input forever, calling the given inner component for each piece of new input.

This function is provided as a convenience for the common pattern of awaiting input, checking if it's Just and then looping.

Since 0.5.0
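The await-check-loop pattern that awaitForever replaces can be seen in this sketch, where the loop is implicit. evensTwice and evens are illustrative names.

```haskell
import Control.Monad (when)
import Data.Conduit
import qualified Data.Conduit.List as CL

-- For every input, yield it twice if it is even; drop it otherwise.
-- awaitForever handles awaiting, the Nothing check, and the looping.
evensTwice :: Monad m => Conduit Int m Int
evensTwice = awaitForever $ \x -> when (even x) (yield x >> yield x)

evens :: [Int]
evens = runConduitPure (CL.sourceList [1 .. 6] .| evensTwice .| CL.consume)

main :: IO ()
main = print evens
```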

transPipe :: Monad m => (forall a. m a -> n a) -> ConduitM i o m r -> ConduitM i o n r

Transform the monad that a ConduitM lives in.

Note that the monad transforming function will be run multiple times, resulting in unintuitive behavior in some cases. For a fuller treatment, please see:

https://github.com/snoyberg/conduit/wiki/Dealing-with-monad-transformers

This function is just a synonym for hoist.

Since 0.4.0

mapOutput :: Monad m => (o1 -> o2) -> ConduitM i o1 m r -> ConduitM i o2 m r

Apply a function to all the output values of a ConduitM.

This mimics the behavior of fmap for a Source and Conduit in pre-0.4 days. It can also be simulated by fusing with the map conduit from Data.Conduit.List.

Since 0.4.1

mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> ConduitM i o1 m r -> ConduitM i o2 m r

Same as mapOutput, but use a function that returns Maybe values.

Since 0.5.0

mapInput

Arguments

:: Monad m 
=> (i1 -> i2)

map initial input to new input

-> (i2 -> Maybe i1)

map new leftovers to initial leftovers

-> ConduitM i2 o m r 
-> ConduitM i1 o m r 

Apply a function to all the input values of a ConduitM.

Since 0.5.0
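A sketch of mapOutput and mapInput adapting existing components instead of fusing in an extra conduit. shouted, totalLength and adapted are illustrative names.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- mapOutput rewrites every value a component yields.
shouted :: Monad m => Source m String
shouted = mapOutput (++ "!") (CL.sourceList ["a", "b"])

-- mapInput adapts a sink of Ints so that it accepts Strings.
-- The second argument maps leftovers back to the original input type;
-- this sink produces no leftovers, so dropping them with Nothing is
-- harmless here.
totalLength :: Monad m => Sink String m Int
totalLength = mapInput length (const Nothing) (CL.fold (+) 0)

adapted :: Int
adapted = runConduitPure (shouted .| totalLength)

main :: IO ()
main = print adapted
```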

mergeSource :: Monad m => Source m i -> Conduit a m (i, a)

Merge a Source into a Conduit. The new conduit will stop processing once either source or upstream have been exhausted.
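As a sketch, mergeSource can attach an index to each upstream value. Here the merged source is unbounded, so the conduit stops when upstream runs out. numbered is an illustrative name.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Pair each upstream character with the next value from the merged
-- source; processing ends when the three characters are used up.
numbered :: [(Int, Char)]
numbered =
  runConduitPure
    (CL.sourceList "abc" .| mergeSource (CL.sourceList [1 ..]) .| CL.consume)

main :: IO ()
main = print numbered
```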

passthroughSink

Arguments

:: Monad m 
=> Sink i m r 
-> (r -> m ())

finalizer

-> Conduit i m i 

Turn a Sink into a Conduit in the following way:

  • All input passed to the Sink is yielded downstream.
  • When the Sink finishes processing, the result is passed to the provided finalizer function.

Note that the Sink will stop receiving input as soon as the downstream it is connected to shuts down.

An example usage would be to write the result of a Sink to some mutable variable while allowing other processing to continue.

Since 1.1.0
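The mutable-variable usage mentioned above might look like the following sketch, which stores a running total in an IORef while the values continue downstream. demo is an illustrative name.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.IORef (newIORef, readIORef, writeIORef)

demo :: IO ([Int], Int)
demo = do
  ref <- newIORef 0
  -- The fold sees every value; its result is written to ref by the
  -- finalizer, while the values themselves flow on to CL.consume.
  xs <- runConduit $
    CL.sourceList [1 .. 5]
      .| passthroughSink (CL.fold (+) 0) (writeIORef ref)
      .| CL.consume
  total <- readIORef ref
  return (xs, total)

main :: IO ()
main = demo >>= print
```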

sourceToList :: Monad m => Source m a -> m [a]

Convert a Source into a list. The basic functionality can be explained as:

sourceToList src = src $$ Data.Conduit.List.consume

However, sourceToList is able to produce its results lazily, which cannot be done when running a conduit pipeline in general. Unlike the Data.Conduit.Lazy module (in conduit-extra), this function performs no unsafe I/O operations, and therefore can only be as lazy as the underlying monad.

Since 1.2.6

Connect-and-resume

data ResumableSource m o

A Source which has been started, but has not yet completed.

This type contains both the current state of the Source, and the finalizer to be run to close it.

Since 0.5.0

Instances

MFunctor ResumableSource

Since 1.0.13

Methods

hoist :: Monad m => (forall a. m a -> n a) -> ResumableSource m b -> ResumableSource n b

newResumableSource :: Monad m => Source m o -> ResumableSource m o

Turn a Source into a ResumableSource with no attached finalizer.

Since 1.1.4

($$+) :: Monad m => Source m a -> Sink a m b -> m (ResumableSource m a, b) infixr 0

The connect-and-resume operator. This does not close the Source, but instead returns it to be used again. This allows a Source to be used incrementally in a large program, without forcing the entire program to live in the Sink monad.

Mnemonic: connect + do more.

Since 0.5.0

($$++) :: Monad m => ResumableSource m a -> Sink a m b -> m (ResumableSource m a, b) infixr 0

Continue processing after usage of $$+.

Since 0.5.0

($$+-) :: Monad m => ResumableSource m a -> Sink a m b -> m b infixr 0

Complete processing of a ResumableSource. This will run the finalizer associated with the ResumableSource. In order to guarantee proper resource finalization, you must use this operator after using $$+ and $$++.

Since 0.5.0
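The three connect-and-resume operators are typically used in sequence. The sketch below reads one source in three steps from ordinary monadic code; demo and the binding names are illustrative.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

demo :: IO ([Int], [Int], [Int])
demo = do
  -- Start the source and take three values, keeping it open.
  (rsrc1, firstThree) <- CL.sourceList [1 .. 10] $$+ CL.take 3
  -- Resume where the previous sink stopped.
  (rsrc2, nextTwo) <- rsrc1 $$++ CL.take 2
  -- Finish: consume the rest and run the source's finalizer.
  rest <- rsrc2 $$+- CL.consume
  return (firstThree, nextTwo, rest)

main :: IO ()
main = demo >>= print
```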

($=+) :: Monad m => ResumableSource m a -> Conduit a m b -> ResumableSource m b infixl 1

Left fusion for a resumable source.

Since 1.0.16

unwrapResumable :: MonadIO m => ResumableSource m o -> m (Source m o, m ())

Unwraps a ResumableSource into a Source and a finalizer.

A ResumableSource represents a Source which has already been run, and therefore has a finalizer registered. As a result, if we want to turn it into a regular Source, we need to ensure that the finalizer will be run appropriately. By appropriately, I mean:

  • If a new finalizer is registered, the old one should not be called.
  • If the old one is called, it should not be called again.

This function returns both a Source and a finalizer which ensures that the above two conditions hold. Once you call that finalizer, the Source is invalidated and cannot be used.

Since 0.5.2

closeResumableSource :: Monad m => ResumableSource m a -> m ()

Execute the finalizer associated with a ResumableSource, rendering the ResumableSource invalid for further use.

This is just a more explicit version of $$+- return ().

Since 1.1.3

For Conduits

data ResumableConduit i m o

A generalization of ResumableSource. Allows resuming an arbitrary conduit, keeping its state and using it later (or finalizing it).

Since 1.0.17

newResumableConduit :: Monad m => Conduit i m o -> ResumableConduit i m o

Turn a Conduit into a ResumableConduit with no attached finalizer.

Since 1.1.4

(=$$+) :: Monad m => Conduit a m b -> Sink b m r -> Sink a m (ResumableConduit a m b, r) infixr 0

The connect-and-resume operator. This does not close the Conduit, but instead returns it to be used again. This allows a Conduit to be used incrementally in a large program, without forcing the entire program to live in the Sink monad.

Leftover data returned from the Sink will be discarded.

Mnemonic: connect + do more.

Since 1.0.17

(=$$++) :: Monad m => ResumableConduit i m o -> Sink o m r -> Sink i m (ResumableConduit i m o, r) infixr 0

Continue processing after usage of =$$+. Connect a ResumableConduit to a sink and return the output of the sink together with a new ResumableConduit.

Since 1.0.17

(=$$+-) :: Monad m => ResumableConduit i m o -> Sink o m r -> Sink i m r infixr 0

Complete processing of a ResumableConduit. This will run the finalizer associated with the ResumableConduit. In order to guarantee proper resource finalization, you must use this operator after using =$$+ and =$$++.

Since 1.0.17

unwrapResumableConduit :: MonadIO m => ResumableConduit i m o -> m (Conduit i m o, m ())

Unwraps a ResumableConduit into a Conduit and a finalizer.

See unwrapResumable for more information.

Since 1.0.17

Fusion with leftovers

fuseLeftovers :: Monad m => ([b] -> [a]) -> ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r

Similar to fuseReturnLeftovers, but use the provided function to convert downstream leftovers to upstream leftovers.

Since 1.0.17

fuseReturnLeftovers :: Monad m => ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m (r, [b])

Same as normal fusion (e.g. =$=), except instead of discarding leftovers from the downstream component, return them.

Since 1.0.17
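The difference from normal fusion can be sketched with a downstream component that pushes back the value it just read. With ordinary fusion that leftover is discarded; fuseReturnLeftovers hands it back alongside the result. peekNext, plain and kept are illustrative names.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Await one value and push it back as a leftover.
peekNext :: Monad m => ConduitM i o m (Maybe i)
peekNext = do
  mx <- await
  case mx of
    Nothing -> return Nothing
    Just x  -> leftover x >> return (Just x)

-- Ordinary fusion: the downstream leftover is dropped.
plain :: Maybe Int
plain = runConduitPure (CL.sourceList [1, 2, 3] .| CL.map (* 10) .| peekNext)

-- fuseReturnLeftovers: the leftover is returned next to the result.
kept :: (Maybe Int, [Int])
kept =
  runConduitPure
    (CL.sourceList [1, 2, 3] .| fuseReturnLeftovers (CL.map (* 10)) peekNext)

main :: IO ()
main = do
  print plain
  print kept
```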

Flushing

data Flush a

Provide for a stream of data that can be flushed.

A number of Conduits (e.g., zlib compression) need the ability to flush the stream at some point. This provides a single wrapper datatype to be used in all such circumstances.

Since 0.3.0

Constructors

Chunk a 
Flush 
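As a sketch of how a component might react to the two constructors, the conduit below buffers Chunk values and emits the concatenated buffer whenever a Flush arrives. joinOnFlush and joined are illustrative names; real consumers such as compression conduits are more involved.

```haskell
import Control.Monad (unless)
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Concatenate chunks and emit the buffer whenever a Flush arrives,
-- and once more at end of input if anything is still buffered.
joinOnFlush :: Monad m => Conduit (Flush String) m String
joinOnFlush = go []
  where
    go acc = do
      mx <- await
      case mx of
        Nothing        -> unless (null acc) (yield (concat (reverse acc)))
        Just (Chunk s) -> go (s : acc)
        Just Flush     -> yield (concat (reverse acc)) >> go []

joined :: [String]
joined =
  runConduitPure $
    CL.sourceList [Chunk "ab", Chunk "cd", Flush, Chunk "ef"]
      .| joinOnFlush
      .| CL.consume

main :: IO ()
main = print joined
```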

Instances

Functor Flush

Methods

fmap :: (a -> b) -> Flush a -> Flush b

(<$) :: a -> Flush b -> Flush a

Eq a => Eq (Flush a)

Methods

(==) :: Flush a -> Flush a -> Bool

(/=) :: Flush a -> Flush a -> Bool

Ord a => Ord (Flush a)

Methods

compare :: Flush a -> Flush a -> Ordering

(<) :: Flush a -> Flush a -> Bool

(<=) :: Flush a -> Flush a -> Bool

(>) :: Flush a -> Flush a -> Bool

(>=) :: Flush a -> Flush a -> Bool

max :: Flush a -> Flush a -> Flush a

min :: Flush a -> Flush a -> Flush a

Show a => Show (Flush a)

Methods

showsPrec :: Int -> Flush a -> ShowS

show :: Flush a -> String

showList :: [Flush a] -> ShowS

Newtype wrappers

ZipSource

newtype ZipSource m o

A wrapper for defining an Applicative instance for Sources which allows combining sources together, generalizing zipSources. A combined source will take input yielded from each of its Sources until any of them stops producing output.

Since 1.0.13
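A sketch of the zipping Applicative: two sources of different lengths are paired element by element, and the result ends with the shorter one. pairs and zipped are illustrative names. The Control.Applicative import is only needed on older versions of base where the operators are not in the Prelude.

```haskell
import Control.Applicative
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Pair up values from both sources; output stops as soon as either
-- source is exhausted.
pairs :: [(Int, Char)]
pairs = runConduitPure (getZipSource zipped .| CL.consume)
  where
    zipped =
      (,) <$> ZipSource (CL.sourceList [1, 2, 3])
          <*> ZipSource (CL.sourceList "ab")

main :: IO ()
main = print pairs
```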

Constructors

ZipSource 

Fields

  • getZipSource :: Source m o

Instances

Monad m => Functor (ZipSource m)

Methods

fmap :: (a -> b) -> ZipSource m a -> ZipSource m b

(<$) :: a -> ZipSource m b -> ZipSource m a

Monad m => Applicative (ZipSource m)

Methods

pure :: a -> ZipSource m a

(<*>) :: ZipSource m (a -> b) -> ZipSource m a -> ZipSource m b

(*>) :: ZipSource m a -> ZipSource m b -> ZipSource m b

(<*) :: ZipSource m a -> ZipSource m b -> ZipSource m a

sequenceSources :: (Traversable f, Monad m) => f (Source m o) -> Source m (f o)

Coalesce all values yielded by all of the Sources.

Implemented on top of ZipSource, see that data type for more details.

Since 1.0.13

ZipSink

newtype ZipSink i m r

A wrapper for defining an Applicative instance for Sinks which allows combining sinks together, generalizing zipSinks. A combined sink distributes the input to all its participants and, when all of them finish, produces the result. This makes it possible to define functions like

sequenceSinks :: (Monad m)
          => [Sink i m r] -> Sink i m [r]
sequenceSinks = getZipSink . sequenceA . fmap ZipSink

Note that the standard Applicative instance for conduits works differently. It feeds one sink with input until it finishes, then switches to another, etc., and at the end combines their results.

This newtype is in fact a type constrained version of ZipConduit, and has the same behavior. It's presented as a separate type since (1) it historically predates ZipConduit, and (2) the type constraining can make your code clearer (and thereby make your error messages more easily understood).

Since 1.0.13
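A sketch of the distributing behaviour: two folds receive the same input in a single pass, and their results are combined at the end. average and mean are illustrative names. The Control.Applicative import is only needed on older versions of base.

```haskell
import Control.Applicative
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Every input is fed to both folds; once both have finished, their
-- results are combined with (/).
average :: Monad m => Sink Double m Double
average =
  getZipSink $
    (/) <$> ZipSink (CL.fold (+) 0)
        <*> ZipSink (CL.fold (\n _ -> n + 1) 0)

mean :: Double
mean = runConduitPure (CL.sourceList [1, 2, 3, 4] .| average)

main :: IO ()
main = print mean
```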

Constructors

ZipSink 

Fields

  • getZipSink :: Sink i m r

Instances

Monad m => Functor (ZipSink i m)

Methods

fmap :: (a -> b) -> ZipSink i m a -> ZipSink i m b

(<$) :: a -> ZipSink i m b -> ZipSink i m a

Monad m => Applicative (ZipSink i m)

Methods

pure :: a -> ZipSink i m a

(<*>) :: ZipSink i m (a -> b) -> ZipSink i m a -> ZipSink i m b

(*>) :: ZipSink i m a -> ZipSink i m b -> ZipSink i m b

(<*) :: ZipSink i m a -> ZipSink i m b -> ZipSink i m a

sequenceSinks :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r)

Send incoming values to all of the provided Sinks, and ultimately coalesce all of their return values.

Implemented on top of ZipSink, see that data type for more details.

Since 1.0.13

ZipConduit

newtype ZipConduit i o m r

Provides an alternative Applicative instance for ConduitM. In this instance, every incoming value is provided to all ConduitMs, and output is coalesced together. Leftovers from individual ConduitMs will be used within that component, and then discarded at the end of their computation. Output and finalizers will both be handled in a left-biased manner.

As an example, take the following program:

import Data.Conduit
import qualified Data.Conduit.List as CL

main :: IO ()
main = do
    let src = mapM_ yield [1..3 :: Int]
        conduit1 = CL.map (+1)
        conduit2 = CL.concatMap (replicate 2)
        conduit = getZipConduit $ ZipConduit conduit1 <* ZipConduit conduit2
        sink = CL.mapM_ print
    src $$ conduit =$ sink

It will produce the output: 2, 1, 1, 3, 2, 2, 4, 3, 3

Since 1.0.17

Constructors

ZipConduit 

Fields

  • getZipConduit :: ConduitM i o m r

Instances

Functor (ZipConduit i o m)

Methods

fmap :: (a -> b) -> ZipConduit i o m a -> ZipConduit i o m b

(<$) :: a -> ZipConduit i o m b -> ZipConduit i o m a

Monad m => Applicative (ZipConduit i o m)

Methods

pure :: a -> ZipConduit i o m a

(<*>) :: ZipConduit i o m (a -> b) -> ZipConduit i o m a -> ZipConduit i o m b

(*>) :: ZipConduit i o m a -> ZipConduit i o m b -> ZipConduit i o m b

(<*) :: ZipConduit i o m a -> ZipConduit i o m b -> ZipConduit i o m a

sequenceConduits :: (Traversable f, Monad m) => f (ConduitM i o m r) -> ConduitM i o m (f r)

Provide identical input to all of the Conduits and combine their outputs into a single stream.

Implemented on top of ZipConduit, see that data type for more details.

Since 1.0.17