streamly-0.7.0: Beautiful Streaming, Concurrent and Reactive Composition

Copyright: (c) 2017 Harendra Kumar
License: BSD3
Maintainer: streamly@composewell.com
Safe Haskell: None
Language: Haskell2010

Streamly.Tutorial

Description

Streamly is a general computing framework based on concurrent data flow programming. The IO monad and pure lists are special cases of streamly. On the one hand, streamly extends lists of pure values to lists of monadic actions; on the other hand, it extends the IO monad with concurrent non-determinism. In simple imperative terms, streamly extends the IO monad with for loops and nested for loops with concurrency support. Hopefully, this analogy becomes clearer once you go through this tutorial.

Streaming in general enables writing modular, composable and scalable applications with ease, and concurrency allows you to make them scale and perform well. Streamly enables writing scalable concurrent applications without being aware of threads or synchronization. No explicit thread control is needed. Where applicable, concurrency rate is automatically controlled based on the demand by the consumer. However, combinators can be used to fine tune the concurrency control.

Streaming and concurrency together enable expressing reactive applications conveniently. See the CirclingSquare example in the examples directory for a simple SDL based FRP example. To summarize, streamly provides a unified computing framework for streaming, non-determinism and functional reactive programming in an elegant and simple API that is a natural extension of pure lists to monadic streams.

In this tutorial we will go over the basic concepts and how to use the library. Before you go through this tutorial we recommend that you take a look at:

  • The quick overview in the package README file.
  • The overview of streams and folds in the Streamly module.

Once you finish this tutorial, see the last section for further reading resources.

    Stream Types

    The monadic stream API offered by Streamly is very close to the API of pure lists in the Haskell Prelude; it can be considered a natural extension of lists to monadic actions. Streamly streams also provide concurrent composition and merging, so the library can be considered a concurrent list transformer.

    The basic stream type is Serial. It represents a sequence of IO actions and is a Monad. The Serial monad is almost a drop-in replacement for the IO monad, and the IO monad is a special case of it: the IO monad represents a single IO action whereas the Serial monad represents a series of IO actions. The only changes you need to make to go from IO to Serial are to use drain to run the monad and to prefix the IO actions with either yieldM or liftIO. If you use liftIO you can switch from Serial back to the IO monad by simply removing the drain function; no other changes are needed unless you have used some stream specific composition or combinators.
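
    As a minimal sketch of the move from IO to Serial, the snippet below writes the same two actions first in plain IO and then in the Serial monad. The names greetIO and greetSerial are made up for this illustration; yieldM, drain and SerialT are assumed to be exported by Streamly.Prelude and Streamly as in the rest of this tutorial.

    ```haskell
    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Monad.IO.Class (liftIO)

    -- Plain IO: one sequence of actions (hypothetical example name).
    greetIO :: IO ()
    greetIO = do
        putStrLn "hello"
        putStrLn "world"

    -- The same actions in the Serial monad. Each IO action is lifted
    -- with either yieldM or liftIO.
    greetSerial :: SerialT IO ()
    greetSerial = do
        S.yieldM (putStrLn "hello")
        liftIO (putStrLn "world")

    main :: IO ()
    main = do
        greetIO
        -- drain runs the stream for its effects only.
        S.drain greetSerial
    ```

    Both halves of main print the same two lines; the only differences are the lifting of each action and the call to drain.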

    Similarly, the Serial type is almost a drop-in replacement for pure lists; pure lists are a special case of monadic streams. If you use nil in place of [] and |: in place of : you can replace a list with a Serial stream. The only differences are that the elements must be monadic actions and that, to operate on the streams, we must use the corresponding functions from Streamly.Prelude instead of the base Prelude.

    Concurrent Streams

    Many stream operations can be done concurrently:

    • Streams can be generated concurrently.
    • Streams can be merged concurrently.
    • Multiple stages in a streaming pipeline can run concurrently.
    • Streams can be mapped and zipped concurrently.
    • In monadic composition they combine like a list transformer, providing concurrent non-determinism.

    There are three basic concurrent stream styles: Ahead, Async, and Parallel. The Ahead style streams are similar to Serial except that they can speculatively execute multiple stream actions concurrently in advance. Ahead returns exactly the same stream as Serial except that it may execute the actions concurrently. The Async style streams, like Ahead, speculatively execute multiple stream actions in advance but return the results in their finishing order rather than in the stream traversal order. Parallel is like Async except that it provides unbounded parallelism instead of controlled parallelism.

    For easy reference, we can classify the stream types based on execution order, consumption order, and bounded or unbounded concurrency. Execution could be serial (i.e. synchronous) or asynchronous. In serial execution we execute the next action in the stream only after the previous one has finished executing. In asynchronous execution multiple actions in the stream can be executed asynchronously i.e. the next action can start executing even before the first one has finished. Consumption order determines the order in which the outputs generated by the composition are consumed. Consumption could be serial or asynchronous. In serial consumption, the outputs are consumed in the traversal order, in asynchronous consumption the outputs are consumed as they arrive i.e. first come first serve order.

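    +----------+--------------+--------------+-------------+
    | Type     | Execution    | Consumption  | Concurrency |
    +==========+==============+==============+=============+
    | Serial   | Serial       | Serial       | None        |
    +----------+--------------+--------------+-------------+
    | Ahead    | Asynchronous | Serial       | bounded     |
    +----------+--------------+--------------+-------------+
    | Async    | Asynchronous | Asynchronous | bounded     |
    +----------+--------------+--------------+-------------+
    | Parallel | Asynchronous | Asynchronous | unbounded   |
    +----------+--------------+--------------+-------------+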

    All these types can be freely inter-converted using type conversion combinators or type annotations, without any cost, to achieve the desired composition style. To force a particular type of composition, we coerce the stream type using the corresponding type adapting combinator from serially, aheadly, asyncly, or parallely. The default stream type is inferred as Serial unless you change it by using one of the combinators or by using a type annotation.
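
    The following sketch illustrates the conversion described above: one polymorphic stream is interpreted through a combinator and through a type annotation. The name numbers is invented for the example, and the snippet assumes that serially, aheadly, IsStream and SerialT come from the Streamly module.

    ```haskell
    import Streamly
    import qualified Streamly.Prelude as S

    -- A stream left polymorphic in its stream type; the consumer decides
    -- how it is interpreted (hypothetical example name).
    numbers :: IsStream t => t IO Int
    numbers = S.fromFoldable [1, 2, 3]

    main :: IO ()
    main = do
        xs <- S.toList (serially numbers)            -- Serial via combinator
        ys <- S.toList (aheadly numbers)             -- Ahead via combinator
        zs <- S.toList (numbers :: SerialT IO Int)   -- Serial via annotation
        print (xs, ys, zs)
    ```

    Since Ahead consumes results in traversal order, all three lists should come out as [1,2,3]; only the way the elements may be executed differs.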

    Combining Streams

    Streams can be combined using <> or mappend to form a composite. Composite streams can be interpreted in a depth first or breadth first manner using an appropriate type conversion before consumption. Deep (e.g. Serial) stream type variants traverse a composite stream in a depth first manner, such that each stream is traversed fully before traversing the next stream. Wide (e.g. WSerial) stream types traverse it in a breadth first manner, such that one element from each stream is traversed before coming back to the first stream again.
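
    To make the two traversal orders concrete, here is a small model using pure lists only. It is not how streamly implements Serial or WSerial; the functions deep and wide are hypothetical stand-ins that merely reproduce the order in which elements are visited.

    ```haskell
    -- Deep traversal: exhaust the first list, then the second.
    deep :: [a] -> [a] -> [a]
    deep = (++)

    -- Wide traversal: take one element from each list in turn by
    -- swapping the arguments on every step.
    wide :: [a] -> [a] -> [a]
    wide [] ys = ys
    wide (x:xs) ys = x : wide ys xs

    main :: IO ()
    main = do
        print (deep [1, 2] [3, 4 :: Int])  -- [1,2,3,4]
        print (wide [1, 2] [3, 4 :: Int])  -- [1,3,2,4]
    ```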

    Each stream type has a wide traversal variant prefixed by W. The wide variant differs only in the Semigroup/Monoid, Applicative/Monad compositions of the streams. The following table summarizes the basic types and the corresponding wide variants:

    +------------+-----------+
    | Deep       | Wide      |
    +============+===========+
    | Serial     | WSerial   |
    +------------+-----------+
    | Ahead      | WAhead    |
    +------------+-----------+
    | Async      | WAsync    |
    +------------+-----------+
    

    Other than these types there are also the ZipSerial and ZipAsync types, which zip streams serially or concurrently using the Applicative operation. These types are not monads; they are only applicatives, and they do not differ in Semigroup composition.
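
    A short sketch of the difference between the zipping Applicative and the default Serial Applicative follows. The names zipped and crossed are invented for this example, and zipSerially is assumed to be the adapter for the ZipSerial type, in line with the other type combinators.

    ```haskell
    import Streamly
    import qualified Streamly.Prelude as S

    -- Zip applicative: pairs elements position by position and stops at
    -- the end of the shorter stream.
    zipped :: IO [(Int, Char)]
    zipped = S.toList $ zipSerially $
        (,) <$> S.fromFoldable [1, 2, 3] <*> S.fromFoldable "ab"

    -- Serial applicative: combines every element of the first stream
    -- with every element of the second, like nested loops.
    crossed :: IO [(Int, Char)]
    crossed = S.toList $ serially $
        (,) <$> S.fromFoldable [1, 2] <*> S.fromFoldable "ab"

    main :: IO ()
    main = do
        zipped >>= print   -- [(1,'a'),(2,'b')]
        crossed >>= print  -- [(1,'a'),(1,'b'),(2,'a'),(2,'b')]
    ```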

    Imports and Supporting Code

    In most of the example snippets we do not repeat the imports. Where imports are not explicitly specified, use the imports shown below.

    import Streamly
    import Streamly.Prelude ((|:), nil)
    import qualified Streamly.Prelude as S
    
    import Control.Concurrent
    import Control.Monad (forever)
    

    To illustrate concurrent vs serial composition aspects, we will use the following delay function to introduce a sleep or delay specified in seconds. After the delay it prints the number of seconds it slept.

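    -- Sleep for n seconds, then print the current thread ID and the delay.
    delay :: IsStream t => Int -> t IO ()
    delay n = S.yieldM $ do
     threadDelay (n * 1000000)
     tid <- myThreadId
     putStrLn (show tid ++ ": Delay " ++ show n)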
    

    Generating Streams

    We will assume the following imports in this tutorial. Go ahead, fire up a GHCi session and import these lines to start playing.

    > import Streamly
    > import Streamly.Prelude ((|:))
    > import qualified Streamly.Prelude as S
    
    > import Control.Concurrent
    

    nil represents an empty stream and consM or its operator form |: adds a monadic action at the head of the stream.

    > S.toList S.nil
    []
    > S.toList $ getLine |: getLine |: S.nil
    hello
    world
    ["hello","world"]
    

    To create a singleton stream from a pure value use yield or pure and to create a singleton stream from a monadic action use yieldM. Note that in case of Zip applicative streams "pure" repeats the value to generate an infinite stream.

    > S.toList $ pure 1
    [1]
    > S.toList $ S.yield 1
    [1]
    > S.toList $ S.yieldM getLine
    hello
    ["hello"]
    

    To create a stream from pure values in a Foldable container use fromFoldable which is equivalent to a fold using cons and nil:

    > S.toList $ S.fromFoldable [1..3]
    [1,2,3]
    > S.toList $ foldr S.cons S.nil [1..3]
    [1,2,3]
    

    To create a stream from monadic actions in a Foldable container just use a right fold using consM and nil:

    > S.drain $ foldr (|:) S.nil [putStr "Hello ", putStrLn "world!"]
    Hello world!
    

    For more ways to construct a stream see the module Streamly.Prelude.

    Generating Streams Concurrently

    Monadic construction and generation functions like consM, unfoldrM, replicateM, repeatM, iterateM and fromFoldableM work concurrently when used with an appropriate stream type combinator. The pure versions of these APIs are not concurrent. However, you can use the monadic versions even for pure computations, by wrapping the pure value in a monad, to get the concurrent generation capability where required.

    The following code finishes in 3 seconds (6 seconds when serial):

    > let p n = threadDelay (n * 1000000) >> return n
    > S.toList $ parallely $ p 3 |: p 2 |: p 1 |: S.nil
    [1,2,3]
    > S.toList $ aheadly $ p 3 |: p 2 |: p 1 |: S.nil
    [3,2,1]
    

    The following finishes in 10 seconds (100 seconds when serial):

    > S.drain $ asyncly $ S.replicateM 10 $ p 10
    

    Eliminating Streams

    We have already seen drain and toList to eliminate a stream in the examples above. drain runs a stream discarding the results i.e. only for effects. toList runs the stream and collects the results in a list.
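
    Besides drain and toList, a stream can be reduced to a single value with a fold. The sketch below assumes that sum, length and foldl' are available from Streamly.Prelude with list-like meanings; check the Folding section of that module for the exact set of functions in your version.

    ```haskell
    import qualified Streamly.Prelude as S

    main :: IO ()
    main = do
        -- Add up all the elements of the stream.
        total <- S.sum (S.fromFoldable [1 .. 10 :: Int])
        -- Count the elements of the stream.
        count <- S.length (S.fromFoldable "stream")
        -- A strict left fold that keeps the largest element seen so far.
        biggest <- S.foldl' max 0 (S.fromFoldable [3, 9, 4 :: Int])
        print (total, count, biggest)  -- (55,6,9)
    ```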

    For other ways to eliminate a stream see the Folding section in Streamly.Prelude module.

    Concurrent Pipeline Stages

    The concurrent function application operators |$ and |& apply a stream argument to a stream function concurrently to compose a concurrent pipeline of stream processing functions.

    Because both the stages run concurrently, we would see a delay of only 1 second instead of 2 seconds in the following:

    > let p n = threadDelay (n * 1000000) >> return n
    > S.drain $ S.repeatM (p 1) |& S.mapM (\x -> p 1 >> print x)
    

    Transforming Streams

    Transformation over a stream is the equivalent of a for loop construct in imperative paradigm. We iterate over every element in the stream and perform certain transformations for each element. Transformations may involve mapping functions over the elements, filtering elements from the stream or folding all the elements in the stream into a single value. Streamly streams are exactly like lists and you can perform all the transformations in the same way as you would on lists.

    Here is a simple console echo program that just echoes every input line, forever:

    > import Data.Function ((&))
    > S.drain $ S.repeatM getLine & S.mapM putStrLn
    

    The following code snippet reads lines from standard input, filters blank lines, drops the first non-blank line, takes the next two, up cases them, numbers them and prints them:

    import Streamly
    import qualified Streamly.Prelude as S
    import Data.Char (toUpper)
    import Data.Function ((&))
    
    main = S.drain $
           S.repeatM getLine
         & S.filter (not . null)
         & S.drop 1
         & S.take 2
         & fmap (map toUpper)
         & S.zipWith (\n s -> show n ++ " " ++ s) (S.fromFoldable [1..])
         & S.mapM putStrLn
    

    Mapping Concurrently

    Monadic transformation functions mapM and sequence work concurrently when used with appropriate stream type combinators. The pure versions do not work concurrently, however you can use the monadic versions even for pure computations to get the concurrent transformation capability where required.

    This would print a value every second (2 seconds when serial):

    > let p n = threadDelay (n * 1000000) >> return n
    > S.drain $ aheadly $ S.mapM (\x -> p 1 >> print x) (serially $ S.repeatM (p 1))
    

    Merging Streams

    Semigroup Style

    We can combine two streams into a single stream using the semigroup composition operation <>. Streams can be combined in many different ways, as described in the following sections; the <> operation behaves differently depending on the stream type in effect. The stream type, and therefore the composition style, can be changed at any point using one of the type combinators discussed earlier.

    Deep Serial Composition (Serial)

    The Semigroup operation <> of the Serial type combines the two streams in a serial depth first manner. We use the serially type combinator to effect Serial style of composition. We can also use an explicit Serial type annotation for the stream to achieve the same effect. However, since Serial is the default type unless explicitly specified by using a combinator, we can omit using an explicit combinator or type annotation for this style of composition.

    When two streams with multiple elements are combined in this manner, the monadic actions in the two streams are performed sequentially i.e. first all actions in the first stream are performed sequentially and then all actions in the second stream are performed sequentially. We call it serial depth first as the full depth of one stream is fully traversed before we move to the next. The following example prints the sequence 1, 2, 3, 4:

    main = S.drain $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil)
    
    1
    2
    3
    4
    

    All actions in both the streams are performed serially in the same thread. In the following example we can see that all actions are performed in the same thread and take a combined total of 3 + 2 + 1 = 6 seconds:

    main = S.drain $ delay 3 <> delay 2 <> delay 1
    
    ThreadId 36: Delay 3
    ThreadId 36: Delay 2
    ThreadId 36: Delay 1
    

    The polymorphic version of the binary operation <> of the Serial type is serial. We can use serial to join streams in a sequential manner irrespective of the type of stream:

    main = S.drain $ (print 1 |: print 2 |: nil) `serial` (print 3 |: print 4 |: nil)
    

    Wide Serial Composition (WSerial)

    The Semigroup operation <> of the WSerial type combines the two streams in a serial breadth first manner. We use the wSerially type combinator to effect WSerial style of composition. We can also use the WSerial type annotation for the stream to achieve the same effect.

    When two streams with multiple elements are combined in this manner, we traverse all the streams in a breadth first manner i.e. one action from each stream is performed and yielded to the resulting stream before we come back to the first stream again and so on. The following example prints the sequence 1, 3, 2, 4:

    main = S.drain . wSerially $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil)
    
    1
    3
    2
    4
    

    Even though the monadic actions of the two streams are performed in an interleaved manner they are all performed serially in the same thread. In the following example we can see that all actions are performed in the same thread and take a combined total of 3 + 2 + 1 = 6 seconds:

    main = S.drain . wSerially $ delay 3 <> delay 2 <> delay 1
    
    ThreadId 36: Delay 3
    ThreadId 36: Delay 2
    ThreadId 36: Delay 1
    

    The polymorphic version of the WSerial binary operation <> is called wSerial. We can use wSerial to join streams in an interleaved manner irrespective of the type. Notice that we have not used the wSerially combinator in the following example:

    main = S.drain $ (print 1 |: print 2 |: nil) `wSerial` (print 3 |: print 4 |: nil)
    
    1
    3
    2
    4
    

    Note that this composition cannot be used to fold an infinite number of streams since it requires preserving the state until a stream is finished.

    Deep Speculative Composition (Ahead)

    The Semigroup operation <> of the Ahead type combines two streams in a serial depth first manner with concurrent lookahead. We use the aheadly type combinator to effect Ahead style of composition. We can also use an explicit Ahead type annotation for the stream to achieve the same effect.

    When two streams are combined in this manner, the streams are traversed in depth first manner just like Serial, however it can execute the next stream concurrently and keep the results ready when its turn arrives. Concurrent execution of the next stream(s) is performed if the first stream blocks or if it cannot produce output at the rate that is enough to meet the consumer demand. Multiple streams can be executed concurrently to meet the demand. The following example would print the result in a second even though each action in each stream takes one second:

    main = do
     xs <- S.toList . aheadly $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
     print xs
     where p n = threadDelay 1000000 >> return n
    
    [1,2,3,4]
    

    Each stream is constructed aheadly and then both the streams are merged aheadly, therefore, all the actions can run concurrently but the result is presented in serial order.

    You can also use the polymorphic combinator ahead in place of <> to compose any type of streams in this manner.
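
    As a sketch of the polymorphic form, the example below merges two streams with ahead and no aheadly combinator. The helper p and the name merged are made up for the illustration. Because toList fixes the stream type to Serial, each constituent stream is built serially here and only the merge is speculative, so the timing differs from the aheadly example above even though the order of results is the same.

    ```haskell
    import Streamly
    import Streamly.Prelude ((|:))
    import qualified Streamly.Prelude as S
    import Control.Concurrent (threadDelay)

    -- Hypothetical helper: wait one second, then return the argument.
    p :: Int -> IO Int
    p n = threadDelay 1000000 >> return n

    -- Two Serial streams merged with the polymorphic ahead combinator.
    merged :: IO [Int]
    merged = S.toList $
        (p 1 |: p 2 |: S.nil) `ahead` (p 3 |: p 4 |: S.nil)

    main :: IO ()
    main = merged >>= print  -- [1,2,3,4]
    ```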

    Deep Asynchronous Composition (Async)

    The Semigroup operation <> of the Async type combines the two streams in a depth first manner with parallel look ahead. We use the asyncly type combinator to effect Async style of composition. We can also use the Async type annotation for the stream type to achieve the same effect.

    When two streams with multiple elements are combined in this manner, the streams are traversed in depth first manner just like Serial, however it can execute the next stream concurrently and return the results from it as they arrive i.e. the results from the next stream may be yielded even before the results from the first stream. Concurrent execution of the next stream(s) is performed if the first stream blocks or if it cannot produce output at the rate that is enough to meet the consumer demand. Multiple streams can be executed concurrently to meet the demand. In the example below each element in the stream introduces a constant delay of 1 second, however, it takes just one second to produce all the results. The results are not guaranteed to be in any particular order:

    main = do
      xs <- S.toList . asyncly $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
      print xs
      where p n = threadDelay 1000000 >> return n
    
    [4,2,1,3]
    

    Here both the constituent streams and their composition use the Async style. We can instead compose the constituent streams to run serially, in which case it would take 2 seconds to produce all the results. The elements of each serial stream would appear in serial order in the results:

    main = do
      xs <- S.toList . asyncly $ (serially $ p 1 |: p 2 |: nil) <> (serially $ p 3 |: p 4 |: nil)
      print xs
      where p n = threadDelay 1000000 >> return n
    
    [3,1,2,4]
    

    In the following example we can see that new threads are started when a computation blocks. Notice that the output from the stream with the shortest delay is printed first. The whole computation takes max(3, 2, 1) = 3 seconds:

    main = S.drain . asyncly $ delay 3 <> delay 2 <> delay 1
    
    ThreadId 42: Delay 1
    ThreadId 41: Delay 2
    ThreadId 40: Delay 3
    

    When we have a tree of computations composed using this style, the tree is traversed in DFS style just like the Serial style; the only difference is that here we can move on to executing the next stream if a stream blocks. However, we will not start new threads if we have sufficient output to saturate the consumer. This is why we call it a left-biased, demand driven or adaptive concurrency style: the concurrency tends to stay on the left side of the composition as long as possible. More threads are started based on the pull rate of the consumer. The following example prints an output every second as all of the actions are concurrent.

    main = S.drain . asyncly $ (delay 1 <> delay 2) <> (delay 3 <> delay 4)
    
    1
    2
    3
    4
    

    All the computations may even run in a single thread when more threads are not needed. As you can see, in the following example the computations are run in a single thread one after another, because none of them blocks. However, if the thread consuming the stream were faster than the producer then it would have started parallel threads for each computation to keep up even if none of them blocks:

    main = S.drain . asyncly $ traced (sqrt 9) <> traced (sqrt 16) <> traced (sqrt 25)
     where traced m = S.yieldM (myThreadId >>= print) >> return m
    
    ThreadId 40
    ThreadId 40
    ThreadId 40
    

    Note that the order of printing in the above examples may change due to variations in scheduling latencies for concurrent threads.

    The polymorphic version of the Async binary operation <> is called async. We can use async to join streams in a left biased, adaptively concurrent manner irrespective of the type. Notice that we have not used the asyncly combinator in the following example:

    main = S.drain $ delay 3 `async` delay 2 `async` delay 1
    
    ThreadId 42: Delay 1
    ThreadId 41: Delay 2
    ThreadId 40: Delay 3
    

    Since the concurrency provided by this operator is demand driven it cannot be used when the composed computations start timers that are relative to each other because all computations may not be started at the same time and therefore timers in all of them may not start at the same time. When relative timing among all computations is important or when we need to start all computations at once for any reason Parallel style must be used instead.

    Async style utilizes resources optimally and should be preferred over Parallel or WAsync unless you really need those. Async should be used when we know that the computations can run in parallel but we do not care whether they actually do; that decision can be left to the scheduler based on demand. Also, note that the async operator can be used to fold an infinite number of streams, in contrast to the Parallel or WAsync styles, because it does not require us to run all of them at the same time in a fair manner.

    Wide Asynchronous Composition (WAsync)

    The Semigroup operation <> of the WAsync type combines two streams in a concurrent manner using breadth first traversal. We use the wAsyncly type combinator to effect WAsync style of composition. We can also use the WAsync type annotation for the stream to achieve the same effect.

    When streams with multiple elements are combined in this manner, we traverse all the streams concurrently in a breadth first manner i.e. one action from each stream is performed and yielded to the resulting stream before we come back to the first stream again and so on. Even though we execute the actions in a breadth first order the outputs are consumed on a first come first serve basis.

    In the following example we can see that outputs are produced in the breadth first traversal order but this is not guaranteed.

    main = S.drain . wAsyncly $ (serially $ print 1 |: print 2 |: nil) <> (serially $ print 3 |: print 4 |: nil)
    
    1
    3
    2
    4
    

    The polymorphic version of the binary operation <> of the WAsync type is wAsync. We can use wAsync to join streams using a breadth first concurrent traversal irrespective of the type. Notice that we have not used the wAsyncly combinator in the following example:

    main = S.drain $ delay 3 `wAsync` delay 2 `wAsync` delay 1
    
    ThreadId 42: Delay 1
    ThreadId 41: Delay 2
    ThreadId 40: Delay 3
    

    Since the concurrency provided by this style is demand driven it may not be used when the composed computations start timers that are relative to each other because all computations may not be started at the same time and therefore timers in all of them may not start at the same time. When relative timing among all computations is important or when we need to start all computations at once for any reason Parallel style must be used instead.

    Parallel Asynchronous Composition (Parallel)

    The Semigroup operation <> of the Parallel type combines the two streams in a fairly concurrent manner with round robin scheduling. We use the parallely type combinator to effect Parallel style of composition. We can also use the Parallel type annotation for the stream type to achieve the same effect.

    When two streams with multiple elements are combined in this manner, the monadic actions in both the streams are performed concurrently with a fair round robin scheduling. The outputs are yielded in the order in which the actions complete. This is pretty similar to the WAsync type, the difference is that WAsync is adaptive to the consumer demand and may or may not execute all actions in parallel depending on the demand, whereas Parallel runs all the streams in parallel irrespective of the demand.

    The following example sends a query to three search engines in parallel and prints the query URL of each search engine in the order in which the responses arrive. You need the http-conduit package to run this example:

    import Streamly
    import qualified Streamly.Prelude as S
    import Network.HTTP.Simple
    
    main = S.drain . parallely $ google <> bing <> duckduckgo
        where
            google     = get "https://www.google.com/search?q=haskell"
            bing       = get "https://www.bing.com/search?q=haskell"
            duckduckgo = get "https://www.duckduckgo.com/?q=haskell"
            get s = S.yieldM (httpNoBody (parseRequest_ s) >> putStrLn (show s))
    

    The polymorphic version of the binary operation <> of the Parallel type is parallel. We can use parallel to join streams in a fairly concurrent manner irrespective of the type. Notice that we have not used the parallely combinator in the following example:

    main = S.drain $ delay 3 `parallel` delay 2 `parallel` delay 1
    
    ThreadId 42: Delay 1
    ThreadId 41: Delay 2
    ThreadId 40: Delay 3
    

    Note that this style of composition cannot be used to combine an infinite number of streams, as it will lead to an infinite sized scheduling queue.

    Monoid Style

    We can use Monoid instances to fold a container of streams in the desired style using fold or foldMap. We have also provided some fold utilities to fold streams using the polymorphic combine operations:

    • foldWith is like fold, it folds a Foldable container of streams using the given composition operator.
    • foldMapWith is like foldMap, it folds like foldWith but also maps a function before folding.
    • forEachWith is like foldMapWith but the container argument comes before the function argument.

    All of the following are equivalent and start ten concurrent tasks each with a delay from 1 to 10 seconds, resulting in the printing of each number every second:

    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent
    
    main = do
     S.drain $ asyncly $ foldMap delay [1..10]
     S.drain $ S.foldWith    async (map delay [1..10])
     S.drain $ S.foldMapWith async delay [1..10]
     S.drain $ S.forEachWith async [1..10] delay
     where delay n = S.yieldM $ threadDelay (n * 1000000) >> print n
    

    Nesting Streams

    Till now we discussed ways to apply transformations on a stream or to merge streams together to create another stream. We mentioned earlier that transforming a stream is similar to a for loop in the imperative paradigm. We will now discuss the concept of a nested composition of streams which is analogous to nested for loops in the imperative paradigm. Functional programmers call this style of composition a list transformer or ListT. Logic programmers call it a logic monad or non-deterministic composition, but for ordinary imperative minded people like me it is easier to think in terms of good old nested for loops.

    Monad

    In functional programmer's parlance the Monad instances of different IsStream types implement non-determinism, exploring all possible combinations of choices from both the streams. From an imperative programmer's point of view it behaves like nested loops i.e. for each element in the first stream and for each element in the second stream execute the body of the loop.

    The Monad instances of the Serial, WSerial, Ahead, Async and WAsync stream types support different flavors of nested looping. In other words, they are all variants of a list transformer. The nesting behavior of these types corresponds exactly to the way they merge streams, as we discussed in the previous section.

    Deep Serial Nesting (Serial)

    The Monad composition of the Serial type behaves like a standard list transformer. This is the default when we do not use an explicit type combinator. However, the serially type combinator can be used to switch to this style of composition. We will see how this style of composition works in the following examples.

    Let's start with an example with a simple for loop without any nesting. For simplicity of illustration we are using streams of pure values in all the examples. However, the streams could also be made of monadic actions instead.

    import Streamly
    import qualified Streamly.Prelude as S
    
    main = S.drain $ do
        x <- S.fromFoldable [3,2,1]
        delay x
    
    ThreadId 30: Delay 3
    ThreadId 30: Delay 2
    ThreadId 30: Delay 1
    

    As we can see, the code after the fromFoldable statement is run three times, once for each value of x drawn from the stream. All the three iterations are serial and run in the same thread one after another. In imperative terms this is equivalent to a for loop with three iterations.

    A console echo loop copying standard input to standard output can simply be written like this:

    import Streamly
    import qualified Streamly.Prelude as S
    
    import Control.Monad (forever)
    
    main = S.drain $ forever $ S.yieldM getLine >>= S.yieldM . putStrLn
    

    When multiple streams are composed using this style they nest in a DFS manner i.e. nested iterations of a loop are executed before we proceed to the next iteration of the parent loop. This behaves just like nested for loops in imperative programming.

    import Streamly
    import qualified Streamly.Prelude as S
    
    main = S.drain $ do
        x <- S.fromFoldable [1,2]
        y <- S.fromFoldable [3,4]
        S.yieldM $ putStrLn $ show (x, y)
    
    (1,3)
    (1,4)
    (2,3)
    (2,4)
    

    Notice that this is analogous to merging streams of type Serial or merging streams using serial.
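
    For comparison, the same nested loop can be written over pure lists with the ordinary list monad from base. This is only an analogy to show the iteration order; the name nested is made up for the example and no streamly code is involved.

    ```haskell
    -- The list monad visits the inner list fully for each outer element,
    -- which is the same depth first order as the Serial example above.
    nested :: [(Int, Int)]
    nested = do
        x <- [1, 2]
        y <- [3, 4]
        return (x, y)

    main :: IO ()
    main = mapM_ print nested
    ```

    Running it prints (1,3), (1,4), (2,3) and (2,4), one pair per line, matching the Serial output.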

    Wide Serial Nesting (WSerial)

    The Monad composition of WSerial type interleaves the iterations of outer and inner loops in a nested loop composition. This works exactly the same way as the merging of two streams in wSerially fashion works. The wSerially type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as WSerial.

    import Streamly
    import qualified Streamly.Prelude as S
    
    main = S.drain . wSerially $ do
        x <- S.fromFoldable [1,2]
        y <- S.fromFoldable [3,4]
        S.yieldM $ putStrLn $ show (x, y)
    
    (1,3)
    (2,3)
    (1,4)
    (2,4)
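
    The type annotation route mentioned above can be sketched as follows. The loop is given the type WSerial directly and adapt (exported by Streamly) converts it to the Serial stream that S.toList consumes. The name pairs is illustrative only.

    ```haskell
    import Streamly
    import qualified Streamly.Prelude as S

    -- The annotation alone selects the interleaving (WSerial) composition.
    pairs :: WSerial (Int, Int)
    pairs = do
        x <- S.fromFoldable [1, 2]
        y <- S.fromFoldable [3, 4]
        return (x, y)

    main :: IO ()
    main = S.toList (adapt pairs) >>= print
    ```

    This should print the pairs in the same interleaved order as the wSerially version, i.e. [(1,3),(2,3),(1,4),(2,4)].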
    

    Deep Speculative Nesting (Ahead)

    The Monad composition of Ahead type behaves just like Serial except that it can speculatively perform a bounded number of next iterations of a loop concurrently.

    The aheadly type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as Ahead.

    import Streamly
    import qualified Streamly.Prelude as S
    
    comp = S.toList . aheadly $ do
        x <- S.fromFoldable [3,2,1]
        delay x >> return x
    
    main = comp >>= print
    
    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3
    [3,2,1]
    

    This code finishes in 3 seconds, whereas Serial would take 6 seconds (3 + 2 + 1). As we can see, all three iterations are concurrent and run in different threads; however, the results are returned in serial order.

    Concurrency is demand driven. When multiple streams are composed using this style, the iterations are executed in a depth-first manner just like Serial, i.e. nested iterations are executed before we proceed to the next outer iteration. The only difference is that we may execute multiple future iterations concurrently and keep the results ready.
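
    The ordering guarantee can be observed without the delay helper. In this sketch the hypothetical helper slowly sleeps for n * 100 milliseconds, so the iteration for 1 finishes first, yet the collected results keep the stream order.

    ```haskell
    import Control.Concurrent (threadDelay)
    import Streamly
    import qualified Streamly.Prelude as S

    -- Hypothetical helper: sleep for n * 100 ms, then return n.
    slowly :: Int -> IO Int
    slowly n = threadDelay (n * 100000) >> return n

    -- Iterations may run speculatively, but results come out in stream order.
    ordered :: SerialT IO Int
    ordered = aheadly $ do
        x <- S.fromFoldable [3, 2, 1]
        S.yieldM (slowly x)

    main :: IO ()
    main = S.toList ordered >>= print   -- [3,2,1]
    ```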

    Deep Asynchronous Nesting (Async)

    The Monad composition of the Async type can perform the iterations of a loop concurrently. Concurrency is demand driven, i.e. more concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer of the output stream. This works exactly the same way as merging two streams with async. This is the concurrent analogue of Serial style monadic composition.

    The asyncly type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as Async.

    import Streamly
    import qualified Streamly.Prelude as S
    
    main = S.drain . asyncly $ do
        x <- S.fromFoldable [3,2,1]
        delay x
    
    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3
    

    As we can see, the code after the fromFoldable statement is run three times, once for each value of x. All three iterations are concurrent and run in different threads. The iteration with the least delay finishes first. When compared to imperative programming, this can be viewed as a for loop with three concurrent iterations.

    Concurrency is demand driven just as in the case of async merging. When multiple streams are composed using this style, the iterations are triggered in a depth-first manner just like Serial, i.e. nested iterations are executed before we proceed to the next iteration at the higher level. However, unlike Serial, more than one iteration may be started concurrently based on the demand from the consumer of the stream.

    import Streamly
    import qualified Streamly.Prelude as S
    
    main = S.drain . asyncly $ do
        x <- S.fromFoldable [1,2]
        y <- S.fromFoldable [3,4]
        S.yieldM $ putStrLn $ show (x, y)
    
    (1,3)
    (1,4)
    (2,3)
    (2,4)
    

    Wide Asynchronous Nesting (WAsync)

    Just like Async, the Monad composition of WAsync runs the iterations of a loop concurrently. The difference is in the nested loop behavior. The nested loops in this type are traversed and executed in a breadth-first manner rather than the depth-first manner of the Async style. The loop nesting works exactly the same way as merging streams with wAsync. The wAsyncly type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as WAsync.

    import Streamly
    import qualified Streamly.Prelude as S
    
    main = S.drain . wAsyncly $ do
        x <- S.fromFoldable [1,2]
        y <- S.fromFoldable [3,4]
        S.yieldM $ putStrLn $ show (x, y)
    
    (1,3)
    (2,3)
    (1,4)
    (2,4)
    

    Parallel Asynchronous Nesting (Parallel)

    Just like Async or WAsync, the Monad composition of Parallel runs the iterations of a loop concurrently. The difference is in the nested loop behavior. The streams at each nest level are run fully concurrently, irrespective of the demand. The loop nesting works exactly the same way as merging streams with parallel. The parallely type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as Parallel.

    import Streamly
    import qualified Streamly.Prelude as S
    
    main = S.drain . parallely $ do
        x <- S.fromFoldable [3,2,1]
        delay x
    
    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3
    

    Exercise

    Streamly code is usually written in a way that is agnostic of the specific monadic composition type. We use a polymorphic type with an IsStream type class constraint. When running the stream we can choose the specific mode of composition. For example, take a look at the following code.

    import Streamly
    import qualified Streamly.Prelude as S
    
    composed :: (IsStream t, Monad (t IO)) => t IO ()
    composed = do
        sz <- sizes
        cl <- colors
        sh <- shapes
        S.yieldM $ putStrLn $ show (sz, cl, sh)
    
        where
    
        sizes  = S.fromFoldable [1, 2, 3]
        colors = S.fromFoldable ["red", "green", "blue"]
        shapes = S.fromFoldable ["triangle", "square", "circle"]
    

    Now we can interpret this in whatever way we want:

    main = S.drain . serially  $ composed
    main = S.drain . wSerially $ composed
    main = S.drain . asyncly   $ composed
    main = S.drain . wAsyncly  $ composed
    main = S.drain . parallely $ composed
    

    As an exercise try to figure out the output of this code for each mode of composition.
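
    One way to check an answer is to collect the results instead of printing them. The sketch below uses a smaller, hypothetical stream named combos and only verifies that every mode produces the same set of tuples; the order in which each mode produces them is left to the exercise.

    ```haskell
    import Data.List (sort)
    import Streamly
    import qualified Streamly.Prelude as S

    -- A reduced version of the composed stream that returns its tuples.
    combos :: (IsStream t, Monad (t IO)) => t IO (Int, String)
    combos = do
        sz <- S.fromFoldable [1, 2]
        cl <- S.fromFoldable ["red", "green"]
        return (sz, cl)

    main :: IO ()
    main = do
        a <- S.toList (serially  combos)
        b <- S.toList (wSerially combos)
        c <- S.toList (asyncly   combos)
        d <- S.toList (wAsyncly  combos)
        e <- S.toList (parallely combos)
        -- Sorting hides the ordering differences between the modes.
        print (all ((== sort a) . sort) [b, c, d, e])   -- True
    ```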

    Applicative

    Applicative is precisely the same as the ap operation of Monad. For zipping applicatives separate types ZipSerial and ZipAsync are provided.
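
    As a small sketch of that statement for the Serial type, the following compares the applicative operators with ap from Control.Monad on two streams of pure values. The names s1 and s2 are local to this sketch and carry no delays.

    ```haskell
    import Control.Monad (ap)
    import Streamly
    import qualified Streamly.Prelude as S

    s1, s2 :: SerialT IO Int
    s1 = S.fromFoldable [1, 2]
    s2 = S.fromFoldable [3, 4]

    main :: IO ()
    main = do
        viaApplicative <- S.toList $ (,) <$> s1 <*> s2
        viaAp          <- S.toList $ return (,) `ap` s1 `ap` s2
        print viaApplicative             -- [(1,3),(1,4),(2,3),(2,4)]
        print (viaApplicative == viaAp)  -- True
    ```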

    The following example uses the Serial applicative. It runs all iterations serially and takes a total of 17 seconds (1 + 3 + 4 + 2 + 3 + 4):

    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent
    
    s1 = d 1 <> d 2
    s2 = d 3 <> d 4
    d n = delay n >> return n
    
    main = (S.toList . serially $ (,) <$> s1 <*> s2) >>= print
    
    ThreadId 36: Delay 1
    ThreadId 36: Delay 3
    ThreadId 36: Delay 4
    ThreadId 36: Delay 2
    ThreadId 36: Delay 3
    ThreadId 36: Delay 4
    [(1,3),(1,4),(2,3),(2,4)]
    

    Similarly, the WSerial applicative runs the iterations in an interleaved order, but since it is serial it also takes a total of 17 seconds:

    main = (S.toList . wSerially $ (,) <$> s1 <*> s2) >>= print
    
    ThreadId 36: Delay 1
    ThreadId 36: Delay 3
    ThreadId 36: Delay 2
    ThreadId 36: Delay 3
    ThreadId 36: Delay 4
    ThreadId 36: Delay 4
    [(1,3),(2,3),(1,4),(2,4)]
    

    Async can run the iterations concurrently and therefore takes a total of 6 seconds which is max (1, 2) + max (3, 4):

    main = (S.toList . asyncly $ (,) <$> s1 <*> s2) >>= print
    
    ThreadId 34: Delay 1
    ThreadId 36: Delay 2
    ThreadId 35: Delay 3
    ThreadId 36: Delay 3
    ThreadId 35: Delay 4
    ThreadId 36: Delay 4
    [(1,3),(2,3),(1,4),(2,4)]
    

    Similarly, WAsync can also run the iterations concurrently and therefore takes a total of 6 seconds (2 + 4):

    main = (S.toList . wAsyncly $ (,) <$> s1 <*> s2) >>= print
    
    ThreadId 34: Delay 1
    ThreadId 36: Delay 2
    ThreadId 35: Delay 3
    ThreadId 36: Delay 3
    ThreadId 35: Delay 4
    ThreadId 36: Delay 4
    [(1,3),(2,3),(1,4),(2,4)]
    

    Functor

    fmap transforms a stream by mapping a function over all elements of the stream. fmap behaves the same way for all stream types; it is always serial.

    import Streamly
    import qualified Streamly.Prelude as S
    
    main = (S.toList $ fmap show $ S.fromFoldable [1..10]) >>= print
    

    Also see the mapM and sequence functions for mapping actions, in the Streamly.Prelude module.
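
    For instance, a minimal sketch of S.mapM, which maps a monadic action rather than a pure function over the stream. The names doubled and visit are illustrative.

    ```haskell
    import Streamly
    import qualified Streamly.Prelude as S

    -- Run an IO action on every element and keep its result.
    doubled :: SerialT IO Int
    doubled = S.mapM visit $ S.fromFoldable [1 .. 3]
        where
        visit x = putStrLn ("visiting " ++ show x) >> return (x * 2)

    main :: IO ()
    main = S.toList doubled >>= print   -- final line printed: [2,4,6]
    ```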

    Zipping Streams

    Zipping is a special transformation where the corresponding elements of two streams are combined together using a zip function producing a new stream of outputs. Two different types are provided for serial and concurrent zipping. These types provide an applicative instance that can be used to lift functions to zip the argument streams. Also see the zipping functions in the Streamly.Prelude module.
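
    As a quick sketch on pure values, S.zipWith from Streamly.Prelude and the applicative instance of the serial zip type give the same result. The streams s1 and s2 here are local to this sketch and carry no delays.

    ```haskell
    import Streamly
    import qualified Streamly.Prelude as S

    -- Polymorphic so that the same streams can be used at both stream types.
    s1, s2 :: IsStream t => t IO Int
    s1 = S.fromFoldable [1, 2]
    s2 = S.fromFoldable [3, 4]

    main :: IO ()
    main = do
        viaFunction    <- S.toList $ S.zipWith (,) s1 s2
        viaApplicative <- S.toList . zipSerially $ (,) <$> s1 <*> s2
        print viaFunction                      -- [(1,3),(2,4)]
        print (viaFunction == viaApplicative)  -- True
    ```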

    Serial Zipping

    The applicative instance of the ZipSerial type zips streams serially. The zipSerially type combinator can be used to switch to serial applicative zip composition:

    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent
    
    d n = delay n >> return n
    s1 = serially $ d 1 <> d 2
    s2 = serially $ d 3 <> d 4
    
    main = (S.toList . zipSerially $ (,) <$> s1 <*> s2) >>= print
    

    This takes a total of 10 seconds to zip, which is (1 + 2 + 3 + 4), since everything runs serially:

    ThreadId 29: Delay 1
    ThreadId 29: Delay 3
    ThreadId 29: Delay 2
    ThreadId 29: Delay 4
    [(1,3),(2,4)]
    

    Parallel Zipping

    The applicative instance of the ZipAsync type zips streams concurrently. The zipAsyncly type combinator can be used to switch to parallel applicative zip composition:

    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent
    import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
    
    d n = delay n >> return n
    s1 = serially $ d 1 <> d 2
    s2 = serially $ d 3 <> d 4
    
    main = do
        hSetBuffering stdout LineBuffering
        (S.toList . zipAsyncly $ (,) <$> s1 <*> s2) >>= print
    

    This takes 7 seconds to zip, which is max (1,3) + max (2,4) because 1 and 3 are produced concurrently, and 2 and 4 are produced concurrently:

    ThreadId 32: Delay 1
    ThreadId 32: Delay 2
    ThreadId 33: Delay 3
    ThreadId 33: Delay 4
    [(1,3),(2,4)]
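
    The timings quoted in the Applicative and Zipping sections can be reproduced with plain list arithmetic. This is only a back-of-the-envelope model of the formulas given in the text, not a measurement, and it does not use streamly at all.

    ```haskell
    -- Delays (in seconds) of the two streams used in the examples.
    s1, s2 :: [Int]
    s1 = [1, 2]
    s2 = [3, 4]

    -- Serial and WSerial applicative: each element of s1 runs once and the
    -- whole of s2 is run again for every one of them.
    serialTotal :: Int
    serialTotal = sum [a + sum s2 | a <- s1]

    -- Async and WAsync applicative, as quoted: max (1, 2) + max (3, 4).
    asyncTotal :: Int
    asyncTotal = maximum s1 + maximum s2

    -- Serial zip: every delay is paid one after another.
    zipSerialTotal :: Int
    zipSerialTotal = sum s1 + sum s2

    -- Async zip: each zipped pair costs as much as its slower side.
    zipAsyncTotal :: Int
    zipAsyncTotal = sum (zipWith max s1 s2)

    main :: IO ()
    main = print (serialTotal, asyncTotal, zipSerialTotal, zipAsyncTotal)
    -- prints (17,6,10,7)
    ```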
    

    Monad transformers

    To represent streams in an arbitrary monad, use the more general monad transformer types. For example, the monad transformer type corresponding to the Serial type is SerialT. SerialT m a represents a stream of values of type a in some underlying monad m. For example, SerialT IO Int is a stream of Int in the IO monad. In fact, the type Serial is a synonym for SerialT IO.

    Similarly we have monad transformer types for other stream types as well viz. WSerialT, AsyncT, WAsyncT and ParallelT.

    To lift a value from the underlying monad in a monad transformer stack into a singleton stream, use lift. To lift an IO action, use liftIO.

    > S.drain $ liftIO $ putStrLn "Hello world!"
    Hello world!
    > S.drain $ lift $ putStrLn "Hello world!"
    Hello world!
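
    A slightly larger sketch with a non-IO base monad: the stream below runs over StateT Int IO and uses lift to reach the state. It assumes the mtl package for Control.Monad.State; the name counter is illustrative.

    ```haskell
    import Control.Monad.State (StateT, evalStateT, get, modify, lift)
    import Streamly
    import qualified Streamly.Prelude as S

    -- Add each element to the running total held in the underlying StateT
    -- and yield the total so far.
    counter :: SerialT (StateT Int IO) Int
    counter = do
        x <- S.fromFoldable [1, 2, 3]
        lift $ modify (+ x)
        lift get

    main :: IO ()
    main = evalStateT (S.toList counter) 0 >>= print   -- [1,3,6]
    ```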
    

    Concurrent Programming

    When writing concurrent programs there are two distinct places where the programmer can control the concurrency. First, when composing a stream by merging multiple streams we can choose an appropriate sum style operator to combine them concurrently or serially. Second, when processing a stream in a monadic composition we can choose one of the monad composition types to get the desired type of concurrency.

    In the following example the squares of x and y are computed concurrently using the async operation, and the square roots of their sum are computed serially because of the serially combinator. We can choose different combinators for the monadic processing and the stream generation to control the concurrency. We can also use the asyncly combinator instead of explicitly folding with async.

    import Streamly
    import qualified Streamly.Prelude as S
    import Data.List (sum)
    
    main = do
        z <-   S.toList
             $ serially     -- Serial monadic processing (sqrt below)
             $ do
                 x2 <- forEachWith async [1..100] $ -- Concurrent "for" loop
                             \x -> return $ x * x  -- body of the loop
                 y2 <- forEachWith async [1..100] $
                             \y -> return $ y * y
                 return $ sqrt (x2 + y2)
        print $ sum z
    

    We can see how this directly maps to the imperative style OpenMP model; we use combinators and operators instead of the ugly pragmas.
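
    The asyncly alternative mentioned earlier could look roughly like this. It is a sketch on a smaller range, with the hypothetical names squares and roots; because Async does not promise an order, the results are sorted before being compared with a pure list comprehension.

    ```haskell
    import Data.List (sort)
    import Streamly
    import qualified Streamly.Prelude as S

    -- Squares computed in an Async style loop; asyncly converts the loop back
    -- to the stream type expected by the caller.
    squares :: [Double] -> SerialT IO Double
    squares xs = asyncly $ do
        x <- S.fromFoldable xs
        return (x * x)

    -- Serial monadic processing of the concurrently generated squares.
    roots :: SerialT IO Double
    roots = do
        x2 <- squares [1 .. 3]
        y2 <- squares [1 .. 3]
        return $ sqrt (x2 + y2)

    main :: IO ()
    main = do
        z <- S.toList roots
        let expected = [sqrt (x * x + y * y) | x <- [1 .. 3], y <- [1 .. 3]]
        print (sort z == sort expected)   -- True
    ```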

    For more concurrent programming examples, see ListDir.hs, MergeSort.hs and SearchQuery.hs in the examples directory.

    Reactive Programming

    Reactive programming is nothing but concurrent streaming, which is what streamly is all about. With streamly we can generate streams of events, merge streams that are generated concurrently and process events concurrently. We can do all this without any knowledge of how the concurrency is implemented. In the following example you will see that the code is just regular Haskell code that uses only a handful of streamly APIs, and yet it is a reactive application.

    This application has two independent and concurrent sources of event streams, acidRain and userAction. acidRain continuously generates events that deteriorate the health of the character in the game. userAction can be "potion", "harm" or "quit". When the user types "potion" the health improves and the game continues; "harm" reduces the health and "quit" ends the game.

    {-# LANGUAGE FlexibleContexts #-}
    
    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Monad (void, when)
    import Control.Monad.IO.Class (MonadIO(liftIO))
    import Control.Monad.State (MonadState, get, modify, runStateT, put)
    
    data Event = Quit | Harm Int | Heal Int deriving (Show)
    
    userAction :: MonadAsync m => SerialT m Event
    userAction = S.repeatM $ liftIO askUser
        where
        askUser = do
            command <- getLine
            case command of
                "potion" -> return (Heal 10)
                "harm"   -> return (Harm 10)
                "quit"   -> return Quit
                _        -> putStrLn "Type potion or harm or quit" >> askUser
    
    acidRain :: MonadAsync m => SerialT m Event
    acidRain = asyncly $ constRate 1 $ S.repeatM $ liftIO $ return $ Harm 1
    
    data Result = Check | Done
    
    runEvents :: (MonadAsync m, MonadState Int m) => SerialT m Result
    runEvents = do
        event <- userAction `parallel` acidRain
        case event of
            Harm n -> modify (\h -> h - n) >> return Check
            Heal n -> modify (\h -> h + n) >> return Check
            Quit -> return Done
    
    data Status = Alive | GameOver deriving Eq
    
    getStatus :: (MonadAsync m, MonadState Int m) => Result -> m Status
    getStatus result =
        case result of
            Done  -> liftIO $ putStrLn "You quit!" >> return GameOver
            Check -> do
                h <- get
                liftIO $ if (h <= 0)
                         then putStrLn "You die!" >> return GameOver
                         else putStrLn ("Health = " <> show h) >> return Alive
    
    main :: IO ()
    main = do
        putStrLn "Your health is deteriorating due to acid rain,\
                 \ type \"potion\" or \"quit\""
        let runGame = S.drainWhile (== Alive) $ S.mapM getStatus runEvents
        void $ runStateT runGame 60
    

    You can also find the source of this example in the examples directory as AcidRain.hs. It has been adapted from Gabriel's pipes-concurrency package. This is much simpler than the pipes version because of the built-in concurrency in streamly. You can also find an SDL based reactive programming example adapted from Yampa in CirclingSquare.hs.

    Writing Concurrent Programs

    When writing concurrent programs it is advised not to use the concurrent style stream combinators blindly at the top level. That might create too much concurrency where it is not even required, and can even degrade performance in some cases. It can also lead to surprising behavior when code that is supposed to be serial becomes concurrent. Please be aware that all concurrency capable APIs that you may have used under the scope of a concurrent stream combinator will become concurrent. For example, if you have a repeatM somewhere in your program and you use parallely on top, the repeatM becomes fully parallel, resulting in an infinite parallel execution. Instead, use the Keep It Serial and Stupid principle: start with the default serial composition and enable concurrent combinators only when and where necessary. When you use a concurrent combinator you can use an explicit serially combinator to suppress any unnecessary concurrency under the scope of that combinator.
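
    The effect of an explicit serially can be sketched with a polymorphic append. In the hypothetical stream parts below, the meaning of (<>) is decided by the stream type at which it is used: under asyncly it is an async merge, while an inner serially pins it to the serial append.

    ```haskell
    import Data.List (sort)
    import Streamly
    import qualified Streamly.Prelude as S

    -- (<>) takes its meaning from the stream type selected by the caller.
    parts :: (IsStream t, Semigroup (t IO Int)) => t IO Int
    parts = S.fromFoldable [1, 2] <> S.fromFoldable [3, 4]

    main :: IO ()
    main = do
        -- Async merge: the same elements, but the order is not guaranteed.
        merged <- S.toList . asyncly $ parts
        -- The inner serially keeps the append serial despite the outer asyncly.
        pinned <- S.toList . asyncly . serially $ parts
        print (sort merged)   -- [1,2,3,4]
        print pinned          -- [1,2,3,4]
    ```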

    Performance

    Streamly is highly optimized for performance; it is designed for serious high performing, concurrent and scalable applications. We have created the streaming-benchmarks package, which is specifically and carefully designed to measure the performance of Haskell streaming libraries fairly. Streamly performs on par with or even better than most streaming libraries for serial operations, even though it needs to deal with the concurrency capability.

    Interoperation with Streaming Libraries

    We can use unfoldr and uncons to convert one streaming type to another.
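
    Before turning to other libraries, the pattern can be sketched against plain lists, which need no extra packages. The names fromPlainList and toPlainList are made up here; S.toList and S.fromList already cover this case in practice.

    ```haskell
    import qualified Data.List as L
    import Streamly
    import qualified Streamly.Prelude as S

    -- | list to streamly, driven by the list's own uncons
    fromPlainList :: (IsStream t, Monad m) => [a] -> t m a
    fromPlainList = S.unfoldr L.uncons

    -- | streamly to list, driven by S.uncons
    toPlainList :: Monad m => SerialT m a -> m [a]
    toPlainList s = S.uncons s >>= maybe (return []) step
        where
        step (x, rest) = (x :) <$> toPlainList rest

    main :: IO ()
    main = toPlainList (fromPlainList [1 .. 3 :: Int]) >>= print   -- [1,2,3]
    ```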

    Interop with vector:

    import Streamly
    import qualified Streamly.Prelude as S
    import qualified Data.Vector.Fusion.Stream.Monadic as V
    
    -- | vector to streamly
    fromVector :: (IsStream t, MonadAsync m) => V.Stream m a -> t m a
    fromVector = S.unfoldrM unconsV
        where
        unconsV v = do
            r <- V.null v
            if r
            then return Nothing
            else do
                h <- V.head v
                return $ Just (h, V.tail v)
    
    -- | streamly to vector
    toVector :: Monad m => SerialT m a -> V.Stream m a
    toVector = V.unfoldrM (S.uncons . adapt)
    
    main = do
        S.toList (fromVector (V.fromList [1..3]))   >>= print
        V.toList (toVector (S.fromFoldable [1..3])) >>= print
    

    Interop with pipes:

    import Streamly
    import qualified Streamly.Prelude as S
    import qualified Pipes as P
    import qualified Pipes.Prelude as P
    
    -- | pipes to streamly
    fromPipes :: (IsStream t, MonadAsync m) => P.Producer a m r -> t m a
    fromPipes = S.unfoldrM unconsP
        where
        -- Adapt P.next to return a Maybe instead of Either
        unconsP p = P.next p >>= either (\_ -> return Nothing) (return . Just)
    
    -- | streamly to pipes
    toPipes :: Monad m => SerialT m a -> P.Producer a m ()
    toPipes = P.unfoldr unconsS
        where
        -- Adapt S.uncons to return an Either instead of Maybe
        unconsS s = S.uncons s >>= maybe (return $ Left ()) (return . Right)
    
    main = do
        S.toList (fromPipes (P.each [1..3])) >>= print
        P.toListM (toPipes (S.fromFoldable [1..3])) >>= print
    

    Interop with streaming:

    import Streamly
    import qualified Streamly.Prelude as S
    import qualified Streaming as SG
    import qualified Streaming.Prelude as SG
    
    -- | streaming to streamly
    fromStreaming :: (IsStream t, MonadAsync m) => SG.Stream (SG.Of a) m r -> t m a
    fromStreaming = S.unfoldrM SG.uncons
    
    -- | streamly to streaming
    toStreaming :: Monad m => SerialT m a -> SG.Stream (SG.Of a) m ()
    toStreaming = SG.unfoldr unconsS
        where
        -- Adapt S.uncons to return an Either instead of Maybe
        unconsS s = S.uncons s >>= maybe (return $ Left ()) (return . Right)
    
    main = do
        S.toList (fromStreaming (SG.each [1..3])) >>= print
        SG.toList (toStreaming (S.fromFoldable [1..3])) >>= print
    

    Interop with conduit:

    import Streamly
    import qualified Streamly.Prelude as S
    import qualified Data.Conduit as C
    import qualified Data.Conduit.List as C
    import qualified Data.Conduit.Combinators as C
    
    -- It seems there is no way out of a conduit as it does not provide an
    -- uncons or a tail function. We can convert streamly to conduit though.
    
    -- | streamly to conduit
    toConduit :: Monad m => SerialT m a -> C.ConduitT i a m ()
    toConduit s = C.unfoldM S.uncons s
    
    main = do
        C.runConduit (toConduit (S.fromFoldable [1..3]) C..| C.sinkList) >>= print
    

    Comparison with Existing Packages

    List transformers and logic programming monads also provide a product style composition similar to streamly. However, streamly generalizes it with the time dimension, allowing streams to be composed in an asynchronous and concurrent fashion in many different ways. It also provides multiple alternative ways of composing streams, e.g. serial, interleaved or concurrent.

    This seemingly simple addition of asynchronicity and concurrency to product style streaming composition unifies a number of disparate abstractions into one powerful, concise and elegant abstraction. A wide variety of programming problems can be solved elegantly with this abstraction. In particular, it unifies three major programming domains namely non-deterministic (logic) programming, concurrent programming and functional reactive programming. In other words, you can do everything with this one abstraction that you could do with the popular libraries listed under these categories in the list below.

    +-----------------+----------------+
    | Non-determinism | pipes          |
    |                 +----------------+
    |                 | list-t         |
    |                 +----------------+
    |                 | logict         |
    +-----------------+----------------+
    | Streaming       | vector         |
    |                 +----------------+
    |                 | streaming      |
    |                 +----------------+
    |                 | pipes          |
    |                 +----------------+
    |                 | conduit        |
    +-----------------+----------------+
    | Concurrency     | async          |
    |                 +----------------+
    |                 | transient      |
    +-----------------+----------------+
    | FRP             | Yampa          |
    |                 +----------------+
    |                 | dunai          |
    |                 +----------------+
    |                 | reflex         |
    +-----------------+----------------+
    

    Streamly is a list-transformer. It provides all the functionality provided by any of the list transformer and logic programming packages listed above. In addition, Streamly naturally integrates the concurrency dimension to the basic list transformer functionality.

    When it comes to streaming, in terms of the streaming API streamly is almost identical to the vector package. Streamly, vector and streaming packages all represent a stream as data and are therefore similar in the fundamental approach to streaming. The fundamental difference is that streamly adds concurrency support and the monad instance provides concurrent looping. Other streaming libraries like pipes, conduit and machines represent and compose stream processors rather than the stream data and therefore fall in another class of streaming libraries and have comparatively more complicated types.

    When it comes to concurrency, streamly can do everything that the async package can do and more. async provides applicative concurrency whereas streamly provides both applicative and monadic concurrency. The ZipAsync type behaves like the applicative instance of async. In comparison to transient, streamly has a first class streaming interface and is a monad transformer that can be used universally in any Haskell monad transformer stack. Streamly was in fact originally inspired by the concurrency implementation in transient, though it bears no resemblance to it and takes a lazy pull approach versus transient's strict push approach.

    The combination of non-determinism, concurrency and streaming makes streamly a strong reactive programming library as well. Reactive programming is fundamentally about streams of events that can be processed concurrently. The example in this tutorial as well as the CirclingSquare example from Yampa demonstrate the basic reactive capability of streamly. In core concepts streamly is strikingly similar to dunai. dunai was designed from an FRP perspective and streamly was originally designed from a concurrency perspective. However, both are similar at the core.

    Where to go next?

    • Read the documentation of Streamly module
    • Read the documentation of Streamly.Prelude module
    • See the examples in the "examples" directory of the package
    • See the tests in the "test" directory of the package