{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE BangPatterns #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
module Control.MapReduce.Engines.Streamly
(
streamlyEngine
, streamlyEngineM
, concurrentStreamlyEngine
, toStreamlyFold
, toStreamlyFoldM
, resultToList
, concatStream
, concatStreamFold
, concatStreamFoldM
, concatConcurrentStreamFold
, groupByHashableKey
, groupByOrderedKey
, groupByHashableKeyST
, groupByDiscriminatedKey
, SerialT
, WSerialT
, AheadT
, AsyncT
, WAsyncT
, ParallelT
, MonadAsync
, IsStream
)
where
import qualified Control.MapReduce.Core as MRC
import qualified Control.MapReduce.Engines as MRE
import Control.Arrow ( second )
import qualified Control.Foldl as FL
import Control.Monad.ST as ST
import qualified Data.Discrimination.Grouping as DG
import qualified Data.Foldable as F
import Data.Functor.Identity ( Identity(runIdentity) )
import Data.Hashable ( Hashable )
import qualified Data.HashMap.Strict as HMS
import qualified Data.HashTable.Class as HT
import qualified Data.HashTable.ST.Cuckoo as HTC
import qualified Data.Map.Strict as MS
import qualified Data.Sequence as Seq
import qualified Streamly.Prelude as S
import qualified Streamly as S
import qualified Streamly.Internal.Data.Fold as SF
import Streamly ( SerialT
, WSerialT
, AheadT
, AsyncT
, WAsyncT
, ParallelT
, MonadAsync
, IsStream
)
-- | Convert a monadic fold from the @foldl@ package ('FL.FoldM') into a
-- Streamly fold by passing its step, initial and extraction actions to
-- 'SF.mkFold' unchanged.
toStreamlyFoldM :: FL.FoldM m a b -> SF.Fold m a b
toStreamlyFoldM (FL.FoldM stepM initialM extractM) =
  SF.mkFold stepM initialM extractM
-- | Convert a pure fold from the @foldl@ package ('FL.Fold') into a Streamly
-- fold over any monad @m@ by passing its step function, initial state and
-- extraction function to 'SF.mkPure'.
toStreamlyFold :: Monad m => FL.Fold a b -> SF.Fold m a b
toStreamlyFold (FL.Fold advance initial extract) =
  SF.mkPure advance initial extract
-- | Apply an 'MRC.Unpack' step to a pure ('Identity') stream.
--
-- * 'MRC.Filter' keeps only the elements satisfying the predicate.
-- * 'MRC.Unpack' replaces each element by the contents of the container
--   the unpacking function returns for it.
unpackStream :: S.IsStream t => MRC.Unpack x y -> t Identity x -> t Identity y
unpackStream unpacker = case unpacker of
  MRC.Filter keep   -> S.filter keep
  MRC.Unpack expand -> S.concatMap (\x -> S.fromFoldable (expand x))
{-# INLINABLE unpackStream #-}
-- | Apply a monadic 'MRC.UnpackM' step to a stream in monad @m@.
--
-- * 'MRC.FilterM' keeps only the elements for which the monadic predicate
--   holds.
-- * 'MRC.UnpackM' runs the unpacking action on each element and splices the
--   contents of the resulting container into the output stream.
unpackStreamM :: (S.IsStream t, Monad m) => MRC.UnpackM m x y -> t m x -> t m y
unpackStreamM unpacker = case unpacker of
  MRC.FilterM keepM   -> S.filterM keepM
  MRC.UnpackM expandM -> S.concatMapM (\x -> S.fromFoldable <$> expandM x)
{-# INLINABLE unpackStreamM #-}
-- | Collect the elements of any Streamly stream type into a list, after
-- first adapting the stream to the type 'S.toList' expects.
resultToList :: (Monad m, S.IsStream t) => t m a -> m [a]
resultToList stream = S.toList (S.adapt stream)
-- | Combine all elements of a serial stream with '<>', starting from
-- 'mempty', using Streamly's strict left fold.
concatStream :: (Monad m, Monoid a) => S.SerialT m a -> m a
concatStream stream = S.foldl' (\acc x -> acc <> x) mempty stream
-- | Post-process a pure fold whose result is a pure serial stream of monoidal
-- values: the stream is collapsed with 'concatStream' and the 'Identity'
-- wrapper is removed.
concatStreamFold :: Monoid b => FL.Fold a (S.SerialT Identity b) -> FL.Fold a b
concatStreamFold streamFold = runIdentity . concatStream <$> streamFold
-- | Post-process a monadic fold whose result is a stream (of any Streamly
-- stream type) of monoidal values: the stream is adapted and then collapsed
-- with 'concatStream' inside the fold's monad.
concatStreamFoldM
  :: (Monad m, Monoid b, S.IsStream t) => FL.FoldM m a (t m b) -> FL.FoldM m a b
concatStreamFoldM =
  MRC.postMapM (\stream -> concatStream (S.adapt stream))
-- | Turn a pure fold that produces a stream in monad @m@ into a monadic fold
-- that yields the monoidal concatenation of that stream. The pure fold is
-- lifted with 'FL.generalize' and then handed to 'concatStreamFoldM'.
concatConcurrentStreamFold
  :: (Monad m, Monoid b, S.IsStream t) => FL.Fold a (t m b) -> FL.FoldM m a b
concatConcurrentStreamFold pureFold =
  concatStreamFoldM (FL.generalize pureFold)
-- | Pure map-reduce engine built on a serial Streamly stream.
--
-- The resulting fold accumulates its inputs into a stream by consing each new
-- element onto the front. On extraction the accumulated stream is unpacked,
-- each element is assigned a key, the keyed elements are grouped by the
-- supplied @groupByKey@ function, and every group is reduced with
-- 'MRE.reduceFunction'.
streamlyEngine
  :: (Foldable g, Functor g)
  => (forall z . S.SerialT Identity (k, z) -> S.SerialT Identity (k, g z))
  -> MRE.MapReduceFold y k c (SerialT Identity) x d
streamlyEngine groupByKey unpacker (MRC.Assign assign) reducer =
  FL.Fold step S.nil finish
 where
  -- Prepend the incoming element to what has been collected so far.
  step collected x = S.cons x collected
  -- Reduce one (key, grouped values) pair to a result.
  reduceGroup (key, grouped) = MRE.reduceFunction reducer key grouped
  -- unpack -> assign -> group -> reduce
  finish collected =
    S.map reduceGroup
      (groupByKey (S.map assign (unpackStream unpacker collected)))
{-# INLINABLE streamlyEngine #-}
-- | Apply an 'MRC.Unpack' step to a stream in a 'S.MonadAsync' monad.
--
-- * 'MRC.Filter' keeps only the elements satisfying the predicate.
-- * 'MRC.Unpack' replaces each element by the contents of the container
--   the unpacking function returns for it.
unpackConcurrently
  :: (S.MonadAsync m, S.IsStream t) => MRC.Unpack x y -> t m x -> t m y
unpackConcurrently unpacker = case unpacker of
  MRC.Filter keep   -> S.filter keep
  MRC.Unpack expand -> S.concatMap (\x -> S.fromFoldable (expand x))
{-# INLINABLE unpackConcurrently #-}
-- | Map-reduce engine parameterised over two Streamly stream types.
--
-- @tIn@ is the stream type in which inputs are accumulated, unpacked and
-- assigned keys; @tOut@ is the stream type in which groups are reduced and
-- in which the result is returned. Grouping itself always runs on a
-- 'SerialT' stream, so the pipeline adapts @tIn@ -> 'SerialT' -> @tOut@.
--
-- NOTE(review): @tIn@ occurs only in the constraint and in a type
-- application below, so callers presumably select it with
-- @TypeApplications@ — confirm against call sites.
concurrentStreamlyEngine
  :: forall tIn tOut m g y k c x d
   . (S.IsStream tIn, S.IsStream tOut, S.MonadAsync m, Foldable g, Functor g)
  => (forall z . S.SerialT m (k, z) -> S.SerialT m (k, g z))
  -> MRE.MapReduceFold y k c (tOut m) x d
concurrentStreamlyEngine groupByKey unpacker (MRC.Assign assign) reducer =
  FL.Fold step S.nil finish
 where
  -- Prepend the incoming element, wrapped as a monadic action, via 'S.consM'.
  step collected x = S.consM (return x) collected
  -- Reduce one (key, grouped values) pair, lifted into @m@ for 'S.mapM'.
  reduceGroup (key, grouped) =
    return (MRE.reduceFunction reducer key grouped)
  -- unpack (applied with 'S.|$') -> assign -> adapt -> group -> adapt -> reduce
  finish collected =
    S.mapM reduceGroup
      $ S.adapt @SerialT @tOut
      $ groupByKey
      $ S.adapt @tIn @SerialT
      $ S.mapM (\y -> return (assign y))
      $ (S.|$) (unpackConcurrently unpacker) collected
{-# INLINABLE concurrentStreamlyEngine #-}
-- | Monadic map-reduce engine built on a serial Streamly stream.
--
-- Inputs are accumulated by a pure fold that conses each element onto the
-- front of a stream. On extraction the stream is unpacked with the monadic
-- unpacker, keyed with the monadic assigner, grouped by @groupByKey@, and
-- each group is reduced with 'MRE.reduceFunctionM'. The resulting stream is
-- adapted to the caller's stream type @t@, and the pure fold is lifted to a
-- monadic one with 'FL.generalize'.
streamlyEngineM
  :: (S.IsStream t, Monad m, S.MonadAsync m, Traversable g)
  => (forall z . SerialT m (k, z) -> SerialT m (k, g z))
  -> MRE.MapReduceFoldM m y k c (t m) x d
streamlyEngineM groupByKey unpacker (MRC.AssignM assignM) reducer =
  FL.generalize (S.adapt <$> FL.Fold step S.nil finish)
 where
  -- Prepend the incoming element to what has been collected so far.
  step collected x = S.cons x collected
  -- Reduce one (key, grouped values) pair inside @m@.
  reduceGroupM (key, grouped) = MRE.reduceFunctionM reducer key grouped
  -- unpack -> assign -> group -> reduce
  finish collected =
    S.mapM reduceGroupM
      (groupByKey (S.mapM assignM (unpackStreamM unpacker collected)))
{-# INLINABLE streamlyEngineM #-}
-- | Group a stream of key/value pairs by key using a strict 'HMS.HashMap'.
--
-- The whole input stream is first drained into a list; each value is wrapped
-- in a singleton 'Seq.Seq' and values sharing a key are merged with '<>' via
-- 'HMS.fromListWith'. The map is then turned back into a stream of
-- @(key, values)@ pairs with 'HMS.foldrWithKey'.
--
-- NOTE(review): the order of values inside each group is determined by the
-- argument order 'HMS.fromListWith' uses when calling '<>' — check the
-- library documentation if ordering matters to callers.
groupByHashableKey
  :: (Monad m, Hashable k, Eq k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByHashableKey s = S.yieldM (S.toList s) >>= streamGroups . gather
 where
  -- Build the key -> values map from the materialised list of pairs.
  gather pairs = HMS.fromListWith (<>) (fmap (second Seq.singleton) pairs)
  -- Emit one stream element per map entry.
  streamGroups grouped =
    HMS.foldrWithKey (\key vals rest -> (key, vals) `S.cons` rest) S.nil grouped
{-# INLINABLE groupByHashableKey #-}
-- | Group a stream of key/value pairs by key using a strict ordered map
-- ('MS.Map').
--
-- The whole input stream is first drained into a list; each value is wrapped
-- in a singleton 'Seq.Seq' and values sharing a key are merged with '<>' via
-- 'MS.fromListWith'. The map is then turned back into a stream of
-- @(key, values)@ pairs with 'MS.foldrWithKey'.
--
-- NOTE(review): the order of values inside each group is determined by the
-- argument order 'MS.fromListWith' uses when calling '<>' — check the
-- library documentation if ordering matters to callers.
groupByOrderedKey
  :: (Monad m, Ord k) => S.SerialT m (k, c) -> S.SerialT m (k, Seq.Seq c)
groupByOrderedKey s = S.yieldM (S.toList s) >>= streamGroups . gather
 where
  -- Build the key -> values map from the materialised list of pairs.
  gather pairs = MS.fromListWith (<>) (fmap (second Seq.singleton) pairs)
  -- Emit one stream element per map entry.
  streamGroups grouped =
    MS.foldrWithKey (\key vals rest -> (key, vals) `S.cons` rest) S.nil grouped
{-# INLINABLE groupByOrderedKey #-}
-- | Group a stream of key/value pairs by key using a mutable cuckoo hash
-- table ('HTC.HashTable') inside 'ST'.
--
-- The whole input stream is first drained into a list. Inside 'ST.runST' the
-- pairs (with each value wrapped in a singleton 'Seq.Seq') are loaded into
-- the table with 'MRE.fromListWithHT', merging values for equal keys with
-- '<>'. The table is then folded into a stream of @(key, values)@ pairs,
-- which is what 'ST.runST' returns.
groupByHashableKeyST
  :: (Monad m, Hashable k, Eq k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByHashableKeyST st = S.yieldM (S.toList st) >>= \pairs ->
  ST.runST $ do
    table <- (MRE.fromListWithHT @HTC.HashTable)
      (<>)
      (fmap (second Seq.singleton) pairs)
    -- Cons every table entry onto the stream being built.
    HT.foldM (\rest (key, vals) -> return (S.cons (key, vals) rest)) S.nil table
{-# INLINABLE groupByHashableKeyST #-}
-- | Group a stream of key/value pairs by key using discrimination-based
-- grouping ('DG.groupWith').
--
-- The whole input stream is first drained into a list, which
-- 'DG.groupWith' partitions into lists of pairs sharing a key. Each group
-- becomes one output element: the key of its first pair together with a
-- 'Seq.Seq' of the group's values in the order they appear in the group.
--
-- The group's key is taken by pattern matching in the comprehension
-- generator rather than with the partial 'head'; a group that was somehow
-- empty would be skipped instead of raising a runtime error.
groupByDiscriminatedKey
  :: (Monad m, DG.Grouping k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByDiscriminatedKey s = do
  pairs <- S.yieldM (S.toList s)
  S.fromFoldable
    [ (key, F.foldMap (Seq.singleton . snd) grp)
    | grp@((key, _) : _) <- DG.groupWith fst pairs
    ]
{-# INLINABLE groupByDiscriminatedKey #-}