module Streamly.Internal.Data.Sink
(
Sink (..)
, toFold
, tee
, distribute
, demux
, unzipM
, unzip
, lmap
, lmapM
, lfilter
, lfilterM
, drain
, drainM
)
where
import Control.Monad (when, void)
import Data.Map.Strict (Map)
import Prelude
hiding (filter, drop, dropWhile, take, takeWhile, zipWith, foldr,
foldl, map, mapM, sequence, all, any, sum, product, elem,
notElem, maximum, minimum, head, last, tail, length, null,
reverse, iterate, init, and, or, lookup, foldr1, (!!),
scanl, scanl1, replicate, concatMap, mconcat, foldMap, unzip)
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.Sink.Types (Sink(..))
import qualified Data.Map.Strict as Map
-- | Convert a 'Sink' into a 'Fold' that produces @()@.
--
-- The resulting fold carries no meaningful state (its accumulator is @()@):
-- every step simply runs the sink's action on the incoming element, and the
-- final extraction returns @()@.
toFold :: Monad m => Sink m a -> Fold m a ()
toFold (Sink f) = Fold step begin done

    where

    -- Initial (unit) accumulator.
    begin = return ()

    -- Ignore the accumulator; the sink action already returns @m ()@.
    step _ = f

    -- Nothing to extract from a unit accumulator.
    done _ = return ()
-- | Feed each input element to both sinks.
--
-- For every element the first sink's action is run, followed by the second
-- sink's action on the same element.
tee :: Monad m => Sink m a -> Sink m a -> Sink m a
tee (Sink fL) (Sink fR) = Sink (\a -> fL a >> fR a)
-- | Feed each input element to every sink in the list.
--
-- The sinks are run one after another, in list order, on the same element.
-- With an empty list the resulting sink does nothing.
{-# INLINE distribute #-}
distribute :: Monad m => [Sink m a] -> Sink m a
distribute ss = Sink (\a -> Prelude.mapM_ (\(Sink f) -> f a) ss)
-- | Route each @(value, key)@ input to the sink registered under @key@.
--
-- The key is looked up in the supplied map; if a sink is found it is run on
-- the value. Inputs whose key is not present in the map are silently
-- discarded.
demux :: (Monad m, Ord k) => Map k (Sink m a) -> Sink m (a, k)
demux kv = Sink step

    where

    step (a, k) =
        case Map.lookup k kv of
            -- No sink registered for this key: drop the element.
            Nothing -> return ()
            Just (Sink g) -> g a
-- | Split each input element into a pair using a monadic function and send
-- the first component to the first sink and the second component to the
-- second sink.
--
-- For every element the splitting action runs first, then the first sink,
-- then the second sink.
{-# INLINE unzipM #-}
unzipM :: Monad m => (a -> m (b,c)) -> Sink m b -> Sink m c -> Sink m a
unzipM f (Sink stepB) (Sink stepC) =
    Sink (\a -> f a >>= \(b, c) -> stepB b >> stepC c)
-- | Pure variant of 'unzipM': split each input element into a pair using a
-- pure function and send each component to its respective sink.
--
-- Implemented by lifting the pure splitter with 'return' and delegating to
-- 'unzipM'.
{-# INLINE unzip #-}
unzip :: Monad m => (a -> (b,c)) -> Sink m b -> Sink m c -> Sink m a
unzip f = unzipM (return . f)
-- | Apply a pure function to every input element before it reaches the sink.
{-# INLINABLE lmap #-}
lmap :: (a -> b) -> Sink m b -> Sink m a
lmap f (Sink step) = Sink (step . f)
-- | Apply a monadic function to every input element and pass its result on
-- to the sink.
{-# INLINABLE lmapM #-}
lmapM :: Monad m => (a -> m b) -> Sink m b -> Sink m a
lmapM f (Sink step) = Sink (\x -> f x >>= step)
-- | Only pass on to the sink those input elements that satisfy a pure
-- predicate; elements for which the predicate is 'False' are skipped.
{-# INLINABLE lfilter #-}
lfilter :: Monad m => (a -> Bool) -> Sink m a -> Sink m a
lfilter f (Sink step) = Sink (\a -> when (f a) $ step a)
-- | Only pass on to the sink those input elements that satisfy a monadic
-- predicate.
--
-- The predicate action is run for every element; the sink is run only when
-- it yields 'True'.
{-# INLINABLE lfilterM #-}
lfilterM :: Monad m => (a -> m Bool) -> Sink m a -> Sink m a
lfilterM f (Sink step) = Sink (\a -> f a >>= \use -> when use $ step a)
-- | A sink that ignores every input element and performs no effect.
drain :: Monad m => Sink m a
drain = Sink (\_ -> return ())
-- | A sink that runs a monadic function on every input element and discards
-- the function's result.
{-# INLINABLE drainM #-}
drainM :: Monad m => (a -> m b) -> Sink m a
drainM f = Sink (void . f)