{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Container
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Stream operations that require transformers or containers like Set or Map.

module Streamly.Internal.Data.Stream.StreamD.Container
    (
      nub

    -- * Joins for unconstrained types
    , joinLeftGeneric
    , joinOuterGeneric

    -- * Joins with Ord constraint
    , joinInner
    , joinLeft
    , joinOuter
    )
where

#include "inline.hs"

import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Trans.State.Strict (get, put)
import Data.Function ((&))
import Data.Maybe (isJust)
import Streamly.Internal.Data.Stream.StreamD.Step (Step(..))
import Streamly.Internal.Data.Stream.StreamD.Type
    (Stream(..), mkCross, unCross)

import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Internal.Data.Array.Generic as Array
import qualified Streamly.Internal.Data.Array.Mut.Type as MA
import qualified Streamly.Internal.Data.Stream.StreamD.Type as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Nesting as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Generate as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Transform as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Transformer as Stream

#include "DocTestDataStream.hs"

-- | Remove repeated elements from a stream, keeping only the first occurrence
-- of each distinct value.
--
-- A set of the elements emitted so far is carried in the stream state, so the
-- memory used is proportional to the number of unique elements in the stream.
-- If we want to limit the memory we can just use "take" to limit the unique
-- elements in the stream.
{-# INLINE_NORMAL nub #-}
nub :: (Monad m, Ord a) => Stream m a -> Stream m a
nub (Stream innerStep innerState) = Stream step (Set.empty, innerState)

    where

    -- Run one step of the source stream and translate its result, threading
    -- the set of already seen elements through the state.
    step gst (seen, st) = do
        res <- innerStep gst st
        return $ dedup seen res

    -- An element that was seen before becomes a 'Skip'; a new element is
    -- yielded and recorded in the set.
    dedup seen (Yield x next)
        | Set.member x seen = Skip (seen, next)
        | otherwise = Yield x (Set.insert x seen, next)
    dedup seen (Skip next) = Skip (seen, next)
    dedup _ Stop = Stop

-- Fold a stream of key-value pairs into a 'Map.Map'. The pairs are inserted in
-- stream order with 'Map.insert', so when a key occurs more than once the
-- value inserted last replaces the earlier ones.
--
-- XXX Generate error if a duplicate insertion is attempted?
toMap :: (Monad m, Ord k) => Stream m (k, v) -> m (Map.Map k v)
toMap = Stream.fold (Fold.foldl' insertPair Map.empty)

    where

    insertPair acc (key, val) = Map.insert key val acc

-- If the second stream is too big it can be partitioned based on hashes and
-- then we can process one partition at a time.
--
-- XXX An IntMap may be faster when the keys are Int.
-- XXX Use hashmap instead of map?

-- | Inner join of two key-value streams on equal keys. The second stream is
-- first folded into a 'Map.Map'; then, for every @(k, a)@ of the first stream
-- whose key is present in that map with value @b@, @(k, a, b)@ is emitted.
-- Elements of the first stream whose key is absent from the map are dropped.
--
-- If the input streams have duplicate keys, the behavior is undefined.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- Space: O(n)
--
-- Time: O(m + n) 'Map.Map' operations, each logarithmic in the map size.
--
-- /Pre-release/
{-# INLINE joinInner #-}
joinInner :: (Monad m, Ord k) =>
    Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, b)
joinInner s1 s2 =
    Stream.concatEffect $ do
        rightMap <- toMap s2
        pure $ Stream.mapMaybe (match rightMap) s1

    where

    -- Pair an element of the first stream with the value stored under the
    -- same key, if there is one.
    match table (key, left) =
        fmap (\right -> (key, left, right)) (Map.lookup key table)

-- XXX We can do this concurrently.
-- XXX If the second stream is sorted and passed as an Array or a seek capable
-- stream then we could use binary search if we have an Ord instance or
-- Ordering returning function. The time complexity would then become (m x log
-- n).

-- XXX Check performance of StreamD vs StreamK

-- | Left outer join of two streams using the supplied equality predicate. For
-- each element @a@ of the first stream, emit @(a, Just b)@ for every @b@ of
-- the second stream for which @eq a b@ holds; if there is no such @b@, emit
-- @(a, Nothing)@ instead.
--
-- The second stream is evaluated multiple times, once for each element of the
-- first stream. If the stream is a consume-once stream then the caller should
-- cache it in an 'Data.Array.Array' before calling this function. Caching may
-- also improve performance if the stream is expensive to evaluate.
--
-- >>> joinRightGeneric eq = flip (Stream.joinLeftGeneric eq)
--
-- Space: O(n) assuming the second stream is cached in memory.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE joinLeftGeneric #-}
joinLeftGeneric :: Monad m =>
    (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, Maybe b)
joinLeftGeneric eq s1 s2 = Stream.evalStateT (return False) $ unCross $ do
    left <- mkCross (Stream.liftInner s1)
    -- XXX should we use StreamD monad here?
    -- XXX Is there a better way to perform some action at the end of a loop
    -- iteration?
    -- The Bool state records whether the current left element has matched
    -- anything yet; clear it before scanning the second stream.
    mkCross (Stream.fromEffect $ put False)
    -- Evaluated after the second stream is exhausted: contributes a single
    -- Nothing only when no match was recorded for this left element.
    let unmatched = Stream.concatEffect $ do
            matched <- get
            pure $ if matched then Stream.nil else Stream.fromPure Nothing
    mright <-
        mkCross (fmap Just (Stream.liftInner s2) `Stream.append` unmatched)
    case mright of
        Nothing -> return (left, Nothing)
        Just right
            | left `eq` right -> do
                mkCross (Stream.fromEffect $ put True)
                return (left, Just right)
            | otherwise -> mkCross Stream.nil

-- XXX rename to joinLeftOrd?

-- | Like 'joinLeftGeneric' but joins key-value streams on equal keys, using a
-- 'Map.Map' built from the second stream for efficiency. Every @(k, a)@ of the
-- first stream is emitted as @(k, a, Just b)@ when the key is present in the
-- map with value @b@, and as @(k, a, Nothing)@ otherwise.
--
-- Space: O(n)
--
-- Time: O(m + n) 'Map.Map' operations, each logarithmic in the map size.
--
-- /Pre-release/
{-# INLINE joinLeft #-}
joinLeft :: (Ord k, Monad m) =>
    Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, Maybe b)
joinLeft s1 s2 =
    Stream.concatEffect $ do
        rightMap <- toMap s2
        return $ fmap (annotate rightMap) s1

    where

    -- Attach the value found under the same key in the map, if any.
    annotate table (key, left) =
        maybe
            (key, left, Nothing)
            (\right -> (key, left, Just right))
            (Map.lookup key table)

-- XXX We can do this concurrently.

-- XXX Check performance of StreamD vs StreamK

-- | Full outer join of two streams using the supplied equality predicate.
-- Like 'joinLeftGeneric', for each @a@ of the first stream emit @(Just a, Just
-- b)@ for every @b@ of the second stream for which @eq a b@ holds, or @(Just
-- a, Nothing)@ if there is no such @b@. Additionally, after the first stream
-- is exhausted, emit @(Nothing, Just b)@ for those @b@'s that were not equal
-- to any @a@.
--
-- The second stream is buffered in an array, along with a mutable array of
-- flags recording which of its elements have been matched.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- Space: O(n)
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE joinOuterGeneric #-}
joinOuterGeneric :: MonadIO m =>
       (a -> b -> Bool)
    -> Stream m a
    -> Stream m b
    -> Stream m (Maybe a, Maybe b)
joinOuterGeneric eq s1 s =
    Stream.concatEffect $ do
        rightArr <- Array.fromStream s
        let count = Array.length rightArr
        -- One flag per element of the second stream, all initially False.
        matchedArr <-
            Stream.fold
                (MA.writeN count)
                (Stream.fromList (Prelude.replicate count False))
        let paired = matchLeft rightArr matchedArr
            orphans = unmatchedRight rightArr matchedArr
        return $ paired `Stream.append` orphans

    where

    -- Elements of the second stream whose flag is still False, i.e. those
    -- that were never paired with any element of the first stream.
    unmatchedRight rightArr matchedArr =
        Stream.zipWith toOrphan rights flags
            & Stream.filter isJust
            & Stream.catMaybes

        where

        rights = Array.read rightArr
        flags = Stream.unfold MA.reader matchedArr
        toOrphan right wasMatched =
            if wasMatched
            then Nothing
            else Just (Nothing, Just right)

    -- Left join of the first stream against the buffered second stream,
    -- flagging every element of the second stream that gets matched.
    matchLeft rightArr matchedArr =
        Stream.evalStateT (return False) $ unCross $ do
            left <- mkCross (Stream.liftInner s1)
            -- XXX should we use StreamD monad here?
            -- XXX Is there a better way to perform some action at the end of
            -- a loop iteration?
            -- The Bool state records whether the current left element has
            -- matched anything yet; clear it before each scan.
            mkCross (Stream.fromEffect $ put False)
            -- Evaluated after the buffered elements are exhausted:
            -- contributes a single Nothing only when no match was recorded.
            let unmatched = Stream.concatEffect $ do
                    matched <- get
                    pure $
                        if matched
                        then Stream.nil
                        else Stream.fromPure Nothing
                rights = fmap Just (Stream.liftInner (Array.read rightArr))
            (idx, mright) <-
                mkCross (Stream.indexed (rights `Stream.append` unmatched))

            case mright of
                Nothing -> return (Just left, Nothing)
                Just right
                    | left `eq` right -> do
                        mkCross (Stream.fromEffect $ put True)
                        MA.putIndex idx matchedArr True
                        return (Just left, Just right)
                    | otherwise -> mkCross Stream.nil

-- Put the b's that have been paired, in another hash or mutate the hash to set
-- a flag. At the end go through @Stream m b@ and find those that are not in that
-- hash to return (Nothing, b).

-- | Like 'joinOuterGeneric' but joins key-value streams on equal keys, using
-- 'Map.Map's for efficiency.
--
-- Both streams are first folded into maps. Then every entry @(k, a)@ of the
-- first map is emitted as @(k, Just a, Just b)@ when the second map holds @b@
-- under the same key, or as @(k, Just a, Nothing)@ otherwise. After that,
-- every entry @(k, b)@ of the second map whose key is absent from the first
-- map is emitted as @(k, Nothing, Just b)@.
--
-- Space: O(m + n)
--
-- Time: O(m + n) 'Map.Map' operations, each logarithmic in the map size.
--
-- /Pre-release/
{-# INLINE joinOuter #-}
joinOuter ::
    (Ord k, MonadIO m) =>
    Stream m (k, a) -> Stream m (k, b) -> Stream m (k, Maybe a, Maybe b)
joinOuter s1 s2 =
    Stream.concatEffect $ do
        -- Use the shared 'toMap' helper, as 'joinInner' and 'joinLeft' do.
        km1 <- toMap s1
        km2 <- toMap s2

        -- XXX Not sure if toList/fromList would fuse optimally. We may have to
        -- create a fused Map.toStream function.
        let leftRows =
                fmap (pairLeft km2)
                    $ Stream.fromList
                    $ Map.toList km1

        -- XXX We can take advantage of the lookups in the first pass above to
        -- reduce the number of lookups in this pass. If we keep mutable cells
        -- in the second Map, we can flag it in the first pass and not do any
        -- lookup in the second pass if it is flagged.
        let rightOnlyRows =
                Stream.mapMaybe (onlyRight km1)
                    $ Stream.fromList
                    $ Map.toList km2

        return $ Stream.append leftRows rightOnlyRows

    where

    -- An entry of the first map together with its partner, if any, from the
    -- second map.
    pairLeft rightMap (k, a) =
        case Map.lookup k rightMap of
            Just b -> (k, Just a, Just b)
            Nothing -> (k, Just a, Nothing)

    -- An entry of the second map is kept only when its key does not occur in
    -- the first map; the matched ones were already emitted by 'pairLeft'.
    onlyRight leftMap (k, b)
        | Map.member k leftMap = Nothing
        | otherwise = Just (k, Nothing, Just b)