{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE BangPatterns #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
module Control.MapReduce.Engines.Streamly
(
streamlyEngine
, streamlyEngineM
, concurrentStreamlyEngine
, toStreamlyFold
, toStreamlyFoldM
, resultToList
, concatStream
, concatStreamFold
, concatStreamFoldM
, concatConcurrentStreamFold
, groupByHashableKey
, groupByOrderedKey
, groupByHashableKeyST
, groupByDiscriminatedKey
, SerialT
, WSerialT
, AheadT
, AsyncT
, WAsyncT
, ParallelT
, MonadAsync
, IsStream
)
where
import qualified Control.MapReduce.Core as MRC
import qualified Control.MapReduce.Engines as MRE
import Control.Arrow ( second )
import qualified Control.Foldl as FL
import Control.Monad.ST as ST
import qualified Data.Discrimination.Grouping as DG
import qualified Data.Foldable as F
import Data.Functor.Identity ( Identity(runIdentity) )
import Data.Hashable ( Hashable )
import qualified Data.HashMap.Strict as HMS
import qualified Data.HashTable.Class as HT
import qualified Data.HashTable.ST.Cuckoo as HTC
import qualified Data.List.NonEmpty as LNE
import qualified Data.Maybe as Maybe
import qualified Data.Map.Strict as MS
import qualified Data.Sequence as Seq
import qualified Streamly.Prelude as S
import qualified Streamly as S
import qualified Streamly.Internal.Data.Fold as SF
import Streamly ( SerialT
, WSerialT
, AheadT
, AsyncT
, WAsyncT
, ParallelT
, MonadAsync
, IsStream
)
-- | Convert a monadic @foldl@ fold ('FL.FoldM') into a streamly fold.
--
-- The three components of the 'FL.FoldM' (step function, initial
-- accumulator action and extraction function) are handed unchanged to
-- 'SF.mkFold'.
toStreamlyFoldM :: FL.FoldM m a b -> SF.Fold m a b
toStreamlyFoldM (FL.FoldM step start done) = SF.mkFold step start done
-- | Convert a pure @foldl@ fold ('FL.Fold') into a streamly fold usable in
-- any monad @m@.
--
-- The step function, initial accumulator and extraction function are
-- handed unchanged to 'SF.mkPure'.
toStreamlyFold :: Monad m => FL.Fold a b -> SF.Fold m a b
toStreamlyFold (FL.Fold step start done) = SF.mkPure step start done
-- | Apply a pure 'MRC.Unpack' step to a stream over 'Identity'.
--
-- * 'MRC.Filter' keeps only the elements satisfying the predicate.
-- * 'MRC.Unpack' maps every element to a foldable container and splices
--   the container's contents into the output stream.
unpackStream :: S.IsStream t => MRC.Unpack x y -> t Identity x -> t Identity y
unpackStream (MRC.Filter t) = S.filter t
unpackStream (MRC.Unpack f) = S.concatMap (S.fromFoldable . f)
{-# INLINABLE unpackStream #-}
-- | Apply a monadic 'MRC.UnpackM' step to a stream over @m@.
--
-- * 'MRC.FilterM' keeps only the elements for which the monadic predicate
--   returns 'True'.
-- * 'MRC.UnpackM' runs the unpacking action on every element and splices
--   the contents of the resulting foldable container into the output
--   stream.
unpackStreamM :: (S.IsStream t, Monad m) => MRC.UnpackM m x y -> t m x -> t m y
unpackStreamM (MRC.FilterM t) = S.filterM t
unpackStreamM (MRC.UnpackM f) = S.concatMapM (fmap S.fromFoldable . f)
{-# INLINABLE unpackStreamM #-}
-- | Run a stream of any streamly stream type and collect its elements in a
-- list.
--
-- The stream is first converted to a 'SerialT' with 'S.adapt' and then
-- consumed with 'S.toList'.
resultToList :: (Monad m, S.IsStream t) => t m a -> m [a]
resultToList = S.toList . S.adapt
-- | Combine all elements of a serial stream with their 'Monoid' instance.
--
-- Implemented as a left fold ('S.foldl'') using '<>' and starting from
-- 'mempty', so an empty stream yields 'mempty'.
concatStream :: (Monad m, Monoid a) => S.SerialT m a -> m a
concatStream = S.foldl' (<>) mempty
-- | Turn a pure fold that produces a serial 'Identity' stream of monoidal
-- values into a fold that produces their combined value.
--
-- The resulting stream is collapsed with 'concatStream' and the 'Identity'
-- wrapper is removed.
concatStreamFold :: Monoid b => FL.Fold a (S.SerialT Identity b) -> FL.Fold a b
concatStreamFold = fmap (runIdentity . concatStream)
-- | Turn a monadic fold that produces a stream of monoidal values into a
-- monadic fold that produces their combined value.
--
-- The result stream (of any streamly stream type) is adapted to 'SerialT'
-- and collapsed with 'concatStream'; this is attached to the fold as a
-- monadic post-processing step via 'MRC.postMapM'.
concatStreamFoldM
  :: (Monad m, Monoid b, S.IsStream t) => FL.FoldM m a (t m b) -> FL.FoldM m a b
concatStreamFoldM = MRC.postMapM (concatStream . S.adapt)
-- | Turn a pure fold that produces a stream over @m@ of monoidal values
-- into a monadic fold that produces their combined value.
--
-- The pure fold is lifted with 'FL.generalize' and then finished with
-- 'concatStreamFoldM'.
concatConcurrentStreamFold
  :: (Monad m, Monoid b, S.IsStream t) => FL.Fold a (t m b) -> FL.FoldM m a b
concatConcurrentStreamFold = concatStreamFoldM . FL.generalize
-- | Pure map-reduce engine built on a serial streamly stream over
-- 'Identity'.
--
-- The first argument is the grouping function: it receives the stream of
-- @(key, value)@ pairs and returns one @(key, g value)@ pair per group.
--
-- The resulting 'FL.Fold' accumulates its inputs by consing each one onto
-- the front of a stream, so the accumulated stream holds the inputs in
-- reverse arrival order. At extraction time the accumulated stream is
-- pushed through: unpack -> assign -> group by key -> reduce.
streamlyEngine
  :: (Foldable g, Functor g)
  => (forall z . S.SerialT Identity (k, z) -> S.SerialT Identity (k, g z))
  -> MRE.MapReduceFold y k c (SerialT Identity) x d
streamlyEngine groupByKey u (MRC.Assign a) r = FL.Fold (flip S.cons)
                                                       S.nil
                                                       extract
 where
  -- Reduce a single group to its result.
  reduceGroup (k, lc) = MRE.reduceFunction r k lc
  -- Full pipeline applied to the accumulated input stream.
  extract =
    S.map reduceGroup . groupByKey . S.map a . unpackStream u
{-# INLINABLE streamlyEngine #-}
-- | Apply a pure 'MRC.Unpack' step to a stream over a 'S.MonadAsync'
-- monad.
--
-- * 'MRC.Filter' keeps only the elements satisfying the predicate.
-- * 'MRC.Unpack' maps every element to a foldable container and splices
--   the container's contents into the output stream.
--
-- How the work is scheduled depends on the stream type @t@ chosen by the
-- caller.
unpackConcurrently
  :: (S.MonadAsync m, S.IsStream t) => MRC.Unpack x y -> t m x -> t m y
unpackConcurrently (MRC.Filter t) = S.filter t
unpackConcurrently (MRC.Unpack f) = S.concatMap (S.fromFoldable . f)
{-# INLINABLE unpackConcurrently #-}
-- | Map-reduce engine over a 'S.MonadAsync' monad with configurable stream
-- types.
--
-- @tIn@ is the stream type used while accumulating, unpacking and
-- assigning; @tOut@ is the stream type of the result and of the reduce
-- step. @tIn@ does not occur in the argument or result types, so callers
-- select it with a type application (e.g.
-- @concurrentStreamlyEngine \@AheadT \@SerialT@).
--
-- The first argument is the grouping function; it always runs on a
-- 'SerialT' stream, so the pipeline adapts @tIn -> SerialT@ before
-- grouping and @SerialT -> tOut@ after it.
--
-- Inputs are accumulated with 'S.consM', i.e. each new input is placed at
-- the front of the accumulated stream. The unpack stage is applied with
-- streamly's '(S.|$)' operator, and assign/reduce use 'S.mapM' with
-- 'return' so that they go through the stream type's monadic mapping.
concurrentStreamlyEngine
  :: forall tIn tOut m g y k c x d
   . (S.IsStream tIn, S.IsStream tOut, S.MonadAsync m, Foldable g, Functor g)
  => (forall z . S.SerialT m (k, z) -> S.SerialT m (k, g z))
  -> MRE.MapReduceFold y k c (tOut m) x d
concurrentStreamlyEngine groupByKey u (MRC.Assign a) r = FL.Fold
  (\s a' -> return a' `S.consM` s)
  S.nil
  ( S.mapM (\(k, lc) -> return $ MRE.reduceFunction r k lc)
  . S.adapt @SerialT @tOut
  . groupByKey
  . S.adapt @tIn @SerialT
  . S.mapM (return . a)
  . (S.|$) (unpackConcurrently u)
  )
{-# INLINABLE concurrentStreamlyEngine #-}
-- | Monadic map-reduce engine built on a serial streamly stream over @m@.
--
-- The first argument is the grouping function: it receives the stream of
-- @(key, value)@ pairs and returns one @(key, g value)@ pair per group.
--
-- Inputs are accumulated purely by consing each one onto the front of a
-- 'SerialT' stream (so the accumulated stream holds the inputs in reverse
-- arrival order). At extraction time the accumulated stream is pushed
-- through: monadic unpack -> monadic assign -> group by key -> monadic
-- reduce. The resulting 'SerialT' stream is adapted to the requested
-- stream type @t@, and the pure fold is lifted to a 'FL.FoldM' with
-- 'FL.generalize'; the monadic effects themselves live inside the
-- returned stream.
streamlyEngineM
  :: (S.IsStream t, Monad m, S.MonadAsync m, Traversable g)
  => (forall z . SerialT m (k, z) -> SerialT m (k, g z))
  -> MRE.MapReduceFoldM m y k c (t m) x d
streamlyEngineM groupByKey u (MRC.AssignM a) r =
  FL.generalize $ fmap S.adapt $ FL.Fold (flip S.cons) S.nil extract
 where
  -- Reduce a single group to its (monadic) result.
  reduceGroup (k, lc) = MRE.reduceFunctionM r k lc
  -- Full pipeline applied to the accumulated input stream.
  extract =
    S.mapM reduceGroup . groupByKey . S.mapM a . unpackStreamM u
{-# INLINABLE streamlyEngineM #-}
-- | Group a stream of @(key, value)@ pairs by key using a strict
-- 'HMS.HashMap'.
--
-- The whole input stream is first materialised as a list ('S.toList'),
-- every value is wrapped in a singleton 'Seq.Seq', and values sharing a key
-- are merged with '<>' by 'HMS.fromListWith'. The map entries are then
-- emitted as a stream, one @(key, values)@ pair per distinct key, in the
-- order produced by 'HMS.foldrWithKey'.
--
-- NOTE(review): 'HMS.fromListWith' is documented to combine as
-- @new <> old@, which would put each group's values in reverse of their
-- order in the input stream -- confirm this is the intended pairing with
-- the engines' cons-based accumulation.
groupByHashableKey
  :: (Monad m, Hashable k, Eq k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByHashableKey s = do
  lkc <- S.yieldM (S.toList s)
  let hm = HMS.fromListWith (<>) $ fmap (second Seq.singleton) lkc
  HMS.foldrWithKey (\k lc s' -> S.cons (k, lc) s') S.nil hm
{-# INLINABLE groupByHashableKey #-}
-- | Group a stream of @(key, value)@ pairs by key using a strict
-- 'MS.Map'.
--
-- The whole input stream is first materialised as a list ('S.toList'),
-- every value is wrapped in a singleton 'Seq.Seq', and values sharing a key
-- are merged with '<>' by 'MS.fromListWith'. The map entries are then
-- emitted as a stream, one @(key, values)@ pair per distinct key, in the
-- order produced by 'MS.foldrWithKey'.
--
-- NOTE(review): 'MS.fromListWith' is documented to combine as
-- @new <> old@, which would put each group's values in reverse of their
-- order in the input stream -- confirm this is the intended pairing with
-- the engines' cons-based accumulation.
groupByOrderedKey
  :: (Monad m, Ord k) => S.SerialT m (k, c) -> S.SerialT m (k, Seq.Seq c)
groupByOrderedKey s = do
  lkc <- S.yieldM (S.toList s)
  let hm = MS.fromListWith (<>) $ fmap (second Seq.singleton) lkc
  MS.foldrWithKey (\k lc s' -> S.cons (k, lc) s') S.nil hm
{-# INLINABLE groupByOrderedKey #-}
-- | Group a stream of @(key, value)@ pairs by key using a mutable cuckoo
-- hash table in 'ST.ST'.
--
-- The whole input stream is first materialised as a list ('S.toList').
-- Inside 'ST.runST', every value is wrapped in a singleton 'Seq.Seq' and
-- the pairs are loaded into an 'HTC.HashTable' with
-- 'MRE.fromListWithHT', merging values that share a key with '<>'. The
-- table is then folded ('HT.foldM') into the output stream by consing each
-- @(key, values)@ entry onto the front of the stream built so far.
groupByHashableKeyST
  :: (Monad m, Hashable k, Eq k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByHashableKeyST st = do
  lkc <- S.yieldM (S.toList st)
  ST.runST $ do
    hm <- (MRE.fromListWithHT @HTC.HashTable) (<>)
      $ fmap (second Seq.singleton) lkc
    HT.foldM (\s' (k, sc) -> return $ S.cons (k, sc) s') S.nil hm
{-# INLINABLE groupByHashableKeyST #-}
-- | Group a stream of @(key, value)@ pairs by key using discrimination
-- ('DG.groupWith').
--
-- The whole input stream is first materialised as a list ('S.toList') and
-- partitioned into lists of pairs sharing a key. Each non-empty partition
-- becomes one @(key, values)@ pair: the key is taken from the partition's
-- first element and the values are collected, in partition order, into a
-- 'Seq.Seq'. Empty partitions (if 'DG.groupWith' ever produced one) are
-- dropped rather than causing a failure.
groupByDiscriminatedKey
  :: (Monad m, DG.Grouping k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByDiscriminatedKey s = do
  lkc <- S.yieldM (S.toList s)
  let -- Collapse one partition; 'LNE.head' is total on 'LNE.NonEmpty'.
      g :: LNE.NonEmpty (k, c) -> (k, Seq.Seq c)
      g x = (fst (LNE.head x), F.foldMap (Seq.singleton . snd) x)
  S.fromFoldable
    $ Maybe.mapMaybe (fmap g . LNE.nonEmpty)
    $ DG.groupWith fst lkc
{-# INLINABLE groupByDiscriminatedKey #-}