{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
module Control.MapReduce.Engines.Streaming
(
StreamResult(..)
, streamingEngine
, streamingEngineM
, resultToList
, concatStream
, concatStreamFold
, concatStreamFoldM
, groupByHashableKey
, groupByOrderedKey
)
where
import qualified Control.MapReduce.Core as MRC
import qualified Control.MapReduce.Engines as MRE
import qualified Control.Foldl as FL
import Data.Functor.Identity ( Identity )
import Data.Hashable ( Hashable )
import qualified Data.HashMap.Strict as HMS
import qualified Data.Map.Strict as MS
import qualified Data.Sequence as Seq
import qualified Streaming.Prelude as S
import qualified Streaming as S
import Streaming ( Stream
, Of
)
import Control.Arrow ( second )
-- | Interpret an 'MRC.Unpack' step as a stream transformer over 'Identity'.
--
-- * @'MRC.Filter' t@ keeps only the elements for which @t@ is 'True'.
-- * @'MRC.Unpack' f@ maps each element to a 'Foldable' container and streams
--   that container's elements.
--
-- The input stream's return value @r@ is passed through unchanged.
unpackStream
  :: MRC.Unpack x y -> S.Stream (Of x) Identity r -> Stream (Of y) Identity r
unpackStream (MRC.Filter t) = S.filter t
unpackStream (MRC.Unpack f) = S.concat . S.map f
{-# INLINABLE unpackStream #-}
-- | Interpret an 'MRC.UnpackM' step as a stream transformer in monad @m@.
--
-- * @'MRC.FilterM' t@ keeps the elements for which the effectful test @t@
--   returns 'True'.
-- * @'MRC.UnpackM' f@ runs @f@ on each element and streams the elements of
--   the resulting 'Foldable' container.
--
-- Effects of @t@ / @f@ run as the output stream is consumed; the input
-- stream's return value @r@ is passed through unchanged.
unpackStreamM
  :: Monad m => MRC.UnpackM m x y -> Stream (Of x) m r -> Stream (Of y) m r
unpackStreamM (MRC.FilterM t) = S.filterM t
unpackStreamM (MRC.UnpackM f) = S.concat . S.mapM f
{-# INLINABLE unpackStreamM #-}
-- | Group the @(key, value)@ pairs of a stream by key using a
-- "Data.HashMap.Strict" map.
--
-- The entire input stream is consumed (via 'S.toList') before anything is
-- emitted, so this grouping is not incremental. One @(k, values)@ pair is
-- emitted per distinct key, in 'HMS.foldrWithKey' order (the HashMap's
-- internal, unspecified order). The input stream's return value is returned
-- at the end of the output stream.
--
-- Note: 'HMS.fromListWith' combines duplicates as @new <> old@, so within
-- each group the values appear in the /reverse/ of their order in the input
-- stream. 'streamingEngine' feeds its grouping function a stream that is
-- itself reversed, and the two reversals cancel there.
groupByHashableKey
  :: forall m k c r
   . (Monad m, Hashable k, Eq k)
  => Stream (Of (k, c)) m r
  -> Stream (Of (k, Seq.Seq c)) m r
groupByHashableKey s = S.effect $ do
  (lkc S.:> r) <- S.toList s
  let hm :: HMS.HashMap k (Seq.Seq c)
      hm = HMS.fromListWith (<>) $ fmap (second Seq.singleton) lkc
  -- rebuild a stream: one element per key, ending with the original return
  return $ HMS.foldrWithKey (\k lc -> S.cons (k, lc)) (return r) hm
{-# INLINABLE groupByHashableKey #-}
-- | Group the @(key, value)@ pairs of a stream by key using a
-- "Data.Map.Strict" map.
--
-- The entire input stream is consumed (via 'S.toList') before anything is
-- emitted, so this grouping is not incremental. One @(k, values)@ pair is
-- emitted per distinct key, in ascending key order ('MS.foldrWithKey' with
-- 'S.cons'). The input stream's return value is returned at the end of the
-- output stream.
--
-- Note: 'MS.fromListWith' combines duplicates as @new <> old@, so within
-- each group the values appear in the /reverse/ of their order in the input
-- stream. 'streamingEngine' feeds its grouping function a stream that is
-- itself reversed, and the two reversals cancel there.
groupByOrderedKey
  :: forall m k c r
   . (Monad m, Ord k)
  => Stream (Of (k, c)) m r
  -> Stream (Of (k, Seq.Seq c)) m r
groupByOrderedKey s = S.effect $ do
  (lkc S.:> r) <- S.toList s
  let hm :: MS.Map k (Seq.Seq c)
      hm = MS.fromListWith (<>) $ fmap (second Seq.singleton) lkc
  -- rebuild a stream: one element per key, ending with the original return
  return $ MS.foldrWithKey (\k lc -> S.cons (k, lc)) (return r) hm
{-# INLINABLE groupByOrderedKey #-}
-- | A stream of values @d@ in monad @m@ (with @()@ return), wrapped in a
-- newtype. This is the result container produced by 'streamingEngine' and
-- 'streamingEngineM'; use 'unRes' to get at the underlying stream.
newtype StreamResult m d = StreamResult { unRes :: Stream (Of d) m () }
-- | Run a 'StreamResult' in its underlying monad, collecting every value
-- into a list in stream order.
resultToList :: Monad m => StreamResult m d -> m [d]
resultToList = S.toList_ . unRes
-- | Combine all elements of a stream with '<>', in the underlying monad.
--
-- This is a right fold, @a1 <> (a2 <> (... <> mempty))@, with the stream's
-- effects sequenced in order. The stream's @()@ return is first replaced by
-- 'mempty' so that it can act as the seed of the fold.
concatStreaming :: (Monad m, Monoid a) => Stream (Of a) m () -> m a
concatStreaming = S.iterT step . fmap (const mempty)
  where
    -- prepend the current element to the (monadic) combination of the rest
    step :: (Functor f, Semigroup b) => Of b (f b) -> f b
    step (a S.:> rest) = fmap (a <>) rest
-- | Combine all elements of a 'StreamResult' with '<>', in the underlying
-- monad.
concatStream :: (Monad m, Monoid a) => StreamResult m a -> m a
concatStream = concatStreaming . unRes
-- | Post-process a pure fold that yields an 'Identity'-based 'StreamResult',
-- producing the '<>'-combination of all the result's elements instead.
concatStreamFold
  :: Monoid b => FL.Fold a (StreamResult Identity b) -> FL.Fold a b
concatStreamFold = fmap (S.runIdentity . concatStream)
-- | Post-process a monadic fold that yields a 'StreamResult' in @m@,
-- producing the '<>'-combination of all the result's elements (computed in
-- @m@ via 'MRC.postMapM').
concatStreamFoldM
  :: (Monad m, Monoid b) => FL.FoldM m a (StreamResult m b) -> FL.FoldM m a b
concatStreamFoldM = MRC.postMapM concatStream
-- | Build a map-reduce fold whose work is done with "Streaming" streams over
-- 'Identity'.
--
-- The fold only accumulates its inputs. The returned 'StreamResult' is the
-- pipeline
--
-- > unpack (unpackStream) -> assign -> groupByKey -> reduce
--
-- applied to that accumulated stream. @groupByKey@ (for example
-- 'groupByOrderedKey' or 'groupByHashableKey') collects the assigned
-- @(k, c)@ pairs into one container @g c@ per key.
--
-- Note on ordering: inputs are accumulated with @flip 'S.cons'@, so the
-- stream entering the pipeline is in /reverse/ input order. The grouping
-- functions in this module also reverse within-group order, so per-key values
-- from different inputs reach the reducer in input order. However, several
-- elements produced by one 'MRC.Unpack' of a single input end up reversed.
-- Reducers should not depend on within-group order.
streamingEngine
  :: (Foldable g, Functor g)
  => (forall z r . Stream (Of (k, z)) Identity r -> Stream (Of (k, g z)) Identity r)
  -> MRE.MapReduceFold y k c (StreamResult Identity) x d
streamingEngine groupByKey u (MRC.Assign a) r = fmap StreamResult $ FL.Fold
  (flip S.cons) -- O(1) accumulation; reverses input order (see note above)
  (return ())
  ( S.map (uncurry (MRE.reduceFunction r))
  . groupByKey
  . S.map a
  . unpackStream u
  )
{-# INLINABLE streamingEngine #-}
-- | Build a monadic map-reduce fold whose work is done with "Streaming"
-- streams in @m@.
--
-- The fold itself is pure: it is an 'FL.Fold' lifted with 'FL.generalize'
-- that only accumulates its inputs. None of the unpack, assign or reduce
-- effects run while folding. They run when the returned 'StreamResult' is
-- consumed, for example by 'resultToList' or 'concatStream', and they run
-- again each time it is consumed.
--
-- Pipeline: @unpack (unpackStreamM) -> assign -> groupByKey -> reduce@.
--
-- NOTE(review): inputs are accumulated with @flip 'S.cons'@, so the stream
-- entering the pipeline is in reverse input order. As a result, unpack and
-- assign effects execute in reverse input order. Confirm callers do not depend
-- on effect order. See 'streamingEngine' for how this interacts with
-- within-group value order.
streamingEngineM
  :: (Monad m, Traversable g)
  => (forall z r . Stream (Of (k, z)) m r -> Stream (Of (k, g z)) m r)
  -> MRE.MapReduceFoldM m y k c (StreamResult m) x d
streamingEngineM groupByKey u (MRC.AssignM a) r =
  fmap StreamResult . FL.generalize $ FL.Fold
    (flip S.cons) -- O(1) accumulation; reverses input order
    (return ())
    ( S.mapM (uncurry (MRE.reduceFunctionM r))
    . groupByKey
    . S.mapM a
    . unpackStreamM u
    )
{-# INLINABLE streamingEngineM #-}