churros-0.1.6.0: Channel/Arrow based streaming computation library.
Safe HaskellSafe-Inferred
LanguageHaskell2010

Control.Churro.Prelude

Description

Common transport-agnostic functions for using Churro.

Variants with a trailing underscore - E.g. runWait_ specialised the Async action to be () if you don't care about accumulating results and only processing items as they pass through the pipeline.

Variants with a trailing prime - E.g. processRetry`. also change the generality of the types involved in some way.

Synopsis

Documentation

The examples in this module require the following imports:

>>> :set -XBlockArguments
>>> import Control.Churro.Transport
>>> import Data.Time.Clock
>>> import System.Timeout (timeout)

Runners

runWait :: Transport t => Churro a t Void Void -> IO a Source #

Automatically wait for a churro to complete.

runWait_ :: Transport t => Churro () t Void Void -> IO () Source #

Version of runWait specialised to ().

runWaitList :: (Transport t, Monoid a) => Churro a t Void b -> IO [b] Source #

Read the output of a Churro into a list.

Warning: This will block until the Churro terminates, Accumulating items in memory. Only use when you expect a finite amount of output. Otherwise consider composing with a Sink and using runWait.

>>> runWaitListChan $ sourceList [0..4] >>> arr succ
[1,2,3,4,5]

runWaitList_ :: Transport t => Churro () t Void b -> IO [b] Source #

Version of runWaitList specialised to ().

run :: Transport t => Churro a t Void Void -> IO (Async a) Source #

Run a sourced and sinked (double-dipped) churro and return an async action representing the in-flight processes.

run_ :: Transport t => Churro () t Void Void -> IO (Async ()) Source #

Version of run with async return type specialised to ().

run' :: Transport t => Churro a t i o -> IO (Async a) Source #

Run any churro, there is no check that this was spawned with a source, or terminated with a sink. This is unsafe, since the pipeline may not generate or consume in a predictable way. Use run instead unless you are confident you know what you're doing.

Library

Sources

sourceSingleton :: Transport t => o -> Churro () t Void o Source #

A single items source.

>>> runWaitChan $ sourceSingleton 13 >>> sinkPrint
13

Equivalent to pure from Applicative. Redefined here in case you're looking for a source!

>>> runWaitChan $ pure 23 >>> sinkPrint
23

sourceList :: (Transport t, Foldable f) => f o -> Churro () t Void o Source #

Create a source from a list of items, sending each down the churro independently.

>>> runWaitChan $ sourceList [4,2] >>> sinkPrint
4
2

sourceIO :: Transport t => ((o -> IO ()) -> IO a) -> Churro a t Void o Source #

Create a source from an IO action that is passed a function to yield new items.

>>> runWaitChan $ sourceIO (\cb -> cb 4 >> cb 2) >>> sinkPrint
4
2

sourceIO_ :: Transport t => ((o -> IO ()) -> IO ()) -> Churro () t Void o Source #

Variant of sourceIO with Async action specialised to ().

sources :: (Transport t, Traversable f, Monoid a) => f (Churro a t Void o) -> Churro a t Void o Source #

Combine a list of sources into a single source.

Sends individual items downstream without attempting to combine them.

>>> runWaitChan $ sources [pure 1, pure 1] >>> sinkPrint
1
1

Can combine results of sources with a Monoid instance, although this isn't very useful when forming the start of a longer pipeline:

>>> :{
do
  r <- runWaitChan $ sources [sourceIO \_cb -> print 1 >> return "hello ", sourceIO \_cb -> print 1 >> return "world"]
  print r
:}
1
1
"hello world"

sources_ :: Transport t => [Source () t o] -> Source () t o Source #

Variant of sources with Async action of sources in argument specialised to ().

Sinks

sink :: (Transport t, Monoid a) => Churro a t a Void Source #

Consume all items and combines them into a result via their monoid.

>>> :set -XFlexibleContexts
>>> r <- runWaitChan $ pure' [1 :: Int] >>> sink
>>> print r
[1]

sink_ :: Transport t => Churro () t i Void Source #

Consume all items with no additional effects.

TODO: Decide if we should use some kind of nf evaluation here to force items.

>>> runWaitChan $ pure 1 >>> process print >>> sink_
1

sinkIO :: (Transport t, Monoid a) => (o -> IO a) -> Churro a t o Void Source #

Consume a churro with an IO process.

>>> runWaitChan $ pure 1 >>> sinkIO (\x -> print "hello" >> print (succ x))
"hello"
2

sinkIO_ :: Transport t => (o -> IO ()) -> Churro () t o Void Source #

Variant of sinkIO with Async action specialised to ().

sinkHO :: Transport t => (((i -> IO ()) -> IO ()) -> IO a) -> Churro a t i o Source #

Create a "sink" with more flexibility about when items are demanded using a higher-order HO callback.

This also allows a non-unit async action that can be recovered when run.

WARNING: You should use the provided callback if you want to acually create a sink.

TODO: Use hidden callback return type in order to ensure that the callback is called.

>>> import System.Timeout (timeout)
>>> :{
do
  r <- timeout 100000 $ runWaitChan $ sourceSingleton 1 >>>> sinkHO \ya -> do
    ya (print . show)
    return 25
  print r
:}
"1"
Just 25

sinkPrint :: (Transport t, Show a) => Churro () t a Void Source #

Consume and print each item. Used in many examples, but not much use outside debugging!

>>> runWaitChan $ pure "hi" >>> sinkPrint
"hi"

Churros

process :: Transport t => (a -> IO b) -> Churro () t a b Source #

Process each item with an IO action. Acts as a one-to-one process.

>>> runWaitChan $ pure "hi" >>> process (\x -> print x >> return (reverse x)) >>> sinkPrint
"hi"
"ih"

processPrint :: (Transport t, Show b) => Churro () t b b Source #

Print each item then pass it on.

processDebug :: (Transport t, Show b) => String -> Churro () t b b Source #

Print each item with an additional debugging label.

processN :: Transport t => (i -> IO [o]) -> Churro () t i o Source #

Process each item with an IO action and potentially yield many items as a result. Acts as a one-to-many process.

>>> runWaitChan $ pure 1 >>> processN (\x -> print (show x) >> return [x, succ x]) >>> sinkPrint
"1"
1
2

concatC :: Transport t => Churro () t [o] o Source #

Concatenates splits lists of items into individual items.

processes :: (Traversable f, Transport t1, Transport t2, Monoid a) => f (Churro a t1 i o) -> Churro a t2 i o Source #

Run a set of churros like a work-stealing queue for its inputs.

Similar to ArrowChoice, but more straightforward due to unified output type and independent implementation.

  • NOTE: This makes no judgement about the ordering of outputs corresponding to the ordering of inputs.
  • NOTE: You will need to specialise the transport of the processes. This is deliberate as it allows you to use a bounded channel that ensures allocation to idle processes. Use the processesUnagi variant from Bounded to default to a buffer size of 1.

WARNING: This won't deterministically allocate work to idle workers unless a bounded channel is used.

  • TODO: Figure out cancellation strategy.
  • TODO: Consider a binary combinator and this as a folded application.
>>> import Control.Churro.Transport.Unagi.Bounded (processesUnagi)

Sanity check - All items entering should propagate, independent of the number of processes:

>>> runWaitListChan $ sourceList [1,1,1,1,1] >>> processesUnagi (replicate 3 (delay 0.1))
[1,1,1,1,1]

This example creates a source of 10 values, then creates a process of 10 workers that all wait 1/2 a second. If this works, then all ten values should be consumed and propagated in 1/2 a second by distributing the load over the set of 10 workers:

>>> :{
do
  timeout 10000000 $ runWaitListChan $ sourceList (replicate 10 1) >>> processesUnagi (replicate 1 $ delay 0.05)
:}
Just [1,1,1,1,1,1,1,1,1,1]

We could use different strategies such as round-robin, etc. to default to a more balanced allocation, but this wouldn't be most efficient if each worker performed at different rates of consumption.

thief :: (Transport t1, Transport t2, Monoid a) => Int -> Churro a t1 i o -> Churro a t2 i o Source #

Set up N worker churro processes to concurrently process the stream.

Consider using thiefUnagi unless you have a requirement for controlling the transport of the process group.

justs :: Transport t => Churro () t (Maybe a) a Source #

Extract xs from (Just x)s. Similar to catMaybes.

>>> runWaitChan $ sourceList [Just 1, Nothing, Just 3] >>> justs >>> sinkPrint
1
3

lefts :: Transport t => Churro () t (Either a b) a Source #

Extract ls from (Left l)s.

>>> runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> lefts >>> sinkPrint
1
3

rights :: Transport t => Churro () t (Either a b) b Source #

Extract rs from (Right r)s.

>>> runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> rights >>> sinkPrint
2

takeC :: (Transport t, Integral n) => n -> Churro () t a a Source #

Take and yield the first n items.

WARNING: This is intended to terminate upstream once the items have been consumed downstream, but there is a bug preventing this from working at present!

>>> runWaitChan $ sourceList [1..100] >>> takeC 2 >>> sinkPrint
1
2

This implementation explicitly stops propagating when the Churro completes, although this could be handled by downstream consumer composition terminating the producer and just using replicateM.

dropC :: (Transport t, Integral n) => n -> Churro () t a a Source #

Drop the first n items.

>>> runWaitChan $ sourceList [1..4] >>> dropC 2 >>> sinkPrint
3
4

filterC :: Transport t => (a -> Bool) -> Churro () t a a Source #

Filter items according to a predicate.

>>> runWaitChan $ sourceList [1..5] >>> filterC (> 3) >>> sinkPrint
4
5

mapN :: Transport t => (a -> [b]) -> Churro () t a b Source #

Run a pure function over items, producing multiple outputs.

>>> runWaitChan $ pure 9 >>> mapN (\x -> [x,x*10]) >>> sinkPrint
9
90

delay :: Transport t => NominalDiffTime -> Churro () t a a Source #

Delay items from being sent downstream.

Note: NominalDiffTime's Num instance interprets literals as seconds.

>>> let sinkTimeCheck = process (const getCurrentTime) >>> withPrevious >>> arr (\(x,y) -> diffUTCTime y x > 0.01) >>> sinkPrint
>>> runWaitChan $ sourceList [1..2] >>> sinkTimeCheck
False
>>> runWaitChan $ sourceList [1..2] >>> delay 0.1 >>> sinkTimeCheck
True

delayMicro :: Transport t => Int -> Churro () t a a Source #

Delay items in microseconds. Works the same way as delay.

withPrevious :: Transport t => Churro () t a (a, a) Source #

Passes consecutive pairs of items downstream.

>>> runWaitChan $ sourceList [1,2,3] >>> withPrevious >>> sinkPrint
(1,2)
(2,3)

processRetry :: Transport t => Natural -> (i -> IO o) -> Churro () t i o Source #

Requeue an item if it fails. Swallows exceptions and gives up after retries.

Note: Process will always try once so if retries = 1 then a failing process will execute twice.

The item is requeues on the input side of the churro, so if other items have been passed in they will appear first!

Catches all SomeExceptions. If you wish to narrow the execption type, consider using the processRetry' variant composed with rights.

Note: There is an edgecase with Chan transport where a queued retry may not execute if a source completes and finalises before the item is requeued. A different transport type may allow a modified retry function that requeues differently.

>>> :{
let
  prog = processRetry 1 flakeyThing
  flakeyThing x = do
    if x > 1
      then print "GT"  >> return x
      else print "LTE" >> error ("oops! " <> show x)
in
  runWaitChan $ sourceList [1,2] >>> delay 0.1 >>> prog >>> sinkPrint
:}
"LTE"
"LTE"
"GT"
2

processRetry' :: (Exception e, Transport t) => Natural -> (i -> IO o) -> Churro () t i (Either e o) Source #

Raw version of processRetry. -- Polymorphic over exception type and forwards errors.

processRetry'' :: (Transport t, Exception e, Ord n, Enum n) => n -> (a -> IO b) -> Churro () t (n, a) (Either e b) Source #

Rawest version of processRetry. Expects the incoming items to contain number of retries.

Also polymorphic over exception type. And forwards errors.