{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
module Control.MapReduce.Engines.Streaming
(
StreamResult(..)
, streamingEngine
, streamingEngineM
, resultToList
, concatStream
, concatStreamFold
, concatStreamFoldM
, groupByHashableKey
, groupByOrderedKey
)
where
import qualified Control.MapReduce.Core as MRC
import qualified Control.MapReduce.Engines as MRE
import qualified Control.Foldl as FL
import Data.Functor.Identity ( Identity )
import Data.Hashable ( Hashable )
import qualified Data.HashMap.Strict as HMS
import qualified Data.Map.Strict as MS
import qualified Data.Sequence as Seq
import qualified Streaming.Prelude as S
import qualified Streaming as S
import Streaming ( Stream
, Of
)
import Control.Arrow ( second )
-- | Apply an 'MRC.Unpack' step to a pure stream.
--
-- * 'MRC.Filter' keeps only the elements that satisfy the predicate.
-- * 'MRC.Unpack' applies the function to each element and flattens every
--   resulting container into the output stream.
--
-- The stream's return value is passed through unchanged.
unpackStream
  :: MRC.Unpack x y -> S.Stream (Of x) Identity r -> Stream (Of y) Identity r
unpackStream unpacker xs = case unpacker of
  MRC.Filter keep   -> S.filter keep xs
  MRC.Unpack expand -> S.concat (S.map expand xs)
{-# INLINABLE unpackStream #-}
-- | Monadic counterpart of the pure unpack step, for an 'MRC.UnpackM'.
--
-- * 'MRC.FilterM' keeps only the elements for which the monadic predicate
--   returns 'True'.
-- * 'MRC.UnpackM' runs the monadic function on each element and flattens
--   every resulting container into the output stream.
--
-- The stream's return value is passed through unchanged.
unpackStreamM
  :: Monad m => MRC.UnpackM m x y -> Stream (Of x) m r -> Stream (Of y) m r
unpackStreamM unpacker xs = case unpacker of
  MRC.FilterM keep   -> S.filterM keep xs
  MRC.UnpackM expand -> S.concat (S.mapM expand xs)
{-# INLINABLE unpackStreamM #-}
-- | Group a stream of key/value pairs by key using a strict 'HMS.HashMap'.
--
-- The whole input stream is consumed (running its effects) before the first
-- group is emitted. Each distinct key appears exactly once in the output,
-- paired with a 'Seq.Seq' of all values seen for that key. The input
-- stream's return value becomes the return value of the output stream.
--
-- Pairs are folded straight into the map, so no intermediate list of the
-- whole stream is built. A newly seen value is placed in front of the values
-- already collected for its key (@new '<>' old@), i.e. each group is in
-- reverse arrival order. Groups are emitted in the map's own traversal
-- order, which is not related to key order.
groupByHashableKey
  :: forall m k c r
   . (Monad m, Hashable k, Eq k)
  => Stream (Of (k, c)) m r
  -> Stream (Of (k, Seq.Seq c)) m r
groupByHashableKey s = S.effect $ do
  (grouped S.:> r) <- S.fold addPair HMS.empty id s
  return $ HMS.foldrWithKey (\k cs rest -> S.cons (k, cs) rest) (return r) grouped
 where
  -- Wrap the value in a singleton sequence and merge it into the map.
  addPair acc kc = uncurry (HMS.insertWith (<>)) (second Seq.singleton kc) acc
{-# INLINABLE groupByHashableKey #-}
-- | Group a stream of key/value pairs by key using a strict 'MS.Map'.
--
-- The whole input stream is consumed (running its effects) before the first
-- group is emitted. Each distinct key appears exactly once in the output,
-- paired with a 'Seq.Seq' of all values seen for that key. The input
-- stream's return value becomes the return value of the output stream.
--
-- Pairs are folded straight into the map, so no intermediate list of the
-- whole stream is built. A newly seen value is placed in front of the values
-- already collected for its key (@new '<>' old@), i.e. each group is in
-- reverse arrival order. Groups are emitted in ascending key order.
groupByOrderedKey
  :: forall m k c r
   . (Monad m, Ord k)
  => Stream (Of (k, c)) m r
  -> Stream (Of (k, Seq.Seq c)) m r
groupByOrderedKey s = S.effect $ do
  (grouped S.:> r) <- S.fold addPair MS.empty id s
  return $ MS.foldrWithKey (\k cs rest -> S.cons (k, cs) rest) (return r) grouped
 where
  -- Wrap the value in a singleton sequence and merge it into the map.
  addPair acc kc = uncurry (MS.insertWith (<>)) (second Seq.singleton kc) acc
{-# INLINABLE groupByOrderedKey #-}
-- | Result container used by the streaming engines: a stream of @d@ values
-- in monad @m@ whose return value is @()@.
newtype StreamResult m d = StreamResult
  { unRes :: Stream (Of d) m ()
    -- ^ The wrapped stream.
  }
-- | Run the wrapped stream and collect all of its elements into a list.
resultToList :: Monad m => StreamResult m d -> m [d]
resultToList (StreamResult s) = S.toList_ s
-- | Combine every element of a stream with '<>', in stream order, finishing
-- with 'mempty'. An empty stream yields 'mempty'.
concatStreaming :: (Monad m, Monoid a) => Stream (Of a) m () -> m a
concatStreaming s = S.iterT combine (fmap (const mempty) s)
 where
  -- Put the current element in front of the result for the rest of the stream.
  combine (chunk S.:> rest) = (chunk <>) <$> rest
-- | Monoidally combine all elements held in a 'StreamResult'.
concatStream :: (Monad m, Monoid a) => StreamResult m a -> m a
concatStream (StreamResult s) = concatStreaming s
-- | Post-process a pure fold that produces a 'StreamResult' so that it
-- instead produces the monoidal combination of the streamed values.
concatStreamFold
  :: Monoid b => FL.Fold a (StreamResult Identity b) -> FL.Fold a b
concatStreamFold fld = S.runIdentity . concatStream <$> fld
-- | Monadic counterpart of 'concatStreamFold': hands 'concatStream' to
-- 'MRC.postMapM' so that the fold's 'StreamResult' is collapsed into a
-- single monoidal value.
concatStreamFoldM
  :: (Monad m, Monoid b) => FL.FoldM m a (StreamResult m b) -> FL.FoldM m a b
concatStreamFoldM fld = MRC.postMapM concatStream fld
-- | Pure map-reduce engine backed by "Streaming".
--
-- The first argument is the grouping function that turns a stream of
-- @(key, value)@ pairs into a stream of @(key, group)@ pairs, e.g.
-- 'groupByHashableKey' or 'groupByOrderedKey'.
--
-- The resulting fold first gathers its inputs into a stream, prepending each
-- new input, so the gathered stream is in reverse input order. On extraction
-- it unpacks, assigns keys, groups and finally reduces each group.
streamingEngine
  :: (Foldable g, Functor g)
  => (  forall z r
      . Stream (Of (k, z)) Identity r
     -> Stream (Of (k, g z)) Identity r
     )
  -> MRE.MapReduceFold y k c (StreamResult Identity) x d
streamingEngine groupByKey u (MRC.Assign a) r =
  StreamResult <$> FL.Fold prepend (pure ()) finish
 where
  -- Fold step: put the new element at the front of the gathered stream.
  prepend acc x = S.cons x acc
  -- Extraction: unpack, assign keys, group by key, reduce each group.
  finish = S.map reduceGroup . groupByKey . S.map a . unpackStream u
  reduceGroup (k, lc) = MRE.reduceFunction r k lc
{-# INLINABLE streamingEngine #-}
-- | Monadic map-reduce engine backed by "Streaming".
--
-- The first argument is the grouping function that turns a stream of
-- @(key, value)@ pairs into a stream of @(key, group)@ pairs, e.g.
-- 'groupByHashableKey' or 'groupByOrderedKey'.
--
-- Inputs are gathered with a pure fold (lifted via 'FL.generalize') that
-- prepends each new input, so the gathered stream is in reverse input order.
-- The extracted stream unpacks, assigns keys, groups and reduces each group,
-- using the monadic versions of each step.
streamingEngineM
  :: (Monad m, Traversable g)
  => (forall z r . Stream (Of (k, z)) m r -> Stream (Of (k, g z)) m r)
  -> MRE.MapReduceFoldM m y k c (StreamResult m) x d
streamingEngineM groupByKey u (MRC.AssignM a) r =
  StreamResult <$> FL.generalize gathered
 where
  gathered = FL.Fold prepend (pure ()) finish
  -- Fold step: put the new element at the front of the gathered stream.
  prepend acc x = S.cons x acc
  -- Extraction: unpack, assign keys, group by key, reduce each group.
  finish = S.mapM reduceGroupM . groupByKey . S.mapM a . unpackStreamM u
  reduceGroupM (k, lc) = MRE.reduceFunctionM r k lc
{-# INLINABLE streamingEngineM #-}