automaton-1.3: Effectful streams and automata in initial encoding
Safe HaskellSafe-Inferred
LanguageHaskell2010

Data.Stream

Synopsis

Creating streams

data StreamT m a Source #

Effectful streams in initial encoding.

A stream consists of an internal state s, and a step function. This step can make use of an effect in m (which is often a monad), alter the state, and return a result value. Its semantics is continuously outputting values of type b, while performing side effects in m.

An initial encoding was chosen instead of the final encoding known from e.g. list-transformer, dunai, machines, streaming, ..., because the initial encoding is much more amenable to compiler optimizations than the final encoding, which is:

  data StreamFinalT m b = StreamFinalT (m (b, StreamFinalT m b))

When two streams are composed, GHC can often optimize the combined step function, resulting in a faster streams than what the final encoding can ever achieve, because the final encoding has to step through every continuation. Put differently, the compiler can perform static analysis on the state types of initially encoded state machines, while the final encoding knows its state only at runtime.

This performance gain comes at a peculiar cost: Recursive definitions of streams are not possible, e.g. an equation like: fixA stream = stream * fixA stream This is impossible since the stream under definition itself appears in the definition body, and thus the internal state type would be recursively defined, which GHC doesn't allow: Type level recursion is not supported in existential types. An stream defined thusly will typically hang and/or leak memory, trying to build up an infinite type at runtime.

It is nevertheless possible to define streams recursively, but one needs to first identify the recursive definition of its state type. Then for the greatest generality, fixStream and fixStream' can be used, and some special cases are covered by functions such as fixA, parallely, many and some.

Constructors

forall s. StreamT 

Fields

  • state :: s

    The internal state of the stream

  • step :: s -> m (Result s a)

    Stepping a stream by one tick means: 1. performing a side effect in m 2. updating the internal state s 3. outputting a value of type a

Instances

Instances details
MFunctor StreamT Source # 
Instance details

Defined in Data.Stream

Methods

hoist :: forall m n (b :: k). Monad m => (forall a. m a -> n a) -> StreamT m b -> StreamT n b #

Alternative m => Alternative (StreamT m) Source #

empty just performs empty in the underlying monad m. s1 <|> s2 starts in an undecided state, and explores the possibilities of continuing in s1 or s2 on the first tick, using the underlying m.

Instance details

Defined in Data.Stream

Methods

empty :: StreamT m a #

(<|>) :: StreamT m a -> StreamT m a -> StreamT m a #

some :: StreamT m a -> StreamT m [a] #

many :: StreamT m a -> StreamT m [a] #

Applicative m => Applicative (StreamT m) Source #

pure forever returns the same value, (<*>) steps two streams synchronously.

Instance details

Defined in Data.Stream

Methods

pure :: a -> StreamT m a #

(<*>) :: StreamT m (a -> b) -> StreamT m a -> StreamT m b #

liftA2 :: (a -> b -> c) -> StreamT m a -> StreamT m b -> StreamT m c #

(*>) :: StreamT m a -> StreamT m b -> StreamT m b #

(<*) :: StreamT m a -> StreamT m b -> StreamT m a #

Functor m => Functor (StreamT m) Source # 
Instance details

Defined in Data.Stream

Methods

fmap :: (a -> b) -> StreamT m a -> StreamT m b #

(<$) :: a -> StreamT m b -> StreamT m a #

Selective m => Selective (StreamT m) Source # 
Instance details

Defined in Data.Stream

Methods

select :: StreamT m (Either a b) -> StreamT m (a -> b) -> StreamT m b #

Align m => Align (StreamT m) Source # 
Instance details

Defined in Data.Stream

Methods

nil :: StreamT m a #

Semialign m => Semialign (StreamT m) Source # 
Instance details

Defined in Data.Stream

Methods

align :: StreamT m a -> StreamT m b -> StreamT m (These a b) #

alignWith :: (These a b -> c) -> StreamT m a -> StreamT m b -> StreamT m c #

(Applicative m, Floating a) => Floating (StreamT m a) Source # 
Instance details

Defined in Data.Stream

Methods

pi :: StreamT m a #

exp :: StreamT m a -> StreamT m a #

log :: StreamT m a -> StreamT m a #

sqrt :: StreamT m a -> StreamT m a #

(**) :: StreamT m a -> StreamT m a -> StreamT m a #

logBase :: StreamT m a -> StreamT m a -> StreamT m a #

sin :: StreamT m a -> StreamT m a #

cos :: StreamT m a -> StreamT m a #

tan :: StreamT m a -> StreamT m a #

asin :: StreamT m a -> StreamT m a #

acos :: StreamT m a -> StreamT m a #

atan :: StreamT m a -> StreamT m a #

sinh :: StreamT m a -> StreamT m a #

cosh :: StreamT m a -> StreamT m a #

tanh :: StreamT m a -> StreamT m a #

asinh :: StreamT m a -> StreamT m a #

acosh :: StreamT m a -> StreamT m a #

atanh :: StreamT m a -> StreamT m a #

log1p :: StreamT m a -> StreamT m a #

expm1 :: StreamT m a -> StreamT m a #

log1pexp :: StreamT m a -> StreamT m a #

log1mexp :: StreamT m a -> StreamT m a #

(Applicative m, Num a) => Num (StreamT m a) Source # 
Instance details

Defined in Data.Stream

Methods

(+) :: StreamT m a -> StreamT m a -> StreamT m a #

(-) :: StreamT m a -> StreamT m a -> StreamT m a #

(*) :: StreamT m a -> StreamT m a -> StreamT m a #

negate :: StreamT m a -> StreamT m a #

abs :: StreamT m a -> StreamT m a #

signum :: StreamT m a -> StreamT m a #

fromInteger :: Integer -> StreamT m a #

(Applicative m, Fractional a) => Fractional (StreamT m a) Source # 
Instance details

Defined in Data.Stream

Methods

(/) :: StreamT m a -> StreamT m a -> StreamT m a #

recip :: StreamT m a -> StreamT m a #

fromRational :: Rational -> StreamT m a #

(VectorSpace v s, Eq s, Floating s, Applicative m) => VectorSpace (StreamT m v) (StreamT m s) Source # 
Instance details

Defined in Data.Stream

Methods

zeroVector :: StreamT m v #

(*^) :: StreamT m s -> StreamT m v -> StreamT m v #

(^/) :: StreamT m v -> StreamT m s -> StreamT m v #

(^+^) :: StreamT m v -> StreamT m v -> StreamT m v #

(^-^) :: StreamT m v -> StreamT m v -> StreamT m v #

negateVector :: StreamT m v -> StreamT m v #

dot :: StreamT m v -> StreamT m v -> StreamT m s #

norm :: StreamT m v -> StreamT m s #

normalize :: StreamT m v -> StreamT m v #

unfold :: Applicative m => s -> (s -> Result s a) -> StreamT m a Source #

Initialise with an internal state, update the state and produce output without side effects.

unfold_ :: Applicative m => s -> (s -> s) -> StreamT m s Source #

Like unfold, but output the current state.

constM :: Functor m => m a -> StreamT m a Source #

Constantly perform the same effect, without remembering a state.

hoist' :: (forall x. m1 x -> m2 x) -> StreamT m1 a -> StreamT m2 a Source #

Hoist a stream along a monad morphism, by applying said morphism to the step function.

This is like mmorph's hoist, but it doesn't require a Monad constraint on m2.

Running streams

stepStream :: Functor m => StreamT m a -> m (Result (StreamT m a) a) Source #

Perform one step of a stream, resulting in an updated stream and an output value.

reactimate :: Monad m => StreamT m () -> m void Source #

Run a stream with trivial output.

If the output of a stream does not contain information, all of its meaning is in its effects. This function runs the stream indefinitely. Since it will never return with a value, this function also has no output (its output is void). The only way it can return is if m includes some effect of termination, e.g. Maybe or Either could terminate with a Nothing or Left value, or IO can raise an exception.

streamToList :: Monad m => StreamT m a -> m [a] Source #

Run a stream, collecting the outputs in a lazy, infinite list.

Modifying streams

withStreamT :: (Functor m, Functor n) => (forall s. m (Result s a) -> n (Result s b)) -> StreamT m a -> StreamT n b Source #

Change the output type and effect of a stream without changing its state type.

concatS :: Monad m => StreamT m [a] -> StreamT m a Source #

Buffer the output of a stream, returning one value at a time.

This function lets a stream control the speed at which it produces data, since it can decide to produce any amount of output at every step.

Exception handling

applyExcept :: Monad m => StreamT (ExceptT (e1 -> e2) m) a -> StreamT (ExceptT e1 m) a -> StreamT (ExceptT e2 m) a Source #

Streams with exceptions are Applicative in the exception type.

Run the first stream until it throws a function as an exception, then run the second one. If the second one ever throws an exception, apply the function thrown by the first one to it.

exceptS :: Applicative m => StreamT (ExceptT e m) b -> StreamT m (Either e b) Source #

Whenever an exception occurs, output it and retry on the next step.

selectExcept :: Monad m => StreamT (ExceptT (Either e1 e2) m) a -> StreamT (ExceptT (e1 -> e2) m) a -> StreamT (ExceptT e2 m) a Source #

Run the first stream until it throws an exception. If the exception is Right, throw it immediately. If it is Left, run the second stream until it throws a function, which is then applied to the first exception.

Fix points, or recursive definitions

fixStream Source #

Arguments

:: Functor m 
=> (forall s. s -> t s)

The recursive definition of the state of the stream.

-> (forall s. (s -> m (Result s a)) -> t s -> m (Result (t s) a))

The recursive definition of the step function of the stream.

-> StreamT m a 

Recursively define a stream from a recursive definition of the state, and of the step function.

If you want to define a stream recursively, this is not possible directly. For example, consider this definition: loops :: Monad m => StreamT m [Int] loops = (:) $ unfold_ 0 (+ 1) * loops The defined value loops contains itself in its definition. This means that the internal state type of loops must itself be recursively defined. But GHC cannot do this automatically, because type level and value level are separate. Instead, we need to spell out the type level recursion explicitly with a type constructor, over which we will take the fixpoint.

In this example, we can figure out from the definitions that: 1. unfold_ 0 (+ 1) has 0 :: Int as state 2. (:) does not change the state 3. <*> takes the product of both states

So the internal state s of loops must satisfy the equation s = (Int, s). If the recursion is written as above, it tries to compute the infinite tuple (Int, (Int, (Int, ...))), which hangs. Instead, we need to define a type operator over which we take the fixpoint:

-- You need to write this:
data Loops x = Loops Int x

-- The library supplies:
data Fix f = Fix f (Fix f)
type LoopsState = Fix Loops

We can then use fixStream to define the recursive definition of loops. For this, we have to to tediously inline the definitions of unfold_, (:), and <*>, until we arrive at an explicit recursive definition of the state and the step function of loops, separately. These are the two arguments of fixStream.

loops :: Monad m => StreamT m [Int]
loops = fixStream (Loops 0) $ fixStep (Loops n fixState) -> do
  Result s' a <- fixStep fixState
  return $ Result (Loops (n + 1) s') a

fixStream' Source #

Arguments

:: Functor m 
=> (forall s. s -> t s) 
-> (forall s. s -> (s -> m (Result s a)) -> t s -> m (Result (t s) a))

The recursive definition of the state of the stream.

-> StreamT m a

The recursive definition of the step function of the stream.

A generalisation of fixStream where the step definition is allowed to depend on the state.

fixA :: Applicative m => StreamT m (a -> a) -> StreamT m a Source #

The solution to the equation 'fixA stream = stream * fixA stream.

Such a fix point operator needs to be used instead of the above direct definition because recursive definitions of streams loop at runtime due to the initial encoding of the state.