{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE BangPatterns          #-}
{-# LANGUAGE TypeApplications      #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
{-|
Module      : Control.MapReduce.Engines.Vector
Description : map-reduce-folds builders
Copyright   : (c) Adam Conner-Sax 2019
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

map-reduce engine (fold builder) using @Vector@ as its intermediate type.
-}
module Control.MapReduce.Engines.Vector
  (
    -- * Engines
    vectorEngine
  , vectorEngineM

  -- * groupBy functions
  , groupByHashableKey
  , groupByOrderedKey

  -- * re-exports
  , toList
  )
where

import qualified Control.MapReduce.Core        as MRC
import qualified Control.MapReduce.Engines     as MRE

import qualified Control.Foldl                 as FL
import           Control.Monad                  ( (<=<) )
import qualified Data.Foldable                 as F
import           Data.Hashable                  ( Hashable )
import qualified Data.HashMap.Strict           as HMS
import qualified Data.Map.Strict               as MS
import qualified Data.Sequence                 as Seq
import qualified Data.Vector                   as V
import           Data.Vector                    ( Vector
                                                , toList
                                                )
import           Control.Arrow                  ( second )

-- | Case analysis of 'MRC.Unpack' for the @Vector@-based map-reduce engine.
--
-- * 'MRC.Filter' keeps only the elements satisfying the predicate.
-- * 'MRC.Unpack' maps every element to a @Foldable@ container of results and
--   concatenates all of those results into a single @Vector@.
unpackVector :: MRC.Unpack x y -> Vector x -> Vector y
unpackVector (MRC.Filter t) = V.filter t
-- Each unpacked container is converted to a @Vector@ via a list so that
-- 'V.concatMap' can flatten the results.
unpackVector (MRC.Unpack f) = V.concatMap (V.fromList . F.toList . f)
{-# INLINABLE unpackVector #-}

-- | Case analysis of 'MRC.UnpackM' for the @Vector@-based map-reduce engine
-- (the effectful counterpart of the pure unpack step).
--
-- * 'MRC.FilterM' keeps only the elements for which the monadic predicate
--   returns @True@.
-- * 'MRC.UnpackM' runs the effectful unpacking function on every element,
--   then concatenates all of the produced containers into a single @Vector@.
unpackVectorM :: Monad m => MRC.UnpackM m x y -> Vector x -> m (Vector y)
unpackVectorM (MRC.FilterM t) = V.filterM t
unpackVectorM (MRC.UnpackM f) =
  -- 'traverse' yields a @Vector@ of @Vector@s inside @m@; 'V.concatMap' with
  -- 'id' flattens that one level.
  fmap (V.concatMap id) . traverse (fmap (V.fromList . F.toList) . f)
{-# INLINABLE unpackVectorM #-}


-- | Group the mapped and assigned values by key using a @Data.HashMap.Strict@.
--
-- Every value is wrapped in a singleton 'Seq.Seq' and the sequences sharing a
-- key are merged with '<>'.  The result holds one entry per distinct key, in
-- whatever order 'HMS.toList' produces.
--
-- NOTE(review): 'HMS.fromListWith' is documented to combine as
-- @f new old@, so the values of a key presumably end up in reverse order of
-- their appearance in the input -- confirm against the library version in use
-- before relying on the order inside each group.
groupByHashableKey
  :: forall k c . (Hashable k, Eq k) => Vector (k, c) -> Vector (k, Seq.Seq c)
groupByHashableKey v =
  let hm :: HMS.HashMap k (Seq.Seq c)
      hm = HMS.fromListWith (<>) $ V.toList $ fmap (second Seq.singleton) v
  -- Alternative considered by the original author: fold the map directly
  -- into a @Vector@ instead of going through an intermediate list.
  in  V.fromList $ HMS.toList hm
{-# INLINABLE groupByHashableKey #-}

-- | Group the mapped and assigned values by key using a @Data.Map.Strict@.
--
-- Every value is wrapped in a singleton 'Seq.Seq' and the sequences sharing a
-- key are merged with '<>'.  The result holds one entry per distinct key, in
-- the order produced by 'MS.toList' (ascending by key).
--
-- NOTE(review): 'MS.fromListWith' is documented to combine as @f new old@,
-- so the values of a key presumably end up in reverse order of their
-- appearance in the input -- confirm before relying on the order inside each
-- group.
groupByOrderedKey
  :: forall k c . Ord k => Vector (k, c) -> Vector (k, Seq.Seq c)
groupByOrderedKey v =
  let grouped :: MS.Map k (Seq.Seq c)
      grouped = MS.fromListWith (<>) $ V.toList $ fmap (second Seq.singleton) v
  -- Alternative considered by the original author: fold the map directly
  -- into a @Vector@ instead of going through an intermediate list.
  in  V.fromList $ MS.toList grouped
{-# INLINABLE groupByOrderedKey #-}

-- | Map-reduce-fold builder, using @Vector@ as the intermediate type and
-- returning a @Vector@ result.
--
-- The first argument is the grouping function (for example
-- 'groupByHashableKey' or 'groupByOrderedKey').  The resulting fold first
-- collects all inputs into a @Vector@ with 'FL.vector' and then, reading the
-- composition from bottom to top:
--
-- 1. unpacks each input with 'unpackVector',
-- 2. assigns a key to every unpacked value,
-- 3. groups the keyed values with the supplied grouping function,
-- 4. reduces each @(key, group)@ pair with 'MRE.reduceFunction'.
vectorEngine
  :: (Foldable g, Functor g)
  => (Vector (k, c) -> Vector (k, g c))
  -> MRE.MapReduceFold y k c Vector x d
vectorEngine groupByKey u (MRC.Assign a) r = fmap
  ( V.map (uncurry (MRE.reduceFunction r))
  . groupByKey
  . V.map a
  . unpackVector u
  )
  FL.vector
{-# INLINABLE vectorEngine #-}

-- | Effectful map-reduce-fold builder, using @Vector@ as the intermediate
-- type and returning an effectful @Vector@ result.
--
-- The first argument is the (pure) grouping function.  The resulting monadic
-- fold collects all inputs into a @Vector@ (the pure 'FL.vector' fold lifted
-- with 'FL.generalize') and then post-processes it via 'MRC.postMapM'.
-- Reading the composition from bottom to top:
--
-- 1. unpacks each input with 'unpackVectorM', then assigns keys with 'V.mapM',
-- 2. groups the keyed values with the supplied grouping function,
-- 3. reduces each @(key, group)@ pair with 'MRE.reduceFunctionM'.
vectorEngineM
  :: (Monad m, Traversable g)
  => (Vector (k, c) -> Vector (k, g c))
  -> MRE.MapReduceFoldM m y k c Vector x d
vectorEngineM groupByKey u (MRC.AssignM a) r = MRC.postMapM
  ( (traverse (uncurry (MRE.reduceFunctionM r)) =<<)
  . fmap groupByKey
  . (V.mapM a <=< unpackVectorM u)
  )
  (FL.generalize FL.vector)
{-# INLINABLE vectorEngineM #-}
-- NB: If we are willing to constrain to PrimMonad m, then we can use vectorM here which can do in-place updates, etc.