{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
-- | Helpers for mapping the key columns of Frames records to new key columns
-- and folding the remaining data columns of all rows that share a new key.
--
-- Exports a type alias for key-mapping functions ('RecordKeyMap'), ways to
-- build and combine them ('keyMap', 'combineKeyAggregations'), the aggregating
-- folds ('aggregateAllFold', 'aggregateFold') and a helper for pairing two
-- single-column data folds ('mergeDataFolds').
module Frames.Aggregation
(
RecordKeyMap
, combineKeyAggregations
, keyMap
, aggregateAllFold
, aggregateFold
, mergeDataFolds
)
where
import qualified Control.MapReduce as MR
import qualified Frames.MapReduce as FMR
import qualified Control.Foldl as FL
import qualified Frames as F
import qualified Frames.Melt as F
import qualified Frames.InCore as FI
import qualified Data.Vinyl as V
import qualified Data.Vinyl.TypeLevel as V
-- | A function from a record with key columns @k@ to a record with key
-- columns @k'@. This is a plain alias, so any function of that type is a
-- 'RecordKeyMap'.
type RecordKeyMap k k' = F.Record k -> F.Record k'
-- | Run two key maps side by side on one record.
--
-- The input record holds the columns @a@ followed by the columns @b@. Each
-- key map is applied to its own projection of that record (obtained with
-- 'F.rcast'), and the two results are joined with 'V.rappend', with the
-- @a'@ columns first and the @b'@ columns second.
--
-- The constraint @F.Disjoint a' b' ~ 'True@ is required by the signature.
combineKeyAggregations
  :: (a F.⊆ (a V.++ b), b F.⊆ (a V.++ b), F.Disjoint a' b' ~ 'True)
  => RecordKeyMap a a'
  -> RecordKeyMap b b'
  -> RecordKeyMap (a V.++ b) (a' V.++ b')
combineKeyAggregations aToa' bTob' row = V.rappend mappedA mappedB
 where
  -- Project out the @a@ columns and map them.
  mappedA = aToa' (F.rcast row)
  -- Project out the @b@ columns and map them.
  mappedB = bTob' (F.rcast row)
-- | Lift a function on field values to a 'RecordKeyMap' between
-- single-column records.
--
-- The value of field @a@ is read from the input record with 'F.rgetField',
-- passed through the supplied function, and wrapped as the only field of the
-- resulting record. The field types @a@ and @b@ are normally chosen with
-- type applications at the call site.
keyMap
  :: forall a b
   . (V.KnownField a, V.KnownField b)
  => (V.Snd a -> V.Snd b)
  -> RecordKeyMap '[a] '[b]
keyMap f r = mapped F.&: V.RNil
 where
  -- The new field value, computed from the old one.
  mapped = f (F.rgetField @a r)
-- | Fold rows keyed by @ak@ into a frame keyed by @ak'@.
--
-- The fold is a map-reduce built from three stages:
--
-- 1. /Unpack/: for each input row, compute the new key by applying the key
--    map to the row's @ak@ columns, append it to the row, and cast the result
--    to @ak' ++ d@. The original @ak@ columns are not part of that target
--    type.
-- 2. /Assign/: split each re-keyed row into key @ak'@ and data @d@ using
--    'FMR.assignKeysAndData'.
-- 3. /Reduce/: hand the supplied data fold to 'FMR.foldAndAddKey', which is
--    defined in "Frames.MapReduce"; presumably it folds each group's data and
--    re-attaches the group key (confirm against that module).
--
-- The per-group results are combined with 'FMR.concatFold'.
aggregateAllFold
  :: forall ak ak' d
   . ( (ak' V.++ d) F.⊆ ((ak V.++ d) V.++ ak')
     , ak F.⊆ (ak V.++ d)
     , ak' F.⊆ (ak' V.++ d)
     , d F.⊆ (ak' V.++ d)
     , Ord (F.Record ak')
     , Ord (F.Record ak)
     , FI.RecVec (ak' V.++ d)
     )
  => RecordKeyMap ak ak'
  -> (FL.Fold (F.Record d) (F.Record d))
  -> FL.Fold (F.Record (ak V.++ d)) (F.FrameRec (ak' V.++ d))
aggregateAllFold toAggKey aggDataF =
  FMR.concatFold (FMR.mapReduceFold rekeyRow splitKeyAndData reduceGroup)
 where
  -- Each row yields exactly one re-keyed row, returned as a singleton list.
  rekeyRow = MR.Unpack $ \r ->
    [F.rcast @(ak' V.++ d) (V.rappend r (toAggKey (F.rcast r)))]
  -- Group by the new key columns; the data columns are what gets folded.
  splitKeyAndData = FMR.assignKeysAndData @ak' @d
  -- Reduce step built from the caller's data fold.
  reduceGroup = FMR.foldAndAddKey aggDataF
-- | Like 'aggregateAllFold', but performed separately within each group of
-- rows that share the same @k@ columns.
--
-- Rows are passed through unchanged ('MR.noUnpack'), split into an outer key
-- @k@ and the remaining columns @ak ++ d@, and every outer group is folded
-- with 'aggregateAllFold' using the given key map and data fold. The outer
-- key is ignored when building that inner fold. 'FMR.makeRecsWithKey' (from
-- "Frames.MapReduce") is applied with 'id' to the inner results; judging by
-- the output type it puts the @k@ columns back in front of each aggregated
-- row (confirm against that module). 'FMR.concatFold' combines the per-group
-- results.
aggregateFold
  :: forall k ak ak' d
   . ( (ak' V.++ d) F.⊆ ((ak V.++ d) V.++ ak')
     , ak F.⊆ (ak V.++ d)
     , ak' F.⊆ (ak' V.++ d)
     , d F.⊆ (ak' V.++ d)
     , Ord (F.Record ak')
     , FI.RecVec (ak' V.++ d)
     , Ord (F.Record ak)
     , (k V.++ (ak' V.++ d)) ~ ((k V.++ ak') V.++ d)
     , Ord (F.Record k)
     , k F.⊆ ((k V.++ ak') V.++ d)
     , k F.⊆ ((k V.++ ak) V.++ d)
     , (ak V.++ d) F.⊆ ((k V.++ ak) V.++ d)
     , FI.RecVec ((k V.++ ak') V.++ d)
     )
  => RecordKeyMap ak ak'
  -> (FL.Fold (F.Record d) (F.Record d))
  -> FL.Fold
       (F.Record (k V.++ ak V.++ d))
       (F.FrameRec (k V.++ ak' V.++ d))
aggregateFold keyAgg aggDataF =
  FMR.concatFold (FMR.mapReduceFold MR.noUnpack splitOuterKey reduceOuterGroup)
 where
  -- Outer grouping: key on @k@, keep the aggregation key and data together.
  splitOuterKey = FMR.assignKeysAndData @k @(ak V.++ d)
  -- The same inner aggregation is used for every outer key.
  reduceOuterGroup = FMR.makeRecsWithKey
    id
    (MR.ReduceFold (const (aggregateAllFold keyAgg aggDataF)))
-- | Combine two folds over the same input, each producing a single-column
-- record, into one fold producing the two-column record.
--
-- The folds are combined applicatively and their results are joined with
-- 'V.rappend', so the first fold's column comes first.
mergeDataFolds
  :: FL.Fold (F.Record d) (F.Record '[a])
  -> FL.Fold (F.Record d) (F.Record '[b])
  -> FL.Fold (F.Record d) (F.Record '[a, b])
mergeDataFolds aF bF = fmap V.rappend aF <*> bF