Safe Haskell | None |
---|---|
Language | Haskell2010 |
Common transport-agnostic functions for using Churro.
Variants with a trailing underscore - E.g. runWait_ -
specialise the Async action to
be () if you don't care about accumulating results and only care about processing items as they
pass through the pipeline.
Variants with a trailing prime - E.g. processRetry' -
also change the generality of the
types involved in some way.
Synopsis
- runWait :: Transport t => Churro a t Void Void -> IO a
- runWait_ :: Transport t => Churro () t Void Void -> IO ()
- runWaitList :: (Transport t, Monoid a) => Churro a t Void b -> IO [b]
- runWaitList_ :: Transport t => Churro () t Void b -> IO [b]
- run :: Transport t => Churro a t Void Void -> IO (Async a)
- run_ :: Transport t => Churro () t Void Void -> IO (Async ())
- run' :: Transport t => Churro a t i o -> IO (Async a)
- sourceSingleton :: Transport t => o -> Churro () t Void o
- sourceList :: (Transport t, Foldable f) => f o -> Churro () t Void o
- sourceIO :: Transport t => ((o -> IO ()) -> IO a) -> Churro a t Void o
- sourceIO_ :: Transport t => ((o -> IO ()) -> IO ()) -> Churro () t Void o
- sources :: (Transport t, Traversable f, Monoid a) => f (Churro a t Void o) -> Churro a t Void o
- sources_ :: Transport t => [Source () t o] -> Source () t o
- sink :: (Transport t, Monoid a) => Churro a t a Void
- sink_ :: Transport t => Churro () t i Void
- sinkIO :: (Transport t, Monoid a) => (o -> IO a) -> Churro a t o Void
- sinkIO_ :: Transport t => (o -> IO ()) -> Churro () t o Void
- sinkHO :: Transport t => (((i -> IO ()) -> IO ()) -> IO a) -> Churro a t i o
- sinkPrint :: (Transport t, Show a) => Churro () t a Void
- process :: Transport t => (a -> IO b) -> Churro () t a b
- processPrint :: (Transport t, Show b) => Churro () t b b
- processDebug :: (Transport t, Show b) => String -> Churro () t b b
- processN :: Transport t => (i -> IO [o]) -> Churro () t i o
- concatC :: Transport t => Churro () t [o] o
- processes :: (Traversable f, Transport t, Monoid a) => f (Churro a t i o) -> Churro a t i o
- thief :: (Transport t, Monoid a) => Int -> Churro a t i o -> Churro a t i o
- justs :: Transport t => Churro () t (Maybe a) a
- lefts :: Transport t => Churro () t (Either a b) a
- rights :: Transport t => Churro () t (Either a b) b
- takeC :: (Transport t, Integral n) => n -> Churro () t a a
- dropC :: (Transport t, Integral n) => n -> Churro () t a a
- filterC :: Transport t => (a -> Bool) -> Churro () t a a
- mapN :: Transport t => (a -> [b]) -> Churro () t a b
- delay :: Transport t => NominalDiffTime -> Churro () t a a
- delayMicro :: Transport t => Int -> Churro () t a a
- withPrevious :: Transport t => Churro () t a (a, a)
- processRetry :: Transport t => Natural -> (i -> IO o) -> Churro () t i o
- processRetry' :: (Exception e, Transport t) => Natural -> (i -> IO o) -> Churro () t i (Either e o)
- processRetry'' :: (Transport t, Exception e, Ord n, Enum n) => n -> (a -> IO b) -> Churro () t (n, a) (Either e b)
Documentation
The examples in this module require the following imports:
>>>
:set -XBlockArguments
>>>
import Control.Churro.Transport
>>>
import Data.Time.Clock
>>>
import System.Timeout (timeout)
Runners
runWait :: Transport t => Churro a t Void Void -> IO a Source #
Automatically wait for a churro to complete.
runWait_ :: Transport t => Churro () t Void Void -> IO () Source #
Version of runWait
specialised to ()
.
runWaitList :: (Transport t, Monoid a) => Churro a t Void b -> IO [b] Source #
Read the output of a Churro into a list.
Warning: This will block until the Churro terminates,
accumulating items in memory.
Only use when you expect a finite amount of output.
Otherwise consider composing with a Sink and using runWait
.
>>>
runWaitListChan $ sourceList [0..4] >>> arr succ
[1,2,3,4,5]
runWaitList_ :: Transport t => Churro () t Void b -> IO [b] Source #
Version of runWaitList
specialised to ()
.
run :: Transport t => Churro a t Void Void -> IO (Async a) Source #
Run a sourced and sinked (double-dipped) churro and return an async action representing the in-flight processes.
run_ :: Transport t => Churro () t Void Void -> IO (Async ()) Source #
Version of run
with async return type specialised to ()
.
run' :: Transport t => Churro a t i o -> IO (Async a) Source #
Run any churro. There is no check that this was spawned with a source, or terminated with a sink.
This is unsafe, since the pipeline may not generate or consume in a predictable way.
Use run
instead unless you are confident you know what you're doing.
Library
Sources
sourceSingleton :: Transport t => o -> Churro () t Void o Source #
A single-item source.
>>>
runWaitChan $ sourceSingleton 13 >>> sinkPrint
13
Equivalent to pure
from Applicative
. Redefined here in case you're looking for a source!
>>>
runWaitChan $ pure 23 >>> sinkPrint
23
sourceList :: (Transport t, Foldable f) => f o -> Churro () t Void o Source #
Create a source from a list of items, sending each down the churro independently.
>>>
runWaitChan $ sourceList [4,2] >>> sinkPrint
4 2
sourceIO :: Transport t => ((o -> IO ()) -> IO a) -> Churro a t Void o Source #
Create a source from an IO action that is passed a function to yield new items.
>>>
runWaitChan $ sourceIO (\cb -> cb 4 >> cb 2) >>> sinkPrint
4 2
sourceIO_ :: Transport t => ((o -> IO ()) -> IO ()) -> Churro () t Void o Source #
Variant of sourceIO
with Async action specialised to ()
.
sources :: (Transport t, Traversable f, Monoid a) => f (Churro a t Void o) -> Churro a t Void o Source #
Combine a list of sources into a single source.
Sends individual items downstream without attempting to combine them.
>>>
runWaitChan $ sources [pure 1, pure 1] >>> sinkPrint
1 1
Can combine results of sources with a Monoid instance, although this isn't very useful when forming the start of a longer pipeline:
>>>
:{
do r <- runWaitChan $ sources [sourceIO \_cb -> print 1 >> return "hello ", sourceIO \_cb -> print 1 >> return "world"] print r :} 1 1 "hello world"
sources_ :: Transport t => [Source () t o] -> Source () t o Source #
Variant of sources
with Async action of sources in argument specialised to ()
.
Sinks
sink :: (Transport t, Monoid a) => Churro a t a Void Source #
Consume all items and combine them into a result via their monoid.
>>>
:set -XFlexibleContexts
>>>
r <- runWaitChan $ pure' [1 :: Int] >>> sink
>>>
print r
[1]
sink_ :: Transport t => Churro () t i Void Source #
Consume all items with no additional effects.
TODO: Decide if we should use some kind of nf
evaluation here to force items.
>>>
runWaitChan $ pure 1 >>> process print >>> sink_
1
sinkIO :: (Transport t, Monoid a) => (o -> IO a) -> Churro a t o Void Source #
Consume a churro with an IO process.
>>>
runWaitChan $ pure 1 >>> sinkIO (\x -> print "hello" >> print (succ x))
"hello" 2
sinkIO_ :: Transport t => (o -> IO ()) -> Churro () t o Void Source #
Variant of sinkIO
with Async action specialised to ()
.
sinkHO :: Transport t => (((i -> IO ()) -> IO ()) -> IO a) -> Churro a t i o Source #
Create a "sink" with more flexibility about when items are demanded using a higher-order (HO) callback.
This also allows a non-unit async action that can be recovered when run.
WARNING: You should use the provided callback if you want to actually create a sink.
TODO: Use hidden callback return type in order to ensure that the callback is called.
>>>
import System.Timeout (timeout)
>>>
:{
do r <- timeout 100000 $ runWaitChan $ sourceSingleton 1 >>>> sinkHO \ya -> do ya (print . show) return 25 print r :} "1" Just 25
sinkPrint :: (Transport t, Show a) => Churro () t a Void Source #
Consume and print each item. Used in many examples, but not much use outside debugging!
>>>
runWaitChan $ pure "hi" >>> sinkPrint
"hi"
Churros
process :: Transport t => (a -> IO b) -> Churro () t a b Source #
Process each item with an IO action. Acts as a one-to-one process.
>>>
runWaitChan $ pure "hi" >>> process (\x -> print x >> return (reverse x)) >>> sinkPrint
"hi" "ih"
processDebug :: (Transport t, Show b) => String -> Churro () t b b Source #
Print each item with an additional debugging label.
processN :: Transport t => (i -> IO [o]) -> Churro () t i o Source #
Process each item with an IO action and potentially yield many items as a result. Acts as a one-to-many process.
>>>
runWaitChan $ pure 1 >>> processN (\x -> print (show x) >> return [x, succ x]) >>> sinkPrint
"1" 1 2
concatC :: Transport t => Churro () t [o] o Source #
Concatenates/splits lists of items into individual items.
processes :: (Traversable f, Transport t, Monoid a) => f (Churro a t i o) -> Churro a t i o Source #
Run a set of churros like a work-stealing queue for its inputs.
Similar to ArrowChoice, but more straightforward due to unified output type and independent implementation.
Note: processes
makes no judgement about the ordering of outputs corresponding to the ordering of inputs.
TODO: Figure out cancellation strategy. TODO: Consider a binary combinator and this as a folded application.
WARNING: This won't deterministically allocate work to idle workers unless a bounded channel is used.
Sanity check - All items entering should propagate, independent of the number of processes:
>>>
runWaitListChan $ sourceList [1,1,1,1,1] >>> processes (replicate 3 (delay 0.1))
[1,1,1,1,1]
This example creates a source of 10 values, then creates a process of 10 workers that all wait 1/2 a second. If this works, then all ten values should be consumed and propagated in 1/2 a second by distributing the load over the set of 10 workers:
>>>
:{
do timeout 10000000 $ runWaitListChan $ sourceList (replicate 10 1) >>> processes (replicate 1 $ delay 0.05) :} Just [1,1,1,1,1,1,1,1,1,1]
We could use different strategies such as round-robin, etc. to default to a more balanced allocation, but this wouldn't be most efficient if each worker performed at different rates of consumption.
thief :: (Transport t, Monoid a) => Int -> Churro a t i o -> Churro a t i o Source #
Set up N worker churro processes to concurrently process the stream.
justs :: Transport t => Churro () t (Maybe a) a Source #
Extract xs from (Just x)s. Similar to catMaybes
.
>>>
runWaitChan $ sourceList [Just 1, Nothing, Just 3] >>> justs >>> sinkPrint
1 3
lefts :: Transport t => Churro () t (Either a b) a Source #
Extract ls from (Left l)s.
>>>
runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> lefts >>> sinkPrint
1 3
rights :: Transport t => Churro () t (Either a b) b Source #
Extract rs from (Right r)s.
>>>
runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> rights >>> sinkPrint
2
takeC :: (Transport t, Integral n) => n -> Churro () t a a Source #
Take and yield the first n items.
WARNING: This is intended to terminate upstream once the items have been consumed downstream, but there is a bug preventing this from working at present!
>>>
runWaitChan $ sourceList [1..100] >>> takeC 2 >>> sinkPrint
1 2
This implementation explicitly stops propagating when the Churro completes, although this could be handled by downstream consumer composition terminating the producer and just using replicateM.
dropC :: (Transport t, Integral n) => n -> Churro () t a a Source #
Drop the first n items.
>>>
runWaitChan $ sourceList [1..4] >>> dropC 2 >>> sinkPrint
3 4
filterC :: Transport t => (a -> Bool) -> Churro () t a a Source #
Filter items according to a predicate.
>>>
runWaitChan $ sourceList [1..5] >>> filterC (> 3) >>> sinkPrint
4 5
mapN :: Transport t => (a -> [b]) -> Churro () t a b Source #
Run a pure function over items, producing multiple outputs.
>>>
runWaitChan $ pure 9 >>> mapN (\x -> [x,x*10]) >>> sinkPrint
9 90
delay :: Transport t => NominalDiffTime -> Churro () t a a Source #
Delay items from being sent downstream.
Note: NominalDiffTime's Num instance interprets literals as seconds.
>>>
let sinkTimeCheck = process (const getCurrentTime) >>> withPrevious >>> arr (\(x,y) -> diffUTCTime y x > 0.01) >>> sinkPrint
>>>
runWaitChan $ sourceList [1..2] >>> sinkTimeCheck
False
>>>
runWaitChan $ sourceList [1..2] >>> delay 0.1 >>> sinkTimeCheck
True
delayMicro :: Transport t => Int -> Churro () t a a Source #
Delay items in microseconds. Works the same way as delay
.
withPrevious :: Transport t => Churro () t a (a, a) Source #
Passes consecutive pairs of items downstream.
>>>
runWaitChan $ sourceList [1,2,3] >>> withPrevious >>> sinkPrint
(1,2) (2,3)
processRetry :: Transport t => Natural -> (i -> IO o) -> Churro () t i o Source #
Requeue an item if it fails. Swallows exceptions and gives up after retries.
Note: Process will always try once so if retries = 1 then a failing process will execute twice.
The item is requeued on the input side of the churro, so if other items have been passed in they will appear first!
Catches all SomeException
s. If you wish to narrow the exception type, consider
using the processRetry' variant composed with rights
.
Note: There is an edge case with Chan transport where a queued retry may not execute if a source completes and finalises before the item is requeued. A different transport type may allow a modified retry function that requeues differently.
>>>
:{
let prog = processRetry 1 flakeyThing flakeyThing x = do if x > 1 then print "GT" >> return x else print "LTE" >> error ("oops! " <> show x) in runWaitChan $ sourceList [1,2] >>> delay 0.1 >>> prog >>> sinkPrint :} "LTE" "LTE" "GT" 2
processRetry' :: (Exception e, Transport t) => Natural -> (i -> IO o) -> Churro () t i (Either e o) Source #
Raw version of processRetry
. Polymorphic over exception type and forwards errors.