churros-0.1.4.0: Channel/Arrow based streaming computation library.
Safe Haskell: None
Language: Haskell2010

Control.Churro.Prelude

Description

Common transport-agnostic functions for using Churro.

Variants with a trailing underscore, e.g. runWait_, specialise the Async action to () for when you don't care about accumulating results and only want to process items as they pass through the pipeline.

Variants with a trailing prime, e.g. processRetry', also change the generality of the types involved in some way.

Documentation

The examples in this module require the following imports:

>>> :set -XBlockArguments
>>> import Control.Churro.Transport
>>> import Data.Time.Clock

Runners

runWait :: Transport t => Churro a t Void Void -> IO a

Automatically wait for a churro to complete.

runWait_ :: Transport t => Churro () t Void Void -> IO ()

Version of runWait specialised to ().

runWaitList :: (Transport t, Monoid a) => Churro a t Void b -> IO [b]

Read the output of a Churro into a list.

Warning: This will block until the Churro terminates, accumulating items in memory. Only use it when you expect a finite amount of output. Otherwise, consider composing with a Sink and using runWait.

>>> runWaitListChan $ sourceList [0..4] >>> arr succ
[1,2,3,4,5]

runWaitList_ :: Transport t => Churro () t Void b -> IO [b]

Version of runWaitList specialised to ().

run :: Transport t => Churro a t Void Void -> IO (Async a)

Run a sourced and sinked (double-dipped) churro and return an async action representing the in-flight processes.

run_ :: Transport t => Churro () t Void Void -> IO (Async ())

Version of run with async return type specialised to ().

run' :: Transport t => Churro a t i o -> IO (Async a)

Run any churro. There is no check that it was spawned with a source or terminated with a sink. This is unsafe, since the pipeline may not generate or consume items in a predictable way. Use run instead unless you are confident you know what you're doing.

Library

Sources

sourceSingleton :: Transport t => o -> Churro () t Void o

A single-item source.

>>> runWaitChan $ sourceSingleton 13 >>> sinkPrint
13

Equivalent to pure from Applicative. Redefined here in case you're looking for a source!

>>> runWaitChan $ pure 23 >>> sinkPrint
23

sourceList :: (Transport t, Foldable f) => f o -> Churro () t Void o

Create a source from a list of items, sending each down the churro independently.

>>> runWaitChan $ sourceList [4,2] >>> sinkPrint
4
2

sourceIO :: Transport t => ((o -> IO ()) -> IO a) -> Churro a t Void o

Create a source from an IO action that is passed a function to yield new items.

>>> runWaitChan $ sourceIO (\cb -> cb 4 >> cb 2) >>> sinkPrint
4
2
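
The callback style used by sourceIO can be pictured without any transport at all. The following is a minimal sketch using only base, not the library's implementation: collectYields is a hypothetical helper that hands the producer a yield function and records what it emits, standing in for whatever consumes items downstream.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)

-- | Run a callback-style producer and collect everything it yields, in order.
-- Hypothetical helper: a list-based stand-in for a downstream consumer.
collectYields :: ((o -> IO ()) -> IO a) -> IO (a, [o])
collectYields producer = do
  ref <- newIORef []
  -- The producer decides when (and how often) to call the yield function.
  result <- producer (\item -> modifyIORef' ref (item :))
  items <- readIORef ref
  -- Items were prepended, so reverse to recover the order they were yielded in.
  pure (result, reverse items)
```

In GHCi, collectYields (\cb -> cb 4 >> cb 2 >> pure "done") returns ("done",[4,2]): the producer's own return value is kept separate from the items it yields, mirroring the a and o type parameters in the signature of sourceIO.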

sourceIO_ :: Transport t => ((o -> IO ()) -> IO ()) -> Churro () t Void o

Variant of sourceIO with Async action specialised to ().

sources :: (Transport t, Foldable f, Traversable f) => f (Churro a t Void i) -> Churro () t Void i

Combine a list of sources into a single source.

Sends individual items downstream without attempting to combine them.

Warning: The behaviour when passed an empty list is unspecified.

TODO: Use NonEmptyList instead of []

>>> runWaitChan $ sources_ [pure 1, pure 1] >>> sinkPrint
1
1

sources_ :: Transport t => [Source () t o] -> Source () t o

Variant of sources with Async action of sources in argument specialised to ().

Sinks

sink :: (Transport t, Monoid a) => Churro a t a Void

Consume all items and combine them into a result via their Monoid instance.

>>> :set -XFlexibleContexts
>>> r <- runWaitChan $ pure' [1 :: Int] >>> sink
>>> print r
[1]
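
To make "combine via their Monoid instance" concrete, here is a list-based sketch using only base. It assumes items are combined in arrival order starting from mempty, which is how mconcat behaves on lists. Whether sink combines in exactly this order is an assumption, and combineAll is a hypothetical name.

```haskell
import Data.Monoid (Sum (..))

-- | Combine every item with its Monoid instance, starting from 'mempty'.
-- Hypothetical list analogue of a monoidal sink.
combineAll :: Monoid a => [a] -> a
combineAll = mconcat

-- Lists combine by concatenation.
listResult :: [Int]
listResult = combineAll [[1], [2, 3]]

-- Wrapping numbers in 'Sum' selects addition as the combining operation.
sumResult :: Int
sumResult = getSum (combineAll (map Sum [1, 2, 3]))
```

Here listResult is [1,2,3] and sumResult is 6. The choice of Monoid determines what the accumulated result means, so wrapping items in a newtype such as Sum before they reach the sink changes the kind of summary produced.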

sink_ :: Transport t => Churro () t i Void

Consume all items with no additional effects.

TODO: Decide if we should use some kind of nf evaluation here to force items.

>>> runWaitChan $ pure 1 >>> process print >>> sink_
1

sinkIO :: (Transport t, Monoid a) => (o -> IO a) -> Churro a t o Void

Consume a churro with an IO process.

>>> runWaitChan $ pure 1 >>> sinkIO (\x -> print "hello" >> print (succ x))
"hello"
2

sinkIO_ :: Transport t => (o -> IO ()) -> Churro () t o Void

Variant of sinkIO with Async action specialised to ().

sinkHO :: Transport t => (((i -> IO ()) -> IO ()) -> IO a) -> Churro a t i o

Create a "sink" with more flexibility about when items are demanded, using a higher-order (HO) callback.

This also allows a non-unit async action that can be recovered when run.

WARNING: You should use the provided callback if you want to actually create a sink.

TODO: Use hidden callback return type in order to ensure that the callback is called.

>>> import System.Timeout (timeout)
>>> :{
do
  r <- timeout 100000 $ runWaitChan $ sourceSingleton 1 >>>> sinkHO \ya -> do
    ya (print . show)
    return 25
  print r
:}
"1"
Just 25

sinkPrint :: (Transport t, Show a) => Churro () t a Void

Consume and print each item. Used in many examples, but not much use outside debugging!

>>> runWaitChan $ pure "hi" >>> sinkPrint
"hi"

Churros

process :: Transport t => (a -> IO b) -> Churro () t a b

Process each item with an IO action. Acts as a one-to-one process.

>>> runWaitChan $ pure "hi" >>> process (\x -> print x >> return (reverse x)) >>> sinkPrint
"hi"
"ih"

processPrint :: (Transport t, Show b) => Churro () t b b

Print each item then pass it on.

processDebug :: (Transport t, Show b) => String -> Churro () t b b

Print each item with an additional debugging label.

processN :: Transport t => (i -> IO [o]) -> Churro () t i o

Process each item with an IO action and potentially yield many items as a result. Acts as a one-to-many process.

>>> runWaitChan $ pure 1 >>> processN (\x -> print (show x) >> return [x, succ x]) >>> sinkPrint
"1"
1
2
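
The one-to-many shape of processN can be modelled on plain lists. This is a sketch of the semantics only, using base and a hypothetical name processNList. Unlike a streaming churro, the list version runs every effect before returning any output.

```haskell
-- | Apply an effectful one-to-many step to each input and flatten the results.
-- Hypothetical list analogue: all effects run before any output is returned.
processNList :: (i -> IO [o]) -> [i] -> IO [o]
processNList step inputs = concat <$> traverse step inputs
```

For example, processNList (\x -> pure [x, succ x]) [1, 10] returns [1,2,10,11]. A step that returns the empty list for some input simply contributes nothing for that input.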

justs :: Transport t => Churro () t (Maybe a) a

Extract xs from (Just x)s. Similar to catMaybes.

>>> runWaitChan $ sourceList [Just 1, Nothing, Just 3] >>> justs >>> sinkPrint
1
3

lefts :: Transport t => Churro () t (Either a b) a

Extract ls from (Left l)s.

>>> runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> lefts >>> sinkPrint
1
3

rights :: Transport t => Churro () t (Either a b) b

Extract rs from (Right r)s.

>>> runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> rights >>> sinkPrint
2
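
For comparison, the list functions in base that justs, lefts and rights resemble give the same results on the inputs used in the examples above. This is only an analogy on lists, not a description of how the churros are implemented.

```haskell
import Data.Either (lefts, rights)
import Data.Maybe (catMaybes)

-- Keep the payloads of the 'Just' values, dropping every 'Nothing'.
presentValues :: [Int]
presentValues = catMaybes [Just 1, Nothing, Just 3]

mixed :: [Either Int Int]
mixed = [Left 1, Right 2, Left 3]

-- Keep only the 'Left' payloads.
leftValues :: [Int]
leftValues = lefts mixed

-- Keep only the 'Right' payloads.
rightValues :: [Int]
rightValues = rights mixed
```

These evaluate to [1,3], [1,3] and [2] respectively. Because Data.Either also exports lefts and rights, importing it unqualified alongside this module makes those names ambiguous. A qualified import of one of the two avoids the clash.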

takeC :: (Transport t, Integral n) => n -> Churro () t a a

Take and yield the first n items.

WARNING: This is intended to terminate upstream once the items have been consumed downstream, but there is a bug preventing this from working at present!

>>> runWaitChan $ sourceList [1..100] >>> takeC 2 >>> sinkPrint
1
2

This implementation explicitly stops propagating when the Churro completes, although this could be handled by downstream consumer composition terminating the producer and just using replicateM.

dropC :: (Transport t, Integral n) => n -> Churro () t a a

Drop the first n items.

>>> runWaitChan $ sourceList [1..4] >>> dropC 2 >>> sinkPrint
3
4

filterC :: Transport t => (a -> Bool) -> Churro () t a a

Filter items according to a predicate.

>>> runWaitChan $ sourceList [1..5] >>> filterC (> 3) >>> sinkPrint
4
5

mapN :: Transport t => (a -> [b]) -> Churro () t a b

Run a pure function over items, producing multiple outputs.

>>> runWaitChan $ pure 9 >>> mapN (\x -> [x,x*10]) >>> sinkPrint
9
90

delay :: Transport t => NominalDiffTime -> Churro () t a a

Delay items from being sent downstream.

Note: NominalDiffTime's Num instance interprets literals as seconds.

>>> let sinkTimeCheck = process (const getCurrentTime) >>> withPrevious >>> arr (\(x,y) -> diffUTCTime y x > 0.01) >>> sinkPrint
>>> runWaitChan $ sourceList [1..2] >>> sinkTimeCheck
False
>>> runWaitChan $ sourceList [1..2] >>> delay 0.1 >>> sinkTimeCheck
True

delayMicro :: Transport t => Int -> Churro () t a a

Delay items in microseconds. Works the same way as delay.
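
Since delay takes seconds and delayMicro takes microseconds, it can help to see the unit conversion spelled out. The helper below, toMicros, is hypothetical and only illustrates the arithmetic. Whether delay is implemented in terms of delayMicro is an assumption.

```haskell
import Data.Time.Clock (NominalDiffTime)

-- | Convert a duration in seconds to whole microseconds, rounding to nearest.
-- Uses exact rational arithmetic, so 0.1 seconds is exactly 100000 microseconds.
toMicros :: NominalDiffTime -> Int
toMicros seconds = round (toRational seconds * 1000000)
```

Here toMicros 0.1 is 100000 and toMicros 2 is 2000000, so under that assumption delay 0.1 and delayMicro 100000 would describe the same pause.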

withPrevious :: Transport t => Churro () t a (a, a)

Passes consecutive pairs of items downstream.

>>> runWaitChan $ sourceList [1,2,3] >>> withPrevious >>> sinkPrint
(1,2)
(2,3)
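
On a plain list, the same pairing of each item with its successor can be written by zipping the list with its own tail. This is a sketch of the observable behaviour in the example above, with the hypothetical name withPreviousList, not the library's implementation.

```haskell
-- | Pair each element with the one that follows it.
-- Hypothetical list analogue; lists with fewer than two elements give no pairs.
withPreviousList :: [a] -> [(a, a)]
withPreviousList items = zip items (drop 1 items)
```

withPreviousList [1,2,3] gives [(1,2),(2,3)], matching the output above. In the list model an input of n items yields n - 1 pairs. That a churro fed a single item emits nothing is an assumption carried over from this model.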

processRetry :: Transport t => Natural -> (i -> IO o) -> Churro () t i o

Requeue an item if it fails. Swallows exceptions and gives up after the given number of retries.

Note: Process will always try once so if retries = 1 then a failing process will execute twice.

The item is requeued on the input side of the churro, so if other items have been passed in they will appear first!

Catches all SomeExceptions. If you wish to narrow the exception type, consider using the processRetry' variant composed with rights.

Note: There is an edge case with Chan transport where a queued retry may not execute if a source completes and finalises before the item is requeued. A different transport type may allow a modified retry function that requeues differently.

>>> :{
let
  prog = processRetry 1 flakeyThing
  flakeyThing x = do
    if x > 1
      then print "GT"  >> return x
      else print "LTE" >> error ("oops! " <> show x)
in
  runWaitChan $ sourceList [1,2] >>> delay 0.1 >>> prog >>> sinkPrint
:}
"LTE"
"LTE"
"GT"
2
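
The retry counting described above ("retries = 1" means up to two executions) can be sketched as a simple loop using only base. This models just the counting and the swallowing of exceptions. It retries immediately rather than requeueing the item behind other input, so it does not reproduce the ordering seen in the example. The names tryAny, retrying and countAttempts are hypothetical.

```haskell
import Control.Exception (SomeException, try)
import Data.IORef (modifyIORef', newIORef, readIORef)
import Numeric.Natural (Natural)

-- | 'try' fixed to 'SomeException', so every exception is caught.
tryAny :: IO a -> IO (Either SomeException a)
tryAny = try

-- | Run an action once, then up to @retries@ more times while it keeps failing.
-- Returns 'Nothing' when every attempt failed (the exception is swallowed).
retrying :: Natural -> IO a -> IO (Maybe a)
retrying retries action = do
  result <- tryAny action
  case result of
    Right value -> pure (Just value)
    Left _
      | retries > 0 -> retrying (retries - 1) action
      | otherwise   -> pure Nothing

-- | Count how many times an always-failing action runs for a given retry budget.
countAttempts :: Natural -> IO Int
countAttempts retries = do
  attempts <- newIORef 0
  _ <- retrying retries
         (modifyIORef' attempts (+ 1) >> ioError (userError "always fails") :: IO ())
  readIORef attempts
```

countAttempts 1 returns 2 and countAttempts 0 returns 1, which matches the note that the process always tries once before any retries are spent.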

processRetry' :: (Exception e, Transport t) => Natural -> (i -> IO o) -> Churro () t i (Either e o)

Raw version of processRetry. Polymorphic over the exception type and forwards errors.
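
The idea of fixing the exception type and then keeping only the successes can be illustrated with try from base on a plain list. This is a sketch of the pattern, not of processRetry' itself. safeDiv and quotients are hypothetical names, and the behaviour of try (exceptions of other types propagate instead of being captured) is assumed rather than confirmed to carry over to the churro.

```haskell
import Control.Exception (ArithException, evaluate, try)
import Data.Either (rights)

-- | Integer division that captures arithmetic failures as a 'Left'.
-- The result type pins the exception type that 'try' will catch.
safeDiv :: Int -> Int -> IO (Either ArithException Int)
safeDiv x y = try (evaluate (x `div` y))

-- | Divide 10 by each candidate, forwarding errors as 'Left' values,
-- then keep only the successful results.
quotients :: IO [Int]
quotients = rights <$> traverse (safeDiv 10) [1, 0, 5]
```

quotients returns [10,2]: the division by zero is forwarded as a Left and then discarded by rights, while the two successful results pass through.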

processRetry'' :: (Transport t, Exception e, Ord n, Enum n) => n -> (a -> IO b) -> Churro () t (n, a) (Either e b)

Rawest version of processRetry. Expects the incoming items to contain the number of retries.

Also polymorphic over the exception type, and forwards errors.