{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE BangPatterns #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
module Control.MapReduce.Simple
(
noUnpack
, simpleUnpack
, filterUnpack
, assign
, processAndLabel
, processAndLabelM
, foldAndLabel
, foldAndLabelM
, reduceMapWithKey
, reduceMMapWithKey
, mapReduceFold
, mapReduceFoldM
, hashableMapReduceFold
, hashableMapReduceFoldM
, unpackOnlyFold
, unpackOnlyFoldM
, concatFold
, concatFoldM
, module Control.MapReduce.Core
, Hashable
)
where
import qualified Control.MapReduce.Core as MR
import Control.MapReduce.Core
import qualified Control.MapReduce.Engines.Streaming
as MRST
import qualified Control.MapReduce.Engines.Streamly
as MRSL
import qualified Control.MapReduce.Engines.List
as MRL
import qualified Control.Foldl as FL
import qualified Data.Foldable as F
import Data.Functor.Identity ( Identity(Identity)
, runIdentity
)
import Data.Hashable ( Hashable )
-- | An unpack step that passes every input through unchanged.
--
-- Built as 'MR.Filter' with a predicate that accepts everything, so the
-- filter never rejects an element and elements are never transformed.
noUnpack :: MR.Unpack x x
noUnpack = MR.Filter (const True)
{-# INLINABLE noUnpack #-}
-- | An unpack step that maps each input through @f@, producing exactly one
-- output per input.
--
-- 'MR.Unpack' takes a function into some 'Traversable'; 'Identity' is the
-- single-element container, so each input yields one output.
simpleUnpack :: (x -> y) -> MR.Unpack x y
simpleUnpack f = MR.Unpack (Identity . f)
{-# INLINABLE simpleUnpack #-}
-- | An unpack step that filters its input with the given predicate and
-- otherwise leaves elements unchanged.  This is a direct alias for
-- 'MR.Filter'.
filterUnpack :: (x -> Bool) -> MR.Unpack x x
filterUnpack = MR.Filter
{-# INLINABLE filterUnpack #-}
-- | Build an assignment from two projections of @y@.
--
-- @getKey@ extracts the grouping key, and @getCols@ extracts the data to be
-- grouped under that key.
assign :: forall k y c . (y -> k) -> (y -> c) -> MR.Assign k y c
assign getKey getCols = MR.Assign keyAndCols
 where
  -- The input is forced to WHNF (bang pattern) before the pair is built.
  keyAndCols :: y -> (k, c)
  keyAndCols !y = (getKey y, getCols y)
{-# INLINABLE assign #-}
-- | Post-process the result of a reduction with a function that also
-- receives the key.
--
-- Handles both the function-based ('MR.Reduce') and the fold-based
-- ('MR.ReduceFold') forms, and keeps whichever form the input used.
reduceMapWithKey :: (k -> y -> z) -> MR.Reduce k x y -> MR.Reduce k x z
reduceMapWithKey f r = case r of
  MR.Reduce g      -> MR.Reduce (\k -> f k . g k)
  MR.ReduceFold gf -> MR.ReduceFold (\k -> fmap (f k) (gf k))
{-# INLINABLE reduceMapWithKey #-}
-- | Post-process the result of a monadic reduction with a pure function
-- that also receives the key.
--
-- Handles both 'MR.ReduceM' and 'MR.ReduceFoldM', and keeps whichever form
-- the input used.  The signature needs no @Monad m@ constraint: both
-- constructors carry @Monad m@ in their context, so pattern matching on
-- them brings it into scope for the 'fmap's below.
reduceMMapWithKey :: (k -> y -> z) -> MR.ReduceM m k x y -> MR.ReduceM m k x z
reduceMMapWithKey f r = case r of
  MR.ReduceM g      -> MR.ReduceM (\k -> fmap (f k) . g k)
  MR.ReduceFoldM gf -> MR.ReduceFoldM (\k -> fmap (f k) (gf k))
{-# INLINABLE reduceMMapWithKey #-}
-- | The common case where the key is not needed by the reduction itself,
-- only to label its result.
--
-- @process@ is run over the collection of values, and @relabel k@ is then
-- applied to its output.
processAndLabel
  :: (forall h . (Foldable h, Functor h) => h x -> y)
  -> (k -> y -> z)
  -> MR.Reduce k x z
processAndLabel process relabel = MR.Reduce (\k -> relabel k . process)
{-# INLINABLE processAndLabel #-}
-- | Monadic version of 'processAndLabel'.
--
-- The effectful @processM@ is run over the values, and the pure
-- @relabel k@ is mapped over its result.
processAndLabelM
  :: Monad m
  => (forall h . (Foldable h, Functor h) => h x -> m y)
  -> (k -> y -> z)
  -> MR.ReduceM m k x z
processAndLabelM processM relabel =
  MR.ReduceM (\k -> fmap (relabel k) . processM)
{-# INLINABLE processAndLabelM #-}
-- | Reduce with a fixed 'FL.Fold' and label its result with the key.
--
-- This uses the fold-based 'MR.ReduceFold' form.
foldAndLabel :: FL.Fold x y -> (k -> y -> z) -> MR.Reduce k x z
foldAndLabel fld relabel = MR.ReduceFold labelled
 where
  -- The key is forced (bang pattern) before the relabelled fold is built.
  labelled !k = fmap (relabel k) fld
{-# INLINABLE foldAndLabel #-}
-- | Monadic version of 'foldAndLabel'.
--
-- Reduces with a fixed 'FL.FoldM' and labels its result with the key,
-- using the 'MR.ReduceFoldM' form.
foldAndLabelM
  :: Monad m => FL.FoldM m x y -> (k -> y -> z) -> MR.ReduceM m k x z
foldAndLabelM fld relabel = MR.ReduceFoldM labelled
 where
  -- The key is forced (bang pattern) before the relabelled fold is built.
  labelled !k = fmap (relabel k) fld
{-# INLINABLE foldAndLabelM #-}
-- | Collapse a fold's 'Foldable' container of monoidal results into a single
-- result using 'F.fold'.
--
-- 'F.fold' combines the elements with '<>' and yields 'mempty' for an empty
-- container.  For example, it turns the @[d]@ produced by 'mapReduceFold'
-- into a single @d@ when @d@ is a 'Monoid'.
concatFold :: (Monoid d, Foldable g) => FL.Fold a (g d) -> FL.Fold a d
concatFold = fmap F.fold
{-# INLINABLE concatFold #-}
-- | Monadic version of 'concatFold'.
--
-- Collapses a 'FL.FoldM''s 'Foldable' container of monoidal results with
-- 'F.fold'.  @Monad m@ is required because 'fmap' on 'FL.FoldM' needs it.
concatFoldM
  :: (Monad m, Monoid d, Foldable g) => FL.FoldM m a (g d) -> FL.FoldM m a d
concatFoldM = fmap F.fold
{-# INLINABLE concatFoldM #-}
-- | Run a map-reduce (unpack @u@, assign @a@, reduce @r@) as a 'FL.Fold'.
--
-- Uses the streamly engine, grouping by an 'Ord' key via
-- 'MRSL.groupByOrderedKey', and collects the engine's result into a list.
mapReduceFold
  :: Ord k
  => MR.Unpack x y
  -> MR.Assign k y c
  -> MR.Reduce k c d
  -> FL.Fold x [d]
mapReduceFold u a r =
  -- The engine's result stream lives in the pure 'Identity' monad here, so
  -- 'runIdentity' unwraps the collected list.
  fmap (runIdentity . MRSL.resultToList)
       (MRSL.streamlyEngine MRSL.groupByOrderedKey u a r)
{-# INLINABLE mapReduceFold #-}
-- | Run a monadic map-reduce as a 'FL.FoldM'.
--
-- Uses the streaming engine, grouping by an 'Ord' key via
-- 'MRST.groupByOrderedKey', and collects the result into a list.
mapReduceFoldM
  :: (Monad m, Ord k)
  => MR.UnpackM m x y
  -> MR.AssignM m k y c
  -> MR.ReduceM m k c d
  -> FL.FoldM m x [d]
mapReduceFoldM u a r = MR.postMapM id (fmap MRST.resultToList engineFold)
 where
  engineFold = MRST.streamingEngineM MRST.groupByOrderedKey u a r
  -- 'MRST.resultToList' yields an @m [d]@ action.  'fmap' places that action
  -- in the fold's result, and @MR.postMapM id@ runs it, so the final fold
  -- produces the list itself.
{-# INLINABLE mapReduceFoldM #-}
-- | Like 'mapReduceFold', but groups with 'MRSL.groupByHashableKey'.
--
-- The key therefore needs 'Hashable' and 'Eq' instead of 'Ord'.  The
-- streamly engine is used, and the result is collected into a list.
hashableMapReduceFold
  :: (Hashable k, Eq k)
  => MR.Unpack x y
  -> MR.Assign k y c
  -> MR.Reduce k c d
  -> FL.Fold x [d]
hashableMapReduceFold u a r =
  -- The engine's result stream lives in the pure 'Identity' monad here, so
  -- 'runIdentity' unwraps the collected list.
  fmap (runIdentity . MRSL.resultToList)
       (MRSL.streamlyEngine MRSL.groupByHashableKey u a r)
{-# INLINABLE hashableMapReduceFold #-}
-- | Like 'mapReduceFoldM', but groups with 'MRST.groupByHashableKey'.
--
-- The key therefore needs 'Hashable' and 'Eq' instead of 'Ord'.  The
-- streaming engine is used, and the result is collected into a list.
hashableMapReduceFoldM
  :: (Monad m, Hashable k, Eq k)
  => MR.UnpackM m x y
  -> MR.AssignM m k y c
  -> MR.ReduceM m k c d
  -> FL.FoldM m x [d]
hashableMapReduceFoldM u a r =
  MR.postMapM id (fmap MRST.resultToList engineFold)
 where
  engineFold = MRST.streamingEngineM MRST.groupByHashableKey u a r
  -- 'MRST.resultToList' yields an @m [d]@ action.  'fmap' places that action
  -- in the fold's result, and @MR.postMapM id@ runs it, so the final fold
  -- produces the list itself.
{-# INLINABLE hashableMapReduceFoldM #-}
-- | A fold that performs only the unpack step, with no assignment or
-- reduction.
--
-- All inputs are collected with 'FL.list', and 'MRL.unpackList' is then
-- applied to that list.
unpackOnlyFold :: MR.Unpack x y -> FL.Fold x [y]
unpackOnlyFold u = MRL.unpackList u <$> FL.list
{-# INLINABLE unpackOnlyFold #-}
-- | Monadic version of 'unpackOnlyFold'.
--
-- The pure list-collecting fold is lifted with 'FL.generalize', and the
-- effectful 'MRL.unpackListM' is run on the collected inputs via
-- 'MR.postMapM'.
unpackOnlyFoldM :: Monad m => MR.UnpackM m x y -> FL.FoldM m x [y]
unpackOnlyFoldM u = MR.postMapM (MRL.unpackListM u) (FL.generalize FL.list)
{-# INLINABLE unpackOnlyFoldM #-}