{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE BangPatterns #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
module Control.MapReduce.Simple
(
noUnpack
, simpleUnpack
, filterUnpack
, assign
, processAndLabel
, processAndLabelM
, foldAndLabel
, foldAndLabelM
, reduceMapWithKey
, reduceMMapWithKey
, mapReduceFold
, mapReduceFoldM
, hashableMapReduceFold
, hashableMapReduceFoldM
, unpackOnlyFold
, unpackOnlyFoldM
, concatFold
, concatFoldM
, module Control.MapReduce.Core
, Hashable
)
where
import qualified Control.MapReduce.Core as MR
import Control.MapReduce.Core
import qualified Control.MapReduce.Engines.Streaming
as MRST
import qualified Control.MapReduce.Engines.Streamly
as MRSL
import qualified Control.MapReduce.Engines.List
as MRL
import qualified Control.Foldl as FL
import qualified Data.Foldable as F
import Data.Functor.Identity ( Identity(Identity)
, runIdentity
)
import Data.Hashable ( Hashable )
-- | An unpack step expressed as an 'MR.Filter' whose predicate accepts
-- every input.
noUnpack :: MR.Unpack x x
noUnpack = MR.Filter (\_ -> True)
{-# INLINABLE noUnpack #-}
-- | Lift a plain function into an 'MR.Unpack' by wrapping each result in
-- 'Identity', i.e. every input yields exactly one output.
simpleUnpack :: (x -> y) -> MR.Unpack x y
simpleUnpack f = MR.Unpack (\x -> Identity (f x))
{-# INLINABLE simpleUnpack #-}
-- | Build an 'MR.Unpack' from a predicate by wrapping it in 'MR.Filter'.
filterUnpack :: (x -> Bool) -> MR.Unpack x x
filterUnpack keep = MR.Filter keep
{-# INLINABLE filterUnpack #-}
-- | Build an 'MR.Assign' from two projections: one extracting the key and
-- one extracting the data to be grouped under that key.
--
-- The input row is forced to weak head normal form (bang pattern) before
-- the @(key, data)@ pair is constructed.
assign :: forall k y c . (y -> k) -> (y -> c) -> MR.Assign k y c
assign getKey getCols = MR.Assign keyed
 where
  keyed !row = (getKey row, getCols row)
{-# INLINABLE assign #-}
-- | Post-process the result of a reduce with a function that also receives
-- the key. Both 'MR.Reduce' constructors are handled and the constructor
-- is preserved.
reduceMapWithKey :: (k -> y -> z) -> MR.Reduce k x y -> MR.Reduce k x z
reduceMapWithKey f (MR.Reduce g) =
  -- Function-based reduce: compose the key-aware map after the reducer.
  MR.Reduce (\k -> f k . g k)
reduceMapWithKey f (MR.ReduceFold gf) =
  -- Fold-based reduce: map over the fold's result.
  MR.ReduceFold (\k -> f k <$> gf k)
{-# INLINABLE reduceMapWithKey #-}
-- | Monadic-reduce counterpart of 'reduceMapWithKey': apply a pure,
-- key-aware function to the result of an 'MR.ReduceM'. Both constructors
-- are handled and the constructor is preserved.
reduceMMapWithKey :: (k -> y -> z) -> MR.ReduceM m k x y -> MR.ReduceM m k x z
reduceMMapWithKey f (MR.ReduceM g) =
  -- Function-based reduce: map inside the monadic result of the reducer.
  MR.ReduceM (\k -> fmap (f k) . g k)
reduceMMapWithKey f (MR.ReduceFoldM gf) =
  -- Fold-based reduce: map over the monadic fold's result.
  MR.ReduceFoldM (\k -> f k <$> gf k)
{-# INLINABLE reduceMMapWithKey #-}
-- | Build an 'MR.Reduce' from a function that processes any
-- foldable functor of grouped items, followed by a relabelling step that
-- combines the key with the processed result.
processAndLabel
  :: (forall h . (Foldable h, Functor h) => h x -> y)
  -> (k -> y -> z)
  -> MR.Reduce k x z
processAndLabel process relabel =
  MR.Reduce (\k grouped -> relabel k (process grouped))
{-# INLINABLE processAndLabel #-}
-- | Monadic counterpart of 'processAndLabel': the processing step runs in
-- @m@, and the pure relabelling function is mapped over its result together
-- with the key.
processAndLabelM
  :: Monad m
  => (forall h . (Foldable h, Functor h) => h x -> m y)
  -> (k -> y -> z)
  -> MR.ReduceM m k x z
processAndLabelM processM relabel =
  MR.ReduceM (\k grouped -> relabel k <$> processM grouped)
{-# INLINABLE processAndLabelM #-}
-- | Build an 'MR.ReduceFold' from a fold over the grouped items and a
-- relabelling function that combines the key with the fold's result.
--
-- The key is forced (bang pattern) when the per-key fold is requested.
foldAndLabel :: FL.Fold x y -> (k -> y -> z) -> MR.Reduce k x z
foldAndLabel fld relabel = MR.ReduceFold labelled
 where
  labelled !key = relabel key <$> fld
{-# INLINABLE foldAndLabel #-}
-- | Monadic counterpart of 'foldAndLabel': build an 'MR.ReduceFoldM' from a
-- monadic fold and a pure relabelling function applied to its result
-- together with the key.
--
-- The key is forced (bang pattern) when the per-key fold is requested.
foldAndLabelM
  :: Monad m => FL.FoldM m x y -> (k -> y -> z) -> MR.ReduceM m k x z
foldAndLabelM fld relabel = MR.ReduceFoldM labelled
 where
  labelled !key = relabel key <$> fld
{-# INLINABLE foldAndLabelM #-}
-- | Turn a fold that yields a foldable container of monoidal values into a
-- fold that yields their monoidal combination, by mapping 'F.fold' over the
-- fold's result.
--
-- Marked @INLINABLE@, like the other exported functions of this module, so
-- that the 'Monoid' and 'Foldable' dictionaries can be specialised at call
-- sites in other modules.
concatFold :: (Monoid d, Foldable g) => FL.Fold a (g d) -> FL.Fold a d
concatFold = fmap F.fold
{-# INLINABLE concatFold #-}
-- | Monadic-fold version of the same idea: turn a monadic fold that yields
-- a foldable container of monoidal values into one that yields their
-- monoidal combination, by mapping 'F.fold' over the fold's result.
--
-- Marked @INLINABLE@, like the other exported functions of this module, so
-- that the 'Monad', 'Monoid' and 'Foldable' dictionaries can be specialised
-- at call sites in other modules.
concatFoldM
  :: (Monad m, Monoid d, Foldable g) => FL.FoldM m a (g d) -> FL.FoldM m a d
concatFoldM = fmap F.fold
{-# INLINABLE concatFoldM #-}
-- | Assemble a pure map-reduce fold from an unpack, an assign and a reduce
-- step, for keys with an 'Ord' instance.
--
-- Runs 'MRSL.streamlyEngine' with 'MRSL.groupByOrderedKey' as the grouping
-- step and converts the engine's result to a list.
mapReduceFold
  :: Ord k
  => MR.Unpack x y
  -> MR.Assign k y c
  -> MR.Reduce k c d
  -> FL.Fold x [d]
mapReduceFold unpacker assigner reducer =
  runIdentity . MRSL.resultToList
    <$> MRSL.streamlyEngine MRSL.groupByOrderedKey unpacker assigner reducer
{-# INLINABLE mapReduceFold #-}
-- | Assemble a monadic map-reduce fold from monadic unpack, assign and
-- reduce steps, for keys with an 'Ord' instance.
--
-- Runs 'MRST.streamingEngineM' with 'MRST.groupByOrderedKey' as the
-- grouping step. The engine's result is converted to a list inside @m@,
-- and @'MR.postMapM' id@ then runs that action as the final step of the
-- fold.
mapReduceFoldM
  :: (Monad m, Ord k)
  => MR.UnpackM m x y
  -> MR.AssignM m k y c
  -> MR.ReduceM m k c d
  -> FL.FoldM m x [d]
mapReduceFoldM unpacker assigner reducer =
  MR.postMapM
    id
    (   MRST.resultToList
    <$> MRST.streamingEngineM MRST.groupByOrderedKey unpacker assigner reducer
    )
{-# INLINABLE mapReduceFoldM #-}
-- | Assemble a pure map-reduce fold from an unpack, an assign and a reduce
-- step, for keys with 'Hashable' and 'Eq' instances.
--
-- Runs 'MRSL.streamlyEngine' with 'MRSL.groupByHashableKey' as the grouping
-- step and converts the engine's result to a list.
hashableMapReduceFold
  :: (Hashable k, Eq k)
  => MR.Unpack x y
  -> MR.Assign k y c
  -> MR.Reduce k c d
  -> FL.Fold x [d]
hashableMapReduceFold unpacker assigner reducer =
  runIdentity . MRSL.resultToList
    <$> MRSL.streamlyEngine MRSL.groupByHashableKey unpacker assigner reducer
{-# INLINABLE hashableMapReduceFold #-}
-- | Assemble a monadic map-reduce fold from monadic unpack, assign and
-- reduce steps, for keys with 'Hashable' and 'Eq' instances.
--
-- Runs 'MRST.streamingEngineM' with 'MRST.groupByHashableKey' as the
-- grouping step. The engine's result is converted to a list inside @m@,
-- and @'MR.postMapM' id@ then runs that action as the final step of the
-- fold.
hashableMapReduceFoldM
  :: (Monad m, Hashable k, Eq k)
  => MR.UnpackM m x y
  -> MR.AssignM m k y c
  -> MR.ReduceM m k c d
  -> FL.FoldM m x [d]
hashableMapReduceFoldM unpacker assigner reducer =
  MR.postMapM
    id
    (   MRST.resultToList
    <$> MRST.streamingEngineM MRST.groupByHashableKey unpacker assigner reducer
    )
{-# INLINABLE hashableMapReduceFoldM #-}
-- | A fold that performs only the unpack step: the inputs are collected
-- into a list with 'FL.list' and 'MRL.unpackList' is applied to that list.
unpackOnlyFold :: MR.Unpack x y -> FL.Fold x [y]
unpackOnlyFold u = MRL.unpackList u <$> FL.list
{-# INLINABLE unpackOnlyFold #-}
-- | Monadic counterpart of 'unpackOnlyFold': the inputs are collected into
-- a list with a generalized 'FL.list', and 'MRL.unpackListM' is run on that
-- list as the final monadic step of the fold.
unpackOnlyFoldM :: Monad m => MR.UnpackM m x y -> FL.FoldM m x [y]
unpackOnlyFoldM u = MR.postMapM (MRL.unpackListM u) collected
 where
  collected = FL.generalize FL.list
{-# INLINABLE unpackOnlyFoldM #-}