{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE InstanceSigs #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Frames.MapReduce
(
unpackFilterRow
, unpackFilterOnField
, unpackGoodRows
, assignKeysAndData
, assignKeys
, splitOnKeys
, splitOnData
, reduceAndAddKey
, foldAndAddKey
, makeRecsWithKey
, makeRecsWithKeyM
, module Control.MapReduce
)
where
import qualified Control.MapReduce as MR
import Control.MapReduce
import qualified Control.Foldl as FL
import qualified Data.Hashable as Hash
import qualified Frames as F
import qualified Frames.Melt as F
import qualified Frames.InCore as FI
import qualified Data.Vinyl as V
import qualified Data.Vinyl.TypeLevel as V
instance Hash.Hashable (F.Record '[]) where
hash :: Record '[] -> Int
hash = Int -> Record '[] -> Int
forall a b. a -> b -> a
const Int
0
{-# INLINABLE hash #-}
hashWithSalt :: Int -> Record '[] -> Int
hashWithSalt Int
s = Int -> Record '[] -> Int
forall a b. a -> b -> a
const Int
s
{-# INLINABLE hashWithSalt #-}
instance (V.KnownField t, Hash.Hashable (V.Snd t), Hash.Hashable (F.Record rs), rs F.⊆ (t ': rs)) => Hash.Hashable (F.Record (t ': rs)) where
hashWithSalt :: Int -> Record (t : rs) -> Int
hashWithSalt Int
s Record (t : rs)
r = Int
s Int -> Snd t -> Int
forall a. Hashable a => Int -> a -> Int
`Hash.hashWithSalt` (Record (t : rs) -> Snd t
forall (t :: (Symbol, *)) (s :: Symbol) a (rs :: [(Symbol, *)]).
(t ~ '(s, a), t ∈ rs) =>
Record rs -> a
F.rgetField @t Record (t : rs)
r) Int -> Record rs -> Int
forall a. Hashable a => Int -> a -> Int
`Hash.hashWithSalt` (Record (t : rs) -> Record rs
forall k1 k2 (rs :: [k1]) (ss :: [k1]) (f :: k2 -> *)
(record :: (k2 -> *) -> [k1] -> *) (is :: [Nat]).
(RecSubset record rs ss is, RecSubsetFCtx record f) =>
record f ss -> record f rs
F.rcast @rs Record (t : rs)
r)
{-# INLINABLE hashWithSalt #-}
unpackFilterRow
:: (F.Record rs -> Bool) -> MR.Unpack (F.Record rs) (F.Record rs)
unpackFilterRow :: (Record rs -> Bool) -> Unpack (Record rs) (Record rs)
unpackFilterRow Record rs -> Bool
test = (Record rs -> Bool) -> Unpack (Record rs) (Record rs)
forall x. (x -> Bool) -> Unpack x x
MR.Filter Record rs -> Bool
test
unpackFilterOnField
:: forall t rs
. (V.KnownField t, F.ElemOf rs t)
=> (V.Snd t -> Bool)
-> MR.Unpack (F.Record rs) (F.Record rs)
unpackFilterOnField :: (Snd t -> Bool) -> Unpack (Record rs) (Record rs)
unpackFilterOnField Snd t -> Bool
test = (Record rs -> Bool) -> Unpack (Record rs) (Record rs)
forall (rs :: [(Symbol, *)]).
(Record rs -> Bool) -> Unpack (Record rs) (Record rs)
unpackFilterRow (Snd t -> Bool
test (Snd t -> Bool) -> (Record rs -> Snd t) -> Record rs -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (s :: Symbol) a (rs :: [(Symbol, *)]).
(t ~ '(s, a), t ∈ rs) =>
Record rs -> a
forall (t :: (Symbol, *)) (s :: Symbol) a (rs :: [(Symbol, *)]).
(t ~ '(s, a), t ∈ rs) =>
Record rs -> a
F.rgetField @t)
unpackGoodRows
:: forall cs rs
. (cs F.⊆ rs)
=> MR.Unpack (F.Rec (Maybe F.:. F.ElField) rs) (F.Record cs)
unpackGoodRows :: Unpack (Rec (Maybe :. ElField) rs) (Record cs)
unpackGoodRows = (Rec (Maybe :. ElField) rs -> Maybe (Record cs))
-> Unpack (Rec (Maybe :. ElField) rs) (Record cs)
forall (g :: * -> *) x y. Traversable g => (x -> g y) -> Unpack x y
MR.Unpack ((Rec (Maybe :. ElField) rs -> Maybe (Record cs))
-> Unpack (Rec (Maybe :. ElField) rs) (Record cs))
-> (Rec (Maybe :. ElField) rs -> Maybe (Record cs))
-> Unpack (Rec (Maybe :. ElField) rs) (Record cs)
forall a b. (a -> b) -> a -> b
$ Rec (Maybe :. ElField) cs -> Maybe (Record cs)
forall (cs :: [(Symbol, *)]).
Rec (Maybe :. ElField) cs -> Maybe (Record cs)
F.recMaybe (Rec (Maybe :. ElField) cs -> Maybe (Record cs))
-> (Rec (Maybe :. ElField) rs -> Rec (Maybe :. ElField) cs)
-> Rec (Maybe :. ElField) rs
-> Maybe (Record cs)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Rec (Maybe :. ElField) rs -> Rec (Maybe :. ElField) cs
forall k1 k2 (rs :: [k1]) (ss :: [k1]) (f :: k2 -> *)
(record :: (k2 -> *) -> [k1] -> *) (is :: [Nat]).
(RecSubset record rs ss is, RecSubsetFCtx record f) =>
record f ss -> record f rs
F.rcast
assignKeysAndData
:: forall ks cs rs
. (ks F.⊆ rs, cs F.⊆ rs)
=> MR.Assign (F.Record ks) (F.Record rs) (F.Record cs)
assignKeysAndData :: Assign (Record ks) (Record rs) (Record cs)
assignKeysAndData = (Record rs -> Record ks)
-> (Record rs -> Record cs)
-> Assign (Record ks) (Record rs) (Record cs)
forall k y c. (y -> k) -> (y -> c) -> Assign k y c
MR.assign (forall (ss :: [(Symbol, *)]) (f :: (Symbol, *) -> *)
(record :: ((Symbol, *) -> *) -> [(Symbol, *)] -> *) (is :: [Nat]).
(RecSubset record ks ss is, RecSubsetFCtx record f) =>
record f ss -> record f ks
forall k1 k2 (rs :: [k1]) (ss :: [k1]) (f :: k2 -> *)
(record :: (k2 -> *) -> [k1] -> *) (is :: [Nat]).
(RecSubset record rs ss is, RecSubsetFCtx record f) =>
record f ss -> record f rs
F.rcast @ks) (forall (ss :: [(Symbol, *)]) (f :: (Symbol, *) -> *)
(record :: ((Symbol, *) -> *) -> [(Symbol, *)] -> *) (is :: [Nat]).
(RecSubset record cs ss is, RecSubsetFCtx record f) =>
record f ss -> record f cs
forall k1 k2 (rs :: [k1]) (ss :: [k1]) (f :: k2 -> *)
(record :: (k2 -> *) -> [k1] -> *) (is :: [Nat]).
(RecSubset record rs ss is, RecSubsetFCtx record f) =>
record f ss -> record f rs
F.rcast @cs)
{-# INLINABLE assignKeysAndData #-}
assignKeys
:: forall ks rs
. (ks F.⊆ rs)
=> MR.Assign (F.Record ks) (F.Record rs) (F.Record rs)
assignKeys :: Assign (Record ks) (Record rs) (Record rs)
assignKeys = (Record rs -> Record ks)
-> (Record rs -> Record rs)
-> Assign (Record ks) (Record rs) (Record rs)
forall k y c. (y -> k) -> (y -> c) -> Assign k y c
MR.assign (forall (ss :: [(Symbol, *)]) (f :: (Symbol, *) -> *)
(record :: ((Symbol, *) -> *) -> [(Symbol, *)] -> *) (is :: [Nat]).
(RecSubset record ks ss is, RecSubsetFCtx record f) =>
record f ss -> record f ks
forall k1 k2 (rs :: [k1]) (ss :: [k1]) (f :: k2 -> *)
(record :: (k2 -> *) -> [k1] -> *) (is :: [Nat]).
(RecSubset record rs ss is, RecSubsetFCtx record f) =>
record f ss -> record f rs
F.rcast @ks) Record rs -> Record rs
forall a. a -> a
id
{-# INLINABLE assignKeys #-}
splitOnKeys
:: forall ks rs cs
. (ks F.⊆ rs, cs ~ F.RDeleteAll ks rs, cs F.⊆ rs)
=> MR.Assign (F.Record ks) (F.Record rs) (F.Record cs)
splitOnKeys :: Assign (Record ks) (Record rs) (Record cs)
splitOnKeys = forall (rs :: [(Symbol, *)]).
(ks ⊆ rs, cs ⊆ rs) =>
Assign (Record ks) (Record rs) (Record cs)
forall (ks :: [(Symbol, *)]) (cs :: [(Symbol, *)])
(rs :: [(Symbol, *)]).
(ks ⊆ rs, cs ⊆ rs) =>
Assign (Record ks) (Record rs) (Record cs)
assignKeysAndData @ks @cs
{-# INLINABLE splitOnKeys #-}
splitOnData
:: forall cs rs ks
. (cs F.⊆ rs, ks ~ F.RDeleteAll cs rs, ks F.⊆ rs)
=> MR.Assign (F.Record ks) (F.Record rs) (F.Record cs)
splitOnData :: Assign (Record ks) (Record rs) (Record cs)
splitOnData = forall (rs :: [(Symbol, *)]).
(ks ⊆ rs, cs ⊆ rs) =>
Assign (Record ks) (Record rs) (Record cs)
forall (ks :: [(Symbol, *)]) (cs :: [(Symbol, *)])
(rs :: [(Symbol, *)]).
(ks ⊆ rs, cs ⊆ rs) =>
Assign (Record ks) (Record rs) (Record cs)
assignKeysAndData @ks @cs
{-# INLINABLE splitOnData #-}
reduceAndAddKey
:: forall ks cs x
. FI.RecVec ((ks V.++ cs))
=> (forall h . Foldable h => h x -> F.Record cs)
-> MR.Reduce (F.Record ks) x (F.FrameRec (ks V.++ cs))
reduceAndAddKey :: (forall (h :: * -> *). Foldable h => h x -> Record cs)
-> Reduce (Record ks) x (FrameRec (ks ++ cs))
reduceAndAddKey forall (h :: * -> *). Foldable h => h x -> Record cs
process =
(Record (ks ++ cs) -> FrameRec (ks ++ cs))
-> Reduce (Record ks) x (Record (ks ++ cs))
-> Reduce (Record ks) x (FrameRec (ks ++ cs))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ([Record (ks ++ cs)] -> FrameRec (ks ++ cs)
forall (f :: * -> *) (rs :: [(Symbol, *)]).
(Foldable f, RecVec rs) =>
f (Record rs) -> Frame (Record rs)
F.toFrame ([Record (ks ++ cs)] -> FrameRec (ks ++ cs))
-> (Record (ks ++ cs) -> [Record (ks ++ cs)])
-> Record (ks ++ cs)
-> FrameRec (ks ++ cs)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Applicative [] => a -> [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure @[]) (Reduce (Record ks) x (Record (ks ++ cs))
-> Reduce (Record ks) x (FrameRec (ks ++ cs)))
-> Reduce (Record ks) x (Record (ks ++ cs))
-> Reduce (Record ks) x (FrameRec (ks ++ cs))
forall a b. (a -> b) -> a -> b
$ (forall (h :: * -> *). (Foldable h, Functor h) => h x -> Record cs)
-> (Record ks -> Record cs -> Record (ks ++ cs))
-> Reduce (Record ks) x (Record (ks ++ cs))
forall x y k z.
(forall (h :: * -> *). (Foldable h, Functor h) => h x -> y)
-> (k -> y -> z) -> Reduce k x z
MR.processAndLabel forall (h :: * -> *). Foldable h => h x -> Record cs
forall (h :: * -> *). (Foldable h, Functor h) => h x -> Record cs
process Record ks -> Record cs -> Record (ks ++ cs)
forall k (f :: k -> *) (as :: [k]) (bs :: [k]).
Rec f as -> Rec f bs -> Rec f (as ++ bs)
V.rappend
{-# INLINABLE reduceAndAddKey #-}
foldAndAddKey
:: (FI.RecVec ((ks V.++ cs)))
=> FL.Fold x (F.Record cs)
-> MR.Reduce (F.Record ks) x (F.FrameRec (ks V.++ cs))
foldAndAddKey :: Fold x (Record cs) -> Reduce (Record ks) x (FrameRec (ks ++ cs))
foldAndAddKey Fold x (Record cs)
fld = (Record (ks ++ cs) -> FrameRec (ks ++ cs))
-> Reduce (Record ks) x (Record (ks ++ cs))
-> Reduce (Record ks) x (FrameRec (ks ++ cs))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ([Record (ks ++ cs)] -> FrameRec (ks ++ cs)
forall (f :: * -> *) (rs :: [(Symbol, *)]).
(Foldable f, RecVec rs) =>
f (Record rs) -> Frame (Record rs)
F.toFrame ([Record (ks ++ cs)] -> FrameRec (ks ++ cs))
-> (Record (ks ++ cs) -> [Record (ks ++ cs)])
-> Record (ks ++ cs)
-> FrameRec (ks ++ cs)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Applicative [] => a -> [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure @[]) (Reduce (Record ks) x (Record (ks ++ cs))
-> Reduce (Record ks) x (FrameRec (ks ++ cs)))
-> Reduce (Record ks) x (Record (ks ++ cs))
-> Reduce (Record ks) x (FrameRec (ks ++ cs))
forall a b. (a -> b) -> a -> b
$ Fold x (Record cs)
-> (Record ks -> Record cs -> Record (ks ++ cs))
-> Reduce (Record ks) x (Record (ks ++ cs))
forall x y k z. Fold x y -> (k -> y -> z) -> Reduce k x z
MR.foldAndLabel Fold x (Record cs)
fld Record ks -> Record cs -> Record (ks ++ cs)
forall k (f :: k -> *) (as :: [k]) (bs :: [k]).
Rec f as -> Rec f bs -> Rec f (as ++ bs)
V.rappend
{-# INLINABLE foldAndAddKey #-}
makeRecsWithKey
:: (Functor g, Foldable g, (FI.RecVec (ks V.++ as)))
=> (y -> F.Record as)
-> MR.Reduce (F.Record ks) x (g y)
-> MR.Reduce (F.Record ks) x (F.FrameRec (ks V.++ as))
makeRecsWithKey :: (y -> Record as)
-> Reduce (Record ks) x (g y)
-> Reduce (Record ks) x (FrameRec (ks ++ as))
makeRecsWithKey y -> Record as
makeRec Reduce (Record ks) x (g y)
reduceToY = (g (Record (ks ++ as)) -> FrameRec (ks ++ as))
-> Reduce (Record ks) x (g (Record (ks ++ as)))
-> Reduce (Record ks) x (FrameRec (ks ++ as))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap g (Record (ks ++ as)) -> FrameRec (ks ++ as)
forall (f :: * -> *) (rs :: [(Symbol, *)]).
(Foldable f, RecVec rs) =>
f (Record rs) -> Frame (Record rs)
F.toFrame
(Reduce (Record ks) x (g (Record (ks ++ as)))
-> Reduce (Record ks) x (FrameRec (ks ++ as)))
-> Reduce (Record ks) x (g (Record (ks ++ as)))
-> Reduce (Record ks) x (FrameRec (ks ++ as))
forall a b. (a -> b) -> a -> b
$ (Record ks -> g y -> g (Record (ks ++ as)))
-> Reduce (Record ks) x (g y)
-> Reduce (Record ks) x (g (Record (ks ++ as)))
forall k y z x. (k -> y -> z) -> Reduce k x y -> Reduce k x z
MR.reduceMapWithKey Record ks -> g y -> g (Record (ks ++ as))
addKey Reduce (Record ks) x (g y)
reduceToY
where addKey :: Record ks -> g y -> g (Record (ks ++ as))
addKey Record ks
k = (y -> Record (ks ++ as)) -> g y -> g (Record (ks ++ as))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Record ks -> Record as -> Record (ks ++ as)
forall k (f :: k -> *) (as :: [k]) (bs :: [k]).
Rec f as -> Rec f bs -> Rec f (as ++ bs)
V.rappend Record ks
k (Record as -> Record (ks ++ as))
-> (y -> Record as) -> y -> Record (ks ++ as)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. y -> Record as
makeRec)
{-# INLINABLE makeRecsWithKey #-}
makeRecsWithKeyM
:: (Monad m, Functor g, Foldable g, (FI.RecVec (ks V.++ as)))
=> (y -> F.Record as)
-> MR.ReduceM m (F.Record ks) x (g y)
-> MR.ReduceM m (F.Record ks) x (F.FrameRec (ks V.++ as))
makeRecsWithKeyM :: (y -> Record as)
-> ReduceM m (Record ks) x (g y)
-> ReduceM m (Record ks) x (FrameRec (ks ++ as))
makeRecsWithKeyM y -> Record as
makeRec ReduceM m (Record ks) x (g y)
reduceToY = (g (Record (ks ++ as)) -> FrameRec (ks ++ as))
-> ReduceM m (Record ks) x (g (Record (ks ++ as)))
-> ReduceM m (Record ks) x (FrameRec (ks ++ as))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap g (Record (ks ++ as)) -> FrameRec (ks ++ as)
forall (f :: * -> *) (rs :: [(Symbol, *)]).
(Foldable f, RecVec rs) =>
f (Record rs) -> Frame (Record rs)
F.toFrame
(ReduceM m (Record ks) x (g (Record (ks ++ as)))
-> ReduceM m (Record ks) x (FrameRec (ks ++ as)))
-> ReduceM m (Record ks) x (g (Record (ks ++ as)))
-> ReduceM m (Record ks) x (FrameRec (ks ++ as))
forall a b. (a -> b) -> a -> b
$ (Record ks -> g y -> g (Record (ks ++ as)))
-> ReduceM m (Record ks) x (g y)
-> ReduceM m (Record ks) x (g (Record (ks ++ as)))
forall k y z (m :: * -> *) x.
(k -> y -> z) -> ReduceM m k x y -> ReduceM m k x z
MR.reduceMMapWithKey Record ks -> g y -> g (Record (ks ++ as))
addKey ReduceM m (Record ks) x (g y)
reduceToY
where addKey :: Record ks -> g y -> g (Record (ks ++ as))
addKey Record ks
k = (y -> Record (ks ++ as)) -> g y -> g (Record (ks ++ as))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Record ks -> Record as -> Record (ks ++ as)
forall k (f :: k -> *) (as :: [k]) (bs :: [k]).
Rec f as -> Rec f bs -> Rec f (as ++ bs)
V.rappend Record ks
k (Record as -> Record (ks ++ as))
-> (y -> Record as) -> y -> Record (ks ++ as)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. y -> Record as
makeRec)
{-# INLINABLE makeRecsWithKeyM #-}