churros- Channel/Arrow based streaming computation library.
Safe HaskellNone



Common transport-agnostic functions for using Churro.

Variants with a trailing underscore - E.g. runWait_ specialised the Async action to be () if you don't care about accumulating results and only processing items as they pass through the pipeline.

Variants with a trailing prime - E.g. processRetry`. also change the generality of the types involved in some way.



The examples in this module require the following imports:

>>> :set -XBlockArguments
>>> import Control.Churro.Transport
>>> import Data.Time.Clock
>>> import System.Timeout (timeout)


runWait :: Transport t => Churro a t Void Void -> IO a Source #

Automatically wait for a churro to complete.

runWait_ :: Transport t => Churro () t Void Void -> IO () Source #

Version of runWait specialised to ().

runWaitList :: (Transport t, Monoid a) => Churro a t Void b -> IO [b] Source #

Read the output of a Churro into a list.

Warning: This will block until the Churro terminates, Accumulating items in memory. Only use when you expect a finite amount of output. Otherwise consider composing with a Sink and using runWait.

>>> runWaitListChan $ sourceList [0..4] >>> arr succ

runWaitList_ :: Transport t => Churro () t Void b -> IO [b] Source #

Version of runWaitList specialised to ().

run :: Transport t => Churro a t Void Void -> IO (Async a) Source #

Run a sourced and sinked (double-dipped) churro and return an async action representing the in-flight processes.

run_ :: Transport t => Churro () t Void Void -> IO (Async ()) Source #

Version of run with async return type specialised to ().

run' :: Transport t => Churro a t i o -> IO (Async a) Source #

Run any churro, there is no check that this was spawned with a source, or terminated with a sink. This is unsafe, since the pipeline may not generate or consume in a predictable way. Use run instead unless you are confident you know what you're doing.



sourceSingleton :: Transport t => o -> Churro () t Void o Source #

A single items source.

>>> runWaitChan $ sourceSingleton 13 >>> sinkPrint

Equivalent to pure from Applicative. Redefined here in case you're looking for a source!

>>> runWaitChan $ pure 23 >>> sinkPrint

sourceList :: (Transport t, Foldable f) => f o -> Churro () t Void o Source #

Create a source from a list of items, sending each down the churro independently.

>>> runWaitChan $ sourceList [4,2] >>> sinkPrint

sourceIO :: Transport t => ((o -> IO ()) -> IO a) -> Churro a t Void o Source #

Create a source from an IO action that is passed a function to yield new items.

>>> runWaitChan $ sourceIO (\cb -> cb 4 >> cb 2) >>> sinkPrint

sourceIO_ :: Transport t => ((o -> IO ()) -> IO ()) -> Churro () t Void o Source #

Variant of sourceIO with Async action specialised to ().

sources :: (Transport t, Traversable f, Monoid a) => f (Churro a t Void o) -> Churro a t Void o Source #

Combine a list of sources into a single source.

Sends individual items downstream without attempting to combine them.

>>> runWaitChan $ sources [pure 1, pure 1] >>> sinkPrint

Can combine results of sources with a Monoid instance, although this isn't very useful when forming the start of a longer pipeline:

>>> :{
  r <- runWaitChan $ sources [sourceIO \_cb -> print 1 >> return "hello ", sourceIO \_cb -> print 1 >> return "world"]
  print r
"hello world"

sources_ :: Transport t => [Source () t o] -> Source () t o Source #

Variant of sources with Async action of sources in argument specialised to ().


sink :: (Transport t, Monoid a) => Churro a t a Void Source #

Consume all items and combines them into a result via their monoid.

>>> :set -XFlexibleContexts
>>> r <- runWaitChan $ pure' [1 :: Int] >>> sink
>>> print r

sink_ :: Transport t => Churro () t i Void Source #

Consume all items with no additional effects.

TODO: Decide if we should use some kind of nf evaluation here to force items.

>>> runWaitChan $ pure 1 >>> process print >>> sink_

sinkIO :: (Transport t, Monoid a) => (o -> IO a) -> Churro a t o Void Source #

Consume a churro with an IO process.

>>> runWaitChan $ pure 1 >>> sinkIO (\x -> print "hello" >> print (succ x))

sinkIO_ :: Transport t => (o -> IO ()) -> Churro () t o Void Source #

Variant of sinkIO with Async action specialised to ().

sinkHO :: Transport t => (((i -> IO ()) -> IO ()) -> IO a) -> Churro a t i o Source #

Create a "sink" with more flexibility about when items are demanded using a higher-order HO callback.

This also allows a non-unit async action that can be recovered when run.

WARNING: You should use the provided callback if you want to acually create a sink.

TODO: Use hidden callback return type in order to ensure that the callback is called.

>>> import System.Timeout (timeout)
>>> :{
  r <- timeout 100000 $ runWaitChan $ sourceSingleton 1 >>>> sinkHO \ya -> do
    ya (print . show)
    return 25
  print r
Just 25

sinkPrint :: (Transport t, Show a) => Churro () t a Void Source #

Consume and print each item. Used in many examples, but not much use outside debugging!

>>> runWaitChan $ pure "hi" >>> sinkPrint


process :: Transport t => (a -> IO b) -> Churro () t a b Source #

Process each item with an IO action. Acts as a one-to-one process.

>>> runWaitChan $ pure "hi" >>> process (\x -> print x >> return (reverse x)) >>> sinkPrint

processPrint :: (Transport t, Show b) => Churro () t b b Source #

Print each item then pass it on.

processDebug :: (Transport t, Show b) => String -> Churro () t b b Source #

Print each item with an additional debugging label.

processN :: Transport t => (i -> IO [o]) -> Churro () t i o Source #

Process each item with an IO action and potentially yield many items as a result. Acts as a one-to-many process.

>>> runWaitChan $ pure 1 >>> processN (\x -> print (show x) >> return [x, succ x]) >>> sinkPrint

concatC :: Transport t => Churro () t [o] o Source #

Concatenates splits lists of items into individual items.

processes :: (Traversable f, Transport t1, Transport t2, Monoid a) => f (Churro a t1 i o) -> Churro a t2 i o Source #

Run a set of churros like a work-stealing queue for its inputs.

Similar to ArrowChoice, but more straightforward due to unified output type and independent implementation.

  • NOTE: This makes no judgement about the ordering of outputs corresponding to the ordering of inputs.
  • NOTE: You will need to specialise the transport of the processes. This is deliberate as it allows you to use a bounded channel that ensures allocation to idle processes. Use the processesUnagi variant from Bounded to default to a buffer size of 1.

WARNING: This won't deterministically allocate work to idle workers unless a bounded channel is used.

  • TODO: Figure out cancellation strategy.
  • TODO: Consider a binary combinator and this as a folded application.
>>> import Control.Churro.Transport.Unagi.Bounded (processesUnagi)

Sanity check - All items entering should propagate, independent of the number of processes:

>>> runWaitListChan $ sourceList [1,1,1,1,1] >>> processesUnagi (replicate 3 (delay 0.1))

This example creates a source of 10 values, then creates a process of 10 workers that all wait 1/2 a second. If this works, then all ten values should be consumed and propagated in 1/2 a second by distributing the load over the set of 10 workers:

>>> :{
  timeout 10000000 $ runWaitListChan $ sourceList (replicate 10 1) >>> processesUnagi (replicate 1 $ delay 0.05)
Just [1,1,1,1,1,1,1,1,1,1]

We could use different strategies such as round-robin, etc. to default to a more balanced allocation, but this wouldn't be most efficient if each worker performed at different rates of consumption.

thief :: (Transport t, Monoid a) => Int -> Churro a t i o -> Churro a t i o Source #

Set up N worker churro processes to concurrently process the stream.

justs :: Transport t => Churro () t (Maybe a) a Source #

Extract xs from (Just x)s. Similar to catMaybes.

>>> runWaitChan $ sourceList [Just 1, Nothing, Just 3] >>> justs >>> sinkPrint

lefts :: Transport t => Churro () t (Either a b) a Source #

Extract ls from (Left l)s.

>>> runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> lefts >>> sinkPrint

rights :: Transport t => Churro () t (Either a b) b Source #

Extract rs from (Right r)s.

>>> runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> rights >>> sinkPrint

takeC :: (Transport t, Integral n) => n -> Churro () t a a Source #

Take and yield the first n items.

WARNING: This is intended to terminate upstream once the items have been consumed downstream, but there is a bug preventing this from working at present!

>>> runWaitChan $ sourceList [1..100] >>> takeC 2 >>> sinkPrint

This implementation explicitly stops propagating when the Churro completes, although this could be handled by downstream consumer composition terminating the producer and just using replicateM.

dropC :: (Transport t, Integral n) => n -> Churro () t a a Source #

Drop the first n items.

>>> runWaitChan $ sourceList [1..4] >>> dropC 2 >>> sinkPrint

filterC :: Transport t => (a -> Bool) -> Churro () t a a Source #

Filter items according to a predicate.

>>> runWaitChan $ sourceList [1..5] >>> filterC (> 3) >>> sinkPrint

mapN :: Transport t => (a -> [b]) -> Churro () t a b Source #

Run a pure function over items, producing multiple outputs.

>>> runWaitChan $ pure 9 >>> mapN (\x -> [x,x*10]) >>> sinkPrint

delay :: Transport t => NominalDiffTime -> Churro () t a a Source #

Delay items from being sent downstream.

Note: NominalDiffTime's Num instance interprets literals as seconds.

>>> let sinkTimeCheck = process (const getCurrentTime) >>> withPrevious >>> arr (\(x,y) -> diffUTCTime y x > 0.01) >>> sinkPrint
>>> runWaitChan $ sourceList [1..2] >>> sinkTimeCheck
>>> runWaitChan $ sourceList [1..2] >>> delay 0.1 >>> sinkTimeCheck

delayMicro :: Transport t => Int -> Churro () t a a Source #

Delay items in microseconds. Works the same way as delay.

withPrevious :: Transport t => Churro () t a (a, a) Source #

Passes consecutive pairs of items downstream.

>>> runWaitChan $ sourceList [1,2,3] >>> withPrevious >>> sinkPrint

processRetry :: Transport t => Natural -> (i -> IO o) -> Churro () t i o Source #

Requeue an item if it fails. Swallows exceptions and gives up after retries.

Note: Process will always try once so if retries = 1 then a failing process will execute twice.

The item is requeues on the input side of the churro, so if other items have been passed in they will appear first!

Catches all SomeExceptions. If you wish to narrow the execption type, consider using the processRetry' variant composed with rights.

Note: There is an edgecase with Chan transport where a queued retry may not execute if a source completes and finalises before the item is requeued. A different transport type may allow a modified retry function that requeues differently.

>>> :{
  prog = processRetry 1 flakeyThing
  flakeyThing x = do
    if x > 1
      then print "GT"  >> return x
      else print "LTE" >> error ("oops! " <> show x)
  runWaitChan $ sourceList [1,2] >>> delay 0.1 >>> prog >>> sinkPrint

processRetry' :: (Exception e, Transport t) => Natural -> (i -> IO o) -> Churro () t i (Either e o) Source #

Raw version of processRetry. -- Polymorphic over exception type and forwards errors.

processRetry'' :: (Transport t, Exception e, Ord n, Enum n) => n -> (a -> IO b) -> Churro () t (n, a) (Either e b) Source #

Rawest version of processRetry. Expects the incoming items to contain number of retries.

Also polymorphic over exception type. And forwards errors.