module Streamly.Internal.Data.Stream.IsStream.Top
(
sampleFromThen
, sampleIntervalStart
, sampleIntervalEnd
, sampleBurstStart
, sampleBurstEnd
, sortBy
, intersectBy
, intersectBySorted
, differenceBy
, mergeDifferenceBy
, unionBy
, mergeUnionBy
, crossJoin
, joinInner
, joinInnerMap
, joinInnerMerge
, joinLeft
, mergeLeftJoin
, joinLeftMap
, joinOuter
, mergeOuterJoin
, joinOuterMap
)
where
#include "inline.hs"
import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (get, put)
import Data.IORef (newIORef, readIORef, modifyIORef')
import Data.Kind (Type)
#if !(MIN_VERSION_base(4,11,0))
import Data.Semigroup (Semigroup(..))
#endif
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Common (concatM)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), adapt, foldl', fromList)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)
import qualified Data.List as List
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Parser as Parser
import qualified Streamly.Internal.Data.Stream.IsStream.Lift as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Eliminate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Generate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Reduce as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.StreamD as StreamD
import Prelude hiding (filter, zipWith, concatMap, concat)
-- | @sampleFromThen offset stride@ keeps only those elements whose position
-- @i@ (as assigned by 'Stream.indexed') satisfies
-- @i >= offset && (i - offset) \`mod\` stride == 0@; all other elements are
-- dropped.
--
-- Note: a @stride@ of @0@ makes the 'mod' call raise a divide-by-zero
-- exception as soon as an element at or beyond @offset@ is reached.
--
{-# INLINE sampleFromThen #-}
sampleFromThen :: (IsStream t, Monad m, Functor (t m)) =>
    Int -> Int -> t m a -> t m a
sampleFromThen offset stride =
    Stream.with Stream.indexed Stream.filter isSampled

    where

    -- Only the index decides whether an element is kept.
    isSampled (i, _) = i >= offset && (i - offset) `mod` stride == 0
-- | Split the stream into time windows with 'Stream.intervalsOf' and emit the
-- last element seen in each window ('Fold.last'). Windows in which no element
-- arrived produce 'Nothing' and are removed by 'Stream.catMaybes'.
--
-- The 'Double' argument is handed unchanged to 'Stream.intervalsOf' as the
-- window length (presumably in seconds - confirm against 'Stream.intervalsOf').
--
{-# INLINE sampleIntervalEnd #-}
sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.last
-- | Split the stream into time windows with 'Stream.intervalsOf' and emit the
-- first element seen in each window ('Fold.head'). Windows in which no element
-- arrived produce 'Nothing' and are removed by 'Stream.catMaybes'.
--
-- The 'Double' argument is handed unchanged to 'Stream.intervalsOf' as the
-- window length (presumably in seconds - confirm against 'Stream.intervalsOf').
--
{-# INLINE sampleIntervalStart #-}
sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.head
-- | Sample the last event of every burst of events. Events are time stamped
-- with 'Stream.timeIndexed' and grouped with 'Stream.groupsByRolling'; the
-- last element of each group ('Fold.last') is emitted.
--
-- Two successive events belong to the same burst when the difference of
-- their time stamps is smaller than @gap@ (given in seconds; it is converted
-- to nanoseconds by multiplying with @10^9@).
--
-- NOTE(review): this relies on 'Stream.groupsByRolling' keeping an element in
-- the current group while the predicate holds for (previous, current). The
-- earlier predicate used @>=@, which under that semantics grouped events that
-- were far apart and split events that were close together.
--
{-# INLINE sampleBurstEnd #-}
sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstEnd gap =
    Stream.map snd
        . Stream.catMaybes
        . Stream.groupsByRolling sameBurst Fold.last
        . Stream.timeIndexed

    where

    -- The burst gap expressed as a relative time in nanoseconds.
    threshold = toRelTime64 (NanoSecond64 (round (gap * 10 ^ (9 :: Int))))

    -- True when the second event is close enough to the first one to be
    -- considered part of the same burst.
    sameBurst (t1, _) (t2, _) = t2 - t1 < threshold
-- | Sample the first event of every burst of events. Events are time stamped
-- with 'Stream.timeIndexed' and grouped with 'Stream.groupsByRolling'; the
-- first element of each group ('Fold.head') is emitted.
--
-- Two successive events belong to the same burst when the difference of
-- their time stamps is smaller than @gap@ (given in seconds; it is converted
-- to nanoseconds by multiplying with @10^9@).
--
-- NOTE(review): this relies on 'Stream.groupsByRolling' keeping an element in
-- the current group while the predicate holds for (previous, current). The
-- earlier predicate used @>=@, which under that semantics grouped events that
-- were far apart and split events that were close together.
--
{-# INLINE sampleBurstStart #-}
sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstStart gap =
    Stream.map snd
        . Stream.catMaybes
        . Stream.groupsByRolling sameBurst Fold.head
        . Stream.timeIndexed

    where

    -- The burst gap expressed as a relative time in nanoseconds.
    threshold = toRelTime64 (NanoSecond64 (round (gap * 10 ^ (9 :: Int))))

    -- True when the second event is close enough to the first one to be
    -- considered part of the same burst.
    sameBurst (t1, _) (t2, _) = t2 - t1 < threshold
-- | Sort the input stream using the supplied comparison function.
--
-- The stream is first parsed into a stream of streams with
-- 'Parser.groupByRollingEither', using \"the previous element is not greater
-- than the current one\" as the rolling predicate; one side of the 'Either'
-- is collected with 'Fold.toStreamRev' and the other with 'Fold.toStream'
-- (presumably descending runs are reversed so that every chunk is ascending -
-- confirm against 'Parser.groupByRollingEither'). The chunks are then merged
-- pairwise with 'Stream.mergeBy' via 'Stream.concatPairsWith'.
--
{-# INLINE sortBy #-}
sortBy :: MonadCatch m => (a -> a -> Ordering) -> SerialT m a -> SerialT m a
sortBy cmp =
    Stream.concatPairsWith (Stream.mergeBy cmp) id
        . Stream.parseMany (fmap (either id id) runs)

    where

    -- Parser that consumes one monotonic run of the input.
    runs =
        Parser.groupByRollingEither
            (\x y -> cmp x y /= GT)
            Fold.toStreamRev
            Fold.toStream
-- | Pair every element of the first container with every element of the
-- second one using the 'Monad' instance of @t m@: the second argument is
-- bound once for each element produced by the first.
--
{-# INLINE crossJoin #-}
crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b)
crossJoin s1 s2 = do
    a <- s1
    b <- s2
    return (a, b)
-- | Inner join of two streams on an arbitrary equality predicate. For every
-- element @a@ of the first stream the whole second stream is traversed and a
-- pair @(a, b)@ is emitted for each @b@ with @a \`eq\` b@. Elements without a
-- match on the other side produce no output.
--
-- The second stream is evaluated once per element of the first stream.
--
{-# INLINE joinInner #-}
joinInner ::
    forall (t :: (Type -> Type) -> Type -> Type) m a b.
    (IsStream t, Monad m) =>
    (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
joinInner eq s1 s2 = Stream.concatMap matchesFor s1

    where

    -- All pairs of the given left element with matching right elements.
    matchesFor a = Stream.concatMap (pairIfEqual a) s2

    pairIfEqual a b
        | a `eq` b = Stream.fromPure (a, b)
        | otherwise = Stream.nil
-- | Fold a stream of key-value pairs into a 'Map.Map' with a strict left
-- fold. Pairs are inserted with 'Map.insert' in stream order, so when a key
-- occurs more than once the value of its last occurrence is retained.
toMap :: (Monad m, Ord k) => IsStream.SerialT m (k, v) -> m (Map.Map k v)
toMap = Stream.foldl' insertPair Map.empty

    where

    insertPair acc (k, v) = Map.insert k v acc
-- | Inner join of two key-value streams on the key, using a 'Map.Map' built
-- from the second stream. The second stream is consumed completely (via
-- 'toMap') before any output is produced; then every @(k, a)@ of the first
-- stream whose key is present in the map yields @(k, a, b)@. Elements of the
-- first stream whose key is absent are dropped.
--
-- Because the map is built with 'toMap', a key occurring several times in the
-- second stream contributes only the value of its last occurrence.
--
{-# INLINE joinInnerMap #-}
joinInnerMap :: (IsStream t, Monad m, Ord k) =>
    t m (k, a) -> t m (k, b) -> t m (k, a, b)
joinInnerMap s1 s2 =
    Stream.concatM $ do
        km <- toMap $ IsStream.adapt s2
        pure $ Stream.mapMaybe (joinAB km) s1

    where

    -- Attach the right-hand value when the key is known, otherwise Nothing.
    joinAB kvm (k, a) = fmap (\b -> (k, a, b)) (Map.lookup k kvm)
-- | Placeholder for an inner join driven by an 'Ordering'-returning
-- comparison (presumably a merge join of two sorted streams - the name and
-- signature suggest it, no implementation exists here).
--
-- /Unimplemented/: evaluating this function raises an error.
--
{-# INLINE joinInnerMerge #-}
joinInnerMerge :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
joinInnerMerge = error "joinInnerMerge: not implemented"
-- | Left join of two streams on an arbitrary equality predicate. For every
-- element @a@ of the first stream the whole second stream is traversed:
--
-- * each @b@ with @a \`eq\` b@ yields @(a, Just b)@;
-- * if no element of the second stream matched, a single @(a, Nothing)@ is
--   emitted after the traversal.
--
-- The second stream is evaluated once per element of the first stream. A
-- 'Bool' held in a 'StateT' layer records whether the current @a@ has found
-- a match.
--
{-# INLINE joinLeft #-}
joinLeft :: Monad m =>
    (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b)
joinLeft eq s1 s2 = Stream.evalStateT (return False) $ do
    a <- Stream.liftInner s1
    -- Reset the "matched" flag at the start of each left element.
    lift $ put False
    -- Appended after the right stream: yields a single Nothing only when no
    -- right element matched the current left element.
    let final = do
            matched <- lift get
            if matched
            then Stream.nil
            else Stream.fromPure Nothing
    candidate <- fmap Just (Stream.liftInner s2) <> final
    case candidate of
        Just b
            | a `eq` b -> do
                lift $ put True
                return (a, Just b)
            | otherwise -> Stream.nil
        Nothing -> return (a, Nothing)
-- | Left join of two key-value streams on the key, using a 'Map.Map' built
-- from the second stream. The second stream is consumed completely (via
-- 'toMap') before any output is produced; then every @(k, a)@ of the first
-- stream yields @(k, a, Just b)@ when the key is present in the map and
-- @(k, a, Nothing)@ otherwise.
--
-- Because the map is built with 'toMap', a key occurring several times in the
-- second stream contributes only the value of its last occurrence.
--
{-# INLINE joinLeftMap #-}
joinLeftMap :: (IsStream t, Ord k, Monad m) =>
    t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b)
joinLeftMap s1 s2 =
    Stream.concatM $ do
        km <- toMap $ IsStream.adapt s2
        return $ Stream.map (joinAB km) s1

    where

    -- The lookup result is exactly the optional right-hand value.
    joinAB km (k, a) = (k, a, Map.lookup k km)
-- | Placeholder for a left join driven by an 'Ordering'-returning comparison
-- (presumably a merge join of two sorted streams - the name and signature
-- suggest it, no implementation exists here).
--
-- /Unimplemented/: applying this function to its arguments raises an error.
--
{-# INLINE mergeLeftJoin #-}
mergeLeftJoin ::
    (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)
mergeLeftJoin _eq _s1 _s2 = error "mergeLeftJoin: not implemented"
-- | Full outer join of two streams on an arbitrary equality predicate.
--
-- The second stream is first buffered into an array. Then, for every element
-- @a@ of the first stream the buffered elements are traversed:
--
-- * each @b@ with @a \`eq\` b@ yields @(Just a, Just b)@;
-- * if nothing matched, a single @(Just a, Nothing)@ is emitted.
--
-- After the first stream is exhausted, every buffered element that never
-- matched any @a@ is emitted as @(Nothing, Just b)@.
--
-- The positions of matched right elements are recorded in an 'IORef' while
-- the first phase runs and read back when the second phase starts.
-- Previously the per-element \"used\" flag was never updated, so every right
-- element - matched or not - was emitted again as @(Nothing, Just b)@.
--
{-# INLINE joinOuter #-}
joinOuter :: MonadIO m =>
       (a -> b -> Bool)
    -> SerialT m a
    -> SerialT m b
    -> SerialT m (Maybe a, Maybe b)
joinOuter eq s1 s =
    Stream.concatM $ do
        arr <- Array.fromStream s
        -- Indices (positions in arr) of right elements that found a match.
        matched <- liftIO $ newIORef (Map.empty :: Map.Map Int ())
        return $ go arr matched <> leftOver arr matched

    where

    -- Right elements that were never matched. The IORef is read only when
    -- this part of the stream is reached, i.e. after 'go' has finished.
    leftOver arr matched =
        Stream.concatM $ do
            used <- liftIO $ readIORef matched
            return
                $ fmap (\(_, x) -> (Nothing, Just x))
                $ Stream.filter (\(i, _) -> not (Map.member i used))
                $ Stream.indexed
                $ Array.toStream arr

    go arr matched = Stream.evalStateT (return False) $ do
        a <- Stream.liftInner s1
        -- Reset the "matched" flag at the start of each left element.
        lift $ put False
        -- Appended after the right elements: yields a single Nothing only
        -- when no right element matched the current left element.
        let final = do
                r <- lift get
                if r
                then Stream.nil
                else Stream.fromPure Nothing
        (i, b) <-
            let stream = IsStream.fromSerial $ Array.toStream arr
             in Stream.indexed $ fmap Just (Stream.liftInner stream) <> final
        case b of
            Just b1 ->
                if a `eq` b1
                then do
                    lift $ put True
                    -- Remember that the right element at position i is used.
                    lift $ lift $ liftIO
                        $ modifyIORef' matched (Map.insert i ())
                    return (Just a, Just b1)
                else Stream.nil
            Nothing -> return (Just a, Nothing)
{-# INLINE joinOuterMap #-}
joinOuterMap ::
(IsStream t, Ord k, MonadIO m) =>
t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b)
joinOuterMap :: t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b)
joinOuterMap t m (k, a)
s1 t m (k, b)
s2 =
m (t m (k, Maybe a, Maybe b)) -> t m (k, Maybe a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
Stream.concatM (m (t m (k, Maybe a, Maybe b)) -> t m (k, Maybe a, Maybe b))
-> m (t m (k, Maybe a, Maybe b)) -> t m (k, Maybe a, Maybe b)
forall a b. (a -> b) -> a -> b
$ do
Map k a
km1 <- SerialT m (k, a) -> m (Map k a)
forall a. SerialT m (k, a) -> m (Map k a)
kvFold (SerialT m (k, a) -> m (Map k a))
-> SerialT m (k, a) -> m (Map k a)
forall a b. (a -> b) -> a -> b
$ t m (k, a) -> SerialT m (k, a)
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
IsStream.adapt t m (k, a)
s1
Map k b
km2 <- SerialT m (k, b) -> m (Map k b)
forall a. SerialT m (k, a) -> m (Map k a)
kvFold (SerialT m (k, b) -> m (Map k b))
-> SerialT m (k, b) -> m (Map k b)
forall a b. (a -> b) -> a -> b
$ t m (k, b) -> SerialT m (k, b)
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
IsStream.adapt t m (k, b)
s2
let res1 :: t m (k, Maybe a, Maybe b)
res1 = ((k, a) -> (k, Maybe a, Maybe b))
-> t m (k, a) -> t m (k, Maybe a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
Stream.map (Map k b -> (k, a) -> (k, Maybe a, Maybe b)
forall a a a. Ord a => Map a a -> (a, a) -> (a, Maybe a, Maybe a)
joinAB Map k b
km2) (t m (k, a) -> t m (k, Maybe a, Maybe b))
-> t m (k, a) -> t m (k, Maybe a, Maybe b)
forall a b. (a -> b) -> a -> b
$ [(k, a)] -> t m (k, a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
[a] -> t m a
Stream.fromList ([(k, a)] -> t m (k, a)) -> [(k, a)] -> t m (k, a)
forall a b. (a -> b) -> a -> b
$ Map k a -> [(k, a)]
forall k a. Map k a -> [(k, a)]
Map.toList Map k a
km1
where
joinAB :: Map a a -> (a, a) -> (a, Maybe a, Maybe a)
joinAB Map a a
km (a
k, a
a) =
case a
k a -> Map a a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
`Map.lookup` Map a a
km of
Just a
b -> (a
k, a -> Maybe a
forall a. a -> Maybe a
Just a
a, a -> Maybe a
forall a. a -> Maybe a
Just a
b)
Maybe a
Nothing -> (a
k, a -> Maybe a
forall a. a -> Maybe a
Just a
a, Maybe a
forall a. Maybe a
Nothing)
let res2 :: t m (k, Maybe a, Maybe b)
res2 = ((k, b) -> Maybe (k, Maybe a, Maybe b))
-> t m (k, b) -> t m (k, Maybe a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Maybe b) -> t m a -> t m b
Stream.mapMaybe (Map k a -> (k, b) -> Maybe (k, Maybe a, Maybe b)
forall a a a a.
Ord a =>
Map a a -> (a, a) -> Maybe (a, Maybe a, Maybe a)
joinAB Map k a
km1) (t m (k, b) -> t m (k, Maybe a, Maybe b))
-> t m (k, b) -> t m (k, Maybe a, Maybe b)
forall a b. (a -> b) -> a -> b
$ [(k, b)] -> t m (k, b)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
[a] -> t m a
Stream.fromList ([(k, b)] -> t m (k, b)) -> [(k, b)] -> t m (k, b)
forall a b. (a -> b) -> a -> b
$ Map k b -> [(k, b)]
forall k a. Map k a -> [(k, a)]
Map.toList Map k b
km2
where
joinAB :: Map a a -> (a, a) -> Maybe (a, Maybe a, Maybe a)
joinAB Map a a
km (a
k, a
b) =
case a
k a -> Map a a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
`Map.lookup` Map a a
km of
Just a
_ -> Maybe (a, Maybe a, Maybe a)
forall a. Maybe a
Nothing
Maybe a
Nothing -> (a, Maybe a, Maybe a) -> Maybe (a, Maybe a, Maybe a)
forall a. a -> Maybe a
Just (a
k, Maybe a
forall a. Maybe a
Nothing, a -> Maybe a
forall a. a -> Maybe a
Just a
b)
t m (k, Maybe a, Maybe b) -> m (t m (k, Maybe a, Maybe b))
forall (m :: * -> *) a. Monad m => a -> m a
return (t m (k, Maybe a, Maybe b) -> m (t m (k, Maybe a, Maybe b)))
-> t m (k, Maybe a, Maybe b) -> m (t m (k, Maybe a, Maybe b))
forall a b. (a -> b) -> a -> b
$ t m (k, Maybe a, Maybe b)
-> t m (k, Maybe a, Maybe b) -> t m (k, Maybe a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> t m a -> t m a
Stream.serial t m (k, Maybe a, Maybe b)
res1 t m (k, Maybe a, Maybe b)
forall a. t m (k, Maybe a, Maybe b)
res2
where
kvFold :: SerialT m (k, a) -> m (Map k a)
kvFold = (Map k a -> (k, a) -> Map k a)
-> Map k a -> SerialT m (k, a) -> m (Map k a)
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> SerialT m a -> m b
Stream.foldl' (\Map k a
kv (k
k, a
b) -> k -> a -> Map k a -> Map k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
k a
b Map k a
kv) Map k a
forall k a. Map k a
Map.empty
-- | Placeholder for an outer join of two streams driven by an
-- 'Ordering'-returning comparison (presumably the merge-based counterpart of
-- 'joinOuter' for sorted inputs -- TODO confirm).
--
-- /Unimplemented/: all three arguments are ignored and evaluating the result
-- always raises an error that names this function.
{-# INLINE mergeOuterJoin #-}
mergeOuterJoin ::
    (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
mergeOuterJoin _eq _s1 _s2 =
    -- A descriptive 'error' instead of a bare 'undefined', so that whoever
    -- hits this stub can tell which operation is missing.
    error "mergeOuterJoin: not implemented"
-- | Keep only those elements of the first stream for which the second stream
-- holds a match according to @eq@. Surviving elements of @s1@ are emitted in
-- their original order.
--
-- The second stream is consumed completely and buffered as a list before the
-- first stream is filtered, so space grows with the size of @s2@ and every
-- element of @s1@ costs a linear search over that buffer.
{-# INLINE intersectBy #-}
intersectBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
intersectBy eq s1 s2 = concatM (retainMatching <$> buffered)

    where

    -- The buffer is only ever probed with 'List.any', so its order does not
    -- matter and the reversed list serves as well as the forward one.
    -- 'Stream.uniqBy' trims it first (presumably by collapsing adjacent equal
    -- elements -- confirm against its documentation).
    buffered = Stream.toListRev (Stream.uniqBy eq (adapt s2))

    -- Filter the first stream against the buffered second stream.
    retainMatching candidates =
        Stream.filter (\x -> List.any (eq x) candidates) s1
-- | Intersection of two streams computed by 'StreamD.intersectBySorted' using
-- the supplied comparison function. Both streams are converted to the
-- 'StreamD' representation, intersected there, and the result is converted
-- back.
--
-- NOTE(review): the name suggests both inputs must already be sorted
-- consistently with the comparison function -- confirm against
-- 'StreamD.intersectBySorted'.
{-# INLINE intersectBySorted #-}
intersectBySorted :: (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
intersectBySorted cmp s1 =
    -- Deliberately left as a function of the second stream so that the
    -- number of arguments on the left-hand side stays at two.
    IsStream.fromStreamD . intersectWithFirst . IsStream.toStreamD

    where

    intersectWithFirst =
        StreamD.intersectBySorted cmp (IsStream.toStreamD s1)
-- | Remove from the first stream one matching element for every element of
-- the second stream. Each element of @s2@ deletes at most the first element
-- of what remains of @s1@ that matches it under @eq@, so an element occurring
-- several times in @s2@ can delete several occurrences from @s1@.
--
-- The first stream is buffered completely as a list, the second stream is
-- then folded over that list, and whatever is left is emitted as a stream.
-- Space therefore grows with the size of @s1@, and every element of @s2@
-- costs a linear pass over the remaining buffer.
{-# INLINE differenceBy #-}
differenceBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
differenceBy eq s1 s2 =
    concatM $
        Stream.toList (adapt s1) >>= \buffered ->
            fromList <$> foldl' dropFirstMatch buffered s2

    where

    -- Delete the first element of the remaining buffer that matches @y@.
    dropFirstMatch remaining y = List.deleteBy eq y remaining
-- | Placeholder for a difference operation driven by an 'Ordering'-returning
-- comparison (presumably the sorted-input counterpart of 'differenceBy' --
-- TODO confirm).
--
-- /Unimplemented/: all three arguments are ignored and evaluating the result
-- always raises an error that names this function.
{-# INLINE mergeDifferenceBy #-}
mergeDifferenceBy ::
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeDifferenceBy _eq _s1 _s2 =
    -- A descriptive 'error' instead of a bare 'undefined', so that whoever
    -- hits this stub can tell which operation is missing.
    error "mergeDifferenceBy: not implemented"
-- | Emit every element of the first stream, followed by those elements of the
-- second stream that matched no element of the first stream under @eq@.
--
-- The second stream is consumed completely up front and reduced with
-- 'List.nubBy', so the appended part contains no @eq@-duplicates. While the
-- first stream is being emitted, each of its elements deletes its match (if
-- any) from that buffered list; whatever is left once the first stream ends
-- is appended.
--
-- Space grows with the size of @s2@; every element of @s1@ costs a linear
-- pass over the remaining buffer.
--
-- The @Semigroup (t m a)@ constraint is no longer needed by the
-- implementation; it is kept only so that the signature stays unchanged for
-- existing callers.
{-# INLINE unionBy #-}
unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
unionBy eq s1 s2 =
    concatM
        $ do
            ys <- Stream.toList $ adapt s2
            -- '$!' forces the de-duplicated list to WHNF before it is stored.
            pending <- liftIO $ newIORef $! List.nubBy eq ys
            let -- Pass @x@ through unchanged, striking its match from the
                -- pending list as a side effect.
                strike x = do
                    liftIO $ modifyIORef' pending (List.deleteBy eq x)
                    return x
                -- The pending list is read only when this stream is
                -- evaluated, not when it is constructed.
                leftovers = concatM (fromList <$> liftIO (readIORef pending))
            -- The leftovers are correct only after *all* of s1 has been
            -- processed, so the two parts must be appended strictly in
            -- sequence. The Semigroup '<>' of an arbitrary stream type may
            -- interleave or run both sides concurrently, which would read the
            -- pending list too early; 'Stream.serial' appends regardless of
            -- the stream type.
            return $ Stream.serial (Stream.mapM strike s1) leftovers
-- | Placeholder for a union operation driven by an 'Ordering'-returning
-- comparison (presumably the sorted-input counterpart of 'unionBy' -- TODO
-- confirm).
--
-- /Unimplemented/: all three arguments are ignored and evaluating the result
-- always raises an error that names this function.
{-# INLINE mergeUnionBy #-}
mergeUnionBy ::
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeUnionBy _eq _s1 _s2 =
    -- A descriptive 'error' instead of a bare 'undefined', so that whoever
    -- hits this stub can tell which operation is missing.
    error "mergeUnionBy: not implemented"