Copyright | (c) 2014, 2015 Gatlin Johnson <gatlin@niltag.net> |
---|---|
License | GPL-3 |
Maintainer | gatlin@niltag.net |
Stability | experimental |
Safe Haskell | Safe |
Language | Haskell2010 |
Write effectful stream processing functions and compose them into a series of tubes.
This exists primarily for my own education. It is updated often as I try things and is probably, at this moment, wrong.
My goals were to
- learn more about iteratees and stream processing; and
- explore the relationships between functions, pairs, sum types, and products.
Synopsis
- type Tube a b = FreeT (TubeF a b)
- newtype TubeF a b k = TubeF { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }
- type Source b m r = forall x. Tube x b m r
- type Sink a m r = forall x. Tube a x m r
- run :: FreeT f m a -> m (FreeF f a (FreeT f m a))
- await :: Monad m => Tube a b m a
- yield :: Monad m => b -> Tube a b m ()
- each :: (Monad m, Foldable t) => t b -> Tube a b m ()
- for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- (~>) :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- (>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r
- (><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
- (|>) :: Monad m => Tube x b m r -> Sink (Maybe b) m s -> Sink (Maybe b) m s
- (-<) :: Monad m => a -> Sink a m b -> Sink a m b
- liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
- cat :: Monad m => Tube a a m r
- map :: Monad m => (a -> b) -> Tube a b m r
- drop :: Monad m => Int -> Tube a a m r
- take :: Monad m => Int -> Tube a a m ()
- takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
- filter :: Monad m => (a -> Bool) -> Tube a a m r
- reduce :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Source a m () -> m b
- every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()
- unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))
- prompt :: MonadIO m => Source String m ()
- mapM :: Monad m => (a -> m b) -> Tube a b m r
- sequence :: Monad m => Tube (m a) a m r
- display :: MonadIO m => Sink String m ()
- type Pump a b = CofreeT (PumpF a b)
- data PumpF a b k = PumpF {}
- mkPump :: Comonad w => w a -> (w a -> (b, w a)) -> (w a -> c -> w a) -> Pump b c w a
- send :: Comonad w => b -> Pump a b w r -> Pump a b w r
- recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)
- pump :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w x -> Tube a b m y -> m r
- pumpM :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w (m x) -> Tube a b m y -> m r
- meta :: (x -> a -> x) -> x -> (x -> (b, x)) -> Pump b a Identity x
- enumerator :: [a] -> Pump (Maybe a) a Identity [a]
- enumerate :: (Monad m, Comonad w) => Pump (Maybe a) b w r -> Tube c a m ()
- lift :: MonadTrans t => forall m a. Monad m => m a -> t m a
- runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Documentation
A Tube is a computation that can yield multiple intermediate values or await intermediate inputs before computing a final result. Any monadic function may be turned into a Tube.
Tubes may be composed in different ways. For instance, in ghci:
    >>> run $ for (each [1..4] >< map show) $ lift . putStrLn
    1
    2
    3
    4
Here, each converts a Foldable into a Source of values; for performs a computation with each value. Another example, using two built-in Tubes for convenience:
    >>> run $ prompt >< filter (/= "Die Antwoord") >< map (++ " is bad") >< display
    > dubstep
    dubstep is bad
    > the sun
    the sun is bad
    > Die Antwoord
    > this example
    this example is bad
A few stream processing combinators are provided for mapping, filtering, taking, and other basic operations.
When you want to reduce a stream, you can do so like this:
    >>> reduce (+) 0 id (each [1..10])
    55
>< is useful for combining Tubes which all have the same return value, most often () simply because every Source will have that value.
There is more in the library not covered here, and you are encouraged to take a look around.
type Tube a b = FreeT (TubeF a b)
A Tube is a computation which can
- yield an intermediate value downstream and suspend execution; and
- await a value from upstream, deferring execution until it is received.
Moreover, individual Tubes may be freely composed into larger ones, so long as their types match. Thus, one may write small, reusable building blocks and construct efficient stream processing pipelines.
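To make the yield/await idea concrete, here is a minimal, pure sketch. It is not how this library is implemented (the real Tube is built on FreeT and runs in an arbitrary monad); the type Step and the functions fromList, mapS, filterS, compose and collect are hypothetical names invented for this illustration, and the pull-based composition rule is an assumption.

```haskell
-- A pure, simplified stand-in for a tube: no underlying monad, no FreeT.
-- All names in this block are hypothetical and not part of the library.
data Step a b r
  = Await (a -> Step a b r)   -- wait for a value from upstream
  | Yield b (Step a b r)      -- emit a value downstream, then continue
  | Done r                    -- finished with a result

-- Emit every element of a list, then stop.
fromList :: [b] -> Step a b ()
fromList = foldr Yield (Done ())

-- Transform each incoming value.
mapS :: (a -> b) -> Step a b r
mapS f = Await (\a -> Yield (f a) (mapS f))

-- Pass along only the values satisfying the predicate.
filterS :: (a -> Bool) -> Step a a r
filterS p = Await (\a -> if p a then Yield a (filterS p) else filterS p)

-- Pull-based composition (an assumption of this sketch): the downstream
-- step drives, and the pipeline ends as soon as either side is done.
compose :: Step a b r -> Step b c r -> Step a c r
compose _  (Done r)    = Done r
compose up (Yield c k) = Yield c (compose up k)
compose up (Await f)   = case up of
  Done r    -> Done r
  Yield b k -> compose k (f b)
  Await g   -> Await (\a -> compose (g a) (Await f))

-- Collect everything a pipeline yields, feeding () if it ever awaits.
collect :: Step () b r -> [b]
collect (Done _)    = []
collect (Yield b k) = b : collect k
collect (Await f)   = collect (f ())

example :: [String]
example = collect (fromList [1 .. 6 :: Int] `compose` filterS even `compose` mapS show)
```

Evaluating example in ghci gives the even numbers rendered as strings. Small steps such as mapS and filterS are reusable because composition only requires that the yield type of one side match the await type of the other.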
Since a much better engineered, more popular, and decidedly more mature library already uses the term "pipes", I have opted instead to think of my work as a series of tubes.
TubeF is the union of unary functions and binary products into a single type, here defined with a Boehm-Berarducci encoding.
This type is equivalent to the following:
    data TubeF a b k
        = Await (a -> k)    -- :: (a -> k) -> TubeF a b k
        | Yield (b , k)     -- :: (b , k) -> TubeF a b k
The type signatures for the two value constructors should bear a strong resemblance to the actual type signature of runT. Instead of encoding tubes as structures which build up when composed, a TubeF is a control flow mechanism which picks one of two provided continuations.
People using this library should never have to contend with these details but it is worth mentioning.
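For the curious, the equivalence between the two presentations can be sketched in a few lines. TubeF and runT below copy the definition shown in the synopsis; the names TubeD, awaitF, yieldF, toData, fromData and describe are hypothetical helpers introduced only for this illustration, and the Functor instance is one plausible definition rather than necessarily the library's own.

```haskell
{-# LANGUAGE RankNTypes #-}

-- Ordinary sum-type presentation (hypothetical name TubeD).
data TubeD a b k
  = Await (a -> k)
  | Yield (b, k)

-- Continuation-passing presentation, as in the synopsis: a value is a
-- function that receives one handler per constructor and calls exactly one.
newtype TubeF a b k = TubeF
  { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }

-- "Smart constructors": each one selects the matching handler.
awaitF :: (a -> k) -> TubeF a b k
awaitF f = TubeF (\onAwait _ -> onAwait f)

yieldF :: (b, k) -> TubeF a b k
yieldF p = TubeF (\_ onYield -> onYield p)

-- The two presentations convert into each other without losing information.
toData :: TubeF a b k -> TubeD a b k
toData t = runT t Await Yield

fromData :: TubeD a b k -> TubeF a b k
fromData (Await f) = awaitF f
fromData (Yield p) = yieldF p

-- Mapping over the continuation slot k.
instance Functor (TubeF a b) where
  fmap g t = runT t (\f -> awaitF (g . f)) (\(b, k) -> yieldF (b, g k))

-- "Pattern matching" is just supplying both handlers.
describe :: Show b => TubeF Int b String -> String
describe t =
  runT t
    (\f -> "await -> " ++ f 0)
    (\(b, k) -> "yield " ++ show b ++ " -> " ++ k)
```

Note how toData passes the data constructors themselves as the two continuations, which is why their signatures mirror the argument types of runT.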
Core infrastructure
(>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r
Connect a task to a continuation yielding another task; see ><.
(><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r infixl 3
Compose two tubes into a new tube.
Utilities
map :: Monad m => (a -> b) -> Tube a b m r
Transforms all incoming values according to some function.
takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
Terminates the stream upon receiving a value violating the predicate.
filter :: Monad m => (a -> Bool) -> Tube a a m r
Yields only values satisfying some predicate.
reduce
    :: Monad m
    => (x -> a -> x)    -- step function
    -> x                -- initial value
    -> (x -> b)         -- final transformation
    -> Source a m ()    -- stream source
    -> m b
Strict left-fold of a stream. Note that the actual return type of the source is not relevant, only the intermediate yield type.
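The shape of this fold (a step function, an initial value, and a final transformation) can be tried out on plain lists. The sketch below is only an analogy and not the library's reduce: reduceList and mean are hypothetical names, and a list stands in for the Source.

```haskell
import Data.List (foldl')

-- List-based analogue of the three-part fold (hypothetical name).
-- foldl' forces the accumulator to weak head normal form at every step.
reduceList :: (x -> a -> x) -> x -> (x -> b) -> [a] -> b
reduceList step begin done = done . foldl' step begin

-- The final transformation lets the accumulator differ from the result:
-- here the accumulator is a (sum, count) pair and the result is their ratio.
mean :: [Double] -> Double
mean = reduceList step (0, 0 :: Int) finish
  where
    step (s, n) x = (s + x, n + 1)
    finish (s, n) = if n == 0 then 0 else s / fromIntegral n
```

With id as the final transformation, reduceList (+) 0 id [1..10] evaluates to 55, mirroring the ghci example near the top of this page.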
every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()
Similar to each except it explicitly marks the stream as exhausted.
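As a rough list-level picture of what marking exhaustion means, consider the sketch below. It assumes that the end of the stream is signalled by a trailing Nothing, which is an assumption suggested by the Maybe in the type rather than something stated here; everyList and sumUntilEnd are hypothetical names and not part of the library.

```haskell
-- Wrap each element in Just and append an explicit end marker.
-- (Assumption: exhaustion is signalled by a trailing Nothing.)
everyList :: [b] -> [Maybe b]
everyList xs = map Just xs ++ [Nothing]

-- A consumer can then tell "no more input" apart from "no input yet".
sumUntilEnd :: [Maybe Int] -> Int
sumUntilEnd (Just n : rest) = n + sumUntilEnd rest
sumUntilEnd _               = 0
```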
unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))
Taps the next value from a source, maybe.
prompt :: MonadIO m => Source String m ()
Source of Strings from stdin. This is mostly for debugging / ghci example purposes.
mapM :: Monad m => (a -> m b) -> Tube a b m r
Similar to map except it maps a monadic function instead of a pure one.
sequence :: Monad m => Tube (m a) a m r
Evaluates and extracts a pure value from a monadic one.
display :: MonadIO m => Sink String m ()
Sink for Strings to stdout. This is mostly for debugging / ghci example purposes.
Pump
type Pump a b = CofreeT (PumpF a b)
A Pump is the dual to a Tube. Intuitively, if a Tube is a stream-processing computation, then a Pump is both a stream generator and reducer.
Examples may help!
One interesting use of a Pump is as a data stream, which can be fed into a Tube or Sink.
    import Data.Functor.Identity

    e :: Pump (Maybe Int) Int Identity Int
    e = mkPump (Identity 0) (\(Identity x) -> (Just x, Identity (x+1))) const

    ex1 :: IO ()
    ex1 = do
        run $ each e >< take 10 >< map show >< display
        -- displays 0-9 in the console
A Pump may also be used to fold a Source. Indeed, a Pump may be thought of as both a non-recursive left fold and a non-recursive unfold paired together. (This is called a "metamorphism," hence the function "meta".)
    num_src :: Source Int IO ()
    num_src = do
        forM_ [1..] $ \n -> do
            lift . putStrLn $ "Yielding " ++ (show n)
            yield n

    enum_ex :: IO ()
    enum_ex = do
        v <- reduce (flip send) (meta (+) 0 (\x -> (x,x))) extract $ num_src >< take 5
        putStrLn . show $ "v = " ++ (show v)
        -- v = 15
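The pairing of a fold step with an unfold step can be sketched without any comonad machinery. The record Meta and the functions sendM, recvM, runningSum and total below are hypothetical stand-ins written for this illustration; they mimic the roles of meta, send and recv but are not the library's definitions.

```haskell
-- A state together with one way to absorb input and one way to emit output.
-- All names in this block are hypothetical.
data Meta a b x = Meta
  { current :: x              -- the state carried between steps
  , absorb  :: x -> a -> x    -- fold step: take in one value
  , emit    :: x -> (b, x)    -- unfold step: produce one value and a new state
  }

-- Feed one value in (the fold half).
sendM :: a -> Meta a b x -> Meta a b x
sendM a m = m { current = absorb m (current m) a }

-- Draw one value out (the unfold half).
recvM :: Meta a b x -> (b, Meta a b x)
recvM m =
  let (b, x') = emit m (current m)
  in (b, m { current = x' })

-- Analogue of `meta (+) 0 (\x -> (x,x))` from the example above.
runningSum :: Meta Int Int Int
runningSum = Meta 0 (+) (\x -> (x, x))

-- Send 1..5 in, then read the accumulated value back out.
total :: Int
total = fst (recvM (foldl (flip sendM) runningSum [1 .. 5]))
```

Neither half recurses on its own: something outside (here foldl, in the library a Tube) decides how many times to send and receive.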
The following is an example of a Pump both accumulating values from a Source and then enumerating them into a Sink. This gives back both the result of the computation and the unused input.
    import Data.Functor.Identity

    -- a Sink that stops after 5 loops, or when input is exhausted
    sum_snk :: Sink (Maybe Int) IO Int
    sum_snk = do
        ns <- forM [1,2,3,4,5] $ \_ -> do
            mn <- await
            case mn of
                Just n -> return [n]
                Nothing -> return []
        return $ sum . concat $ ns

    source_sink_ex :: IO ([Int], Int)
    source_sink_ex = do
        e <- reduce (flip send) (enumerator []) id $ num_src >< take 10
        (unused, total) <- pump (,) e sum_snk
        putStrLn $ "Total: " ++ (show total)
        putStrLn $ "Unused: " ++ (show unused)
        -- "Total: 15"
        -- "Unused: [6,7,8,9,10]"
        return (unused, total)
Note that when a Pump and a Tube are combined with pump, the Tube determines control flow. Pumps are comonads, not monads.
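A list-based sketch may help show why the consumer is in charge. Here a simple first-in, first-out buffer plays the role of the enumerator pump and an ordinary function plays the role of the sink; Buffer, push, pop and sumUpTo are hypothetical names for this illustration, and the queue ordering is inferred from the example output above rather than taken from the library's source.

```haskell
-- A first-in, first-out buffer standing in for an enumerator pump.
-- All names in this block are hypothetical.
newtype Buffer a = Buffer [a]

-- Store a value at the back of the buffer.
push :: a -> Buffer a -> Buffer a
push a (Buffer xs) = Buffer (xs ++ [a])

-- Hand out the oldest value, or Nothing once the buffer is empty.
pop :: Buffer a -> (Maybe a, Buffer a)
pop (Buffer [])       = (Nothing, Buffer [])
pop (Buffer (x : xs)) = (Just x, Buffer xs)

-- The consumer decides how often to pop: it asks for at most n values,
-- stops early on Nothing, and returns its sum with the untouched remainder.
sumUpTo :: Int -> Buffer Int -> (Int, [Int])
sumUpTo n = go n 0
  where
    go :: Int -> Int -> Buffer Int -> (Int, [Int])
    go 0 acc (Buffer rest) = (acc, rest)
    go k acc buf = case pop buf of
      (Nothing, Buffer rest) -> (acc, rest)
      (Just x, buf')         -> go (k - 1) (acc + x) buf'
```

Filling the buffer with 1 through 10 and running sumUpTo 5 yields a total of 15 and leaves 6 through 10 unused, matching the output of source_sink_ex. The buffer never acts by itself; it only responds to push and pop.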
There are doubtless more, and more interesting, examples of combining Tubes and Pumps. If you think of any, drop the author a line!
send :: Comonad w => b -> Pump a b w r -> Pump a b w r
Send a value into a Pump, effectively re-seeding the stream.
pumpM :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w (m x) -> Tube a b m y -> m r
A variant of pump which allows effects to be executed inside the pump as well.
meta :: (x -> a -> x) -> x -> (x -> (b, x)) -> Pump b a Identity x
Takes a fold function, an initial value, and an unfold to produce a metamorphism. Can be used to change.
enumerator :: [a] -> Pump (Maybe a) a Identity [a]
Constructs an enumerator pump, which can buffer values and then enumerate them to, say, a Sink (see the examples above).
Re-exports
lift :: MonadTrans t => forall m a. Monad m => m a -> t m a
Lift a computation from the argument monad to the constructed monad.