{-# LANGUAGE CPP #-}
module Streamly.Internal.Data.Stream.StreamD.Container
(
nub
, joinLeftGeneric
, joinOuterGeneric
, joinInner
, joinLeft
, joinOuter
)
where
#include "inline.hs"
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Trans.State.Strict (get, put)
import Data.Function ((&))
import Data.Maybe (isJust)
import Streamly.Internal.Data.Stream.StreamD.Step (Step(..))
import Streamly.Internal.Data.Stream.StreamD.Type
(Stream(..), mkCross, unCross)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Internal.Data.Array.Generic as Array
import qualified Streamly.Internal.Data.Array.Mut.Type as MA
import qualified Streamly.Internal.Data.Stream.StreamD.Type as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Nesting as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Generate as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Transform as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Transformer as Stream
#include "DocTestDataStream.hs"
-- | Remove repeated elements from a stream, keeping only the first occurrence
-- of each element and preserving the order of those first occurrences.
--
-- Every element that has been emitted is remembered in a 'Set.Set' carried in
-- the stream state, so the memory retained grows with the number of distinct
-- elements seen so far.
{-# INLINE_NORMAL nub #-}
nub :: (Monad m, Ord a) => Stream m a -> Stream m a
nub (Stream step1 state1) = Stream step (Set.empty, state1)

    where

    -- The state pairs the set of elements emitted so far with the state of
    -- the underlying stream.
    step gst (seen, st) = do
        r <- step1 gst st
        return $ case r of
            Yield x s
                -- Already emitted earlier: drop it but keep advancing.
                | Set.member x seen -> Skip (seen, s)
                | otherwise -> Yield x (Set.insert x seen, s)
            Skip s -> Skip (seen, s)
            Stop -> Stop
-- | Fold a stream of key-value pairs into a strict 'Map.Map'.
--
-- Pairs are inserted in stream order with 'Map.insert', so when a key occurs
-- more than once the value from the last occurrence is the one retained.
toMap :: (Monad m, Ord k) => Stream m (k, v) -> m (Map.Map k v)
toMap = Stream.fold (Fold.foldl' (\kv (k, v) -> Map.insert k v kv) Map.empty)
-- | Inner join of two key-value streams on their keys.
--
-- The second stream is first consumed completely into a 'Map.Map' (see
-- 'toMap'); the first stream is then streamed through and every @(k, a)@
-- whose key is present in that map is emitted as @(k, a, b)@. Elements of the
-- first stream whose key is absent from the map are dropped.
--
-- Because the second stream is collapsed into a map, only the last value for
-- a repeated key in the second stream takes part in the join.
{-# INLINE joinInner #-}
joinInner :: (Monad m, Ord k) =>
    Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, b)
joinInner s1 s2 =
    Stream.concatEffect $ do
        km <- toMap s2
        pure $ Stream.mapMaybe (joinAB km) s1

    where

    -- 'Nothing' from the lookup propagates and makes 'Stream.mapMaybe' drop
    -- the element.
    joinAB kvm (k, a) = fmap (\b -> (k, a, b)) (Map.lookup k kvm)
-- | Left join of two streams using an arbitrary equality predicate.
--
-- For every element @a@ of the first stream the whole second stream is
-- traversed: each @b@ for which @a \`eq\` b@ holds produces @(a, Just b)@. If
-- no element of the second stream matched, a single @(a, Nothing)@ is
-- produced instead.
--
-- The second stream is run once per element of the first stream, so the
-- number of comparisons is the product of the two stream lengths.
{-# INLINE joinLeftGeneric #-}
joinLeftGeneric :: Monad m =>
    (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, Maybe b)
joinLeftGeneric eq s1 s2 = Stream.evalStateT (return False) $ unCross $ do
    a <- mkCross (Stream.liftInner s1)
    -- The Bool state records whether the current @a@ has matched anything;
    -- reset it at the start of every outer iteration.
    mkCross (Stream.fromEffect $ put False)
    -- A trailing sentinel for the inner loop: it yields a single 'Nothing'
    -- only when the flag is still unset once the second stream is exhausted.
    let final = Stream.concatEffect $ do
            matched <- get
            pure $ if matched then Stream.nil else Stream.fromPure Nothing
    b <- mkCross (fmap Just (Stream.liftInner s2) `Stream.append` final)
    case b of
        Just b1
            | a `eq` b1 -> do
                mkCross (Stream.fromEffect $ put True)
                return (a, Just b1)
            -- Non-matching pair: contribute nothing to the output.
            | otherwise -> mkCross Stream.nil
        -- Sentinel reached: no element of the second stream matched @a@.
        Nothing -> return (a, Nothing)
-- | Left join of two key-value streams on their keys.
--
-- The second stream is first consumed completely into a 'Map.Map' (see
-- 'toMap'). Every @(k, a)@ of the first stream is then emitted as
-- @(k, a, Just b)@ when the key is present in the map and as
-- @(k, a, Nothing)@ otherwise, so the output has exactly one element per
-- element of the first stream.
--
-- Because the second stream is collapsed into a map, only the last value for
-- a repeated key in the second stream takes part in the join.
{-# INLINE joinLeft #-}
joinLeft :: (Ord k, Monad m) =>
    Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, Maybe b)
joinLeft s1 s2 =
    Stream.concatEffect $ do
        km <- toMap s2
        return $ fmap (joinAB km) s1

    where

    -- The lookup result is already the 'Maybe' we want to emit. It is forced
    -- before building the tuple so the output does not carry an unevaluated
    -- lookup thunk.
    joinAB km (k, a) =
        let mb = Map.lookup k km
         in mb `seq` (k, a, mb)
-- | Full outer join of two streams using an arbitrary equality predicate.
--
-- The second stream is first buffered into an array, together with a mutable
-- array of flags (one per buffered element, initially 'False'). The output
-- then consists of two parts, in this order:
--
-- 1. For every element @a@ of the first stream, one @(Just a, Just b)@ for
--    each buffered @b@ with @a \`eq\` b@, or a single @(Just a, Nothing)@ if
--    no buffered element matched. Each matching @b@ has its flag set.
-- 2. One @(Nothing, Just b)@ for every buffered @b@ whose flag is still unset
--    when it is read.
--
-- The buffered array is scanned once per element of the first stream, so the
-- number of comparisons is the product of the two stream lengths.
{-# INLINE joinOuterGeneric #-}
joinOuterGeneric :: MonadIO m =>
       (a -> b -> Bool)
    -> Stream m a
    -> Stream m b
    -> Stream m (Maybe a, Maybe b)
joinOuterGeneric eq s1 s =
    Stream.concatEffect $ do
        inputArr <- Array.fromStream s
        let len = Array.length inputArr
        -- One flag per buffered element, all starting out as "not matched".
        foundArr <-
            Stream.fold
                (MA.writeN len)
                (Stream.fromList (Prelude.replicate len False))
        return $ go inputArr foundArr `Stream.append` leftOver inputArr foundArr

    where

    -- Buffered elements paired with their flags; only the unflagged ones are
    -- emitted, with nothing on the left side.
    --
    -- NOTE(review): the 'isJust' filter is redundant because
    -- 'Stream.catMaybes' already discards every 'Nothing'. It is kept so that
    -- the existing 'isJust' import stays in use; drop both together.
    leftOver inputArr foundArr =
        Stream.zipWith unmatched
            (Array.read inputArr)
            (Stream.unfold MA.reader foundArr)
            & Stream.filter isJust
            & Stream.catMaybes

    unmatched x found
        | found = Nothing
        | otherwise = Just (Nothing, Just x)

    go inputArr foundArr = Stream.evalStateT (return False) $ unCross $ do
        a <- mkCross (Stream.liftInner s1)
        -- The Bool state records whether the current @a@ has matched
        -- anything; reset it at the start of every outer iteration.
        mkCross (Stream.fromEffect $ put False)
        -- A trailing sentinel for the inner loop: it yields a single
        -- 'Nothing' only when the flag is still unset once the buffered
        -- elements are exhausted.
        let final = Stream.concatEffect $ do
                matched <- get
                pure $ if matched then Stream.nil else Stream.fromPure Nothing
        -- The index is needed to flag the matching slot in @foundArr@.
        (i, b) <-
            mkCross
                $ Stream.indexed
                $ fmap Just (Stream.liftInner (Array.read inputArr))
                    `Stream.append` final
        case b of
            Just b1
                | a `eq` b1 -> do
                    mkCross (Stream.fromEffect $ put True)
                    MA.putIndex i foundArr True
                    return (Just a, Just b1)
                -- Non-matching pair: contribute nothing to the output.
                | otherwise -> mkCross Stream.nil
            -- Sentinel reached: no buffered element matched @a@.
            Nothing -> return (Just a, Nothing)
-- | Full outer join of two key-value streams on their keys.
--
-- Both streams are first consumed completely into 'Map.Map's (see 'toMap').
-- The output then consists of two parts, in this order:
--
-- 1. For every key of the first map, @(k, Just a, Just b)@ when the key is
--    also in the second map, otherwise @(k, Just a, Nothing)@.
-- 2. For every key of the second map that is absent from the first map,
--    @(k, Nothing, Just b)@.
--
-- Within each part the elements follow the order of 'Map.toList'. Because
-- both inputs are collapsed into maps, a repeated key in either stream
-- contributes only its last value.
{-# INLINE joinOuter #-}
joinOuter ::
    (Ord k, MonadIO m) =>
    Stream m (k, a) -> Stream m (k, b) -> Stream m (k, Maybe a, Maybe b)
joinOuter s1 s2 =
    Stream.concatEffect $ do
        km1 <- toMap s1
        km2 <- toMap s2
        let res1 = fmap (withRight km2) (Stream.fromList (Map.toList km1))
            res2 =
                Stream.mapMaybe
                    (rightOnly km1)
                    (Stream.fromList (Map.toList km2))
        return $ Stream.append res1 res2

    where

    -- Entry of the first map paired with whatever the second map holds for
    -- the same key. The lookup is forced before building the tuple so the
    -- output does not carry an unevaluated lookup thunk.
    withRight km (k, a) =
        let mb = Map.lookup k km
         in mb `seq` (k, Just a, mb)

    -- Entry of the second map, kept only when the first map has no such key
    -- (keys present in both were already emitted by the first pass).
    rightOnly km (k, b)
        | Map.member k km = Nothing
        | otherwise = Just (k, Nothing, Just b)