{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
module Streamly.Internal.Data.Fold.Types
( Fold (..)
, Fold2 (..)
, simplify
, toListRevF
, lmap
, lmapM
, lfilter
, lfilterM
, lcatMaybes
, ltake
, ltakeWhile
, lsessionsOf
, lchunksOf
, lchunksOf2
, duplicate
, initialize
, runStep
)
where
import Control.Applicative (liftA2)
import Control.Concurrent (threadDelay, forkIO, killThread)
import Control.Concurrent.MVar (MVar, newMVar, takeMVar, putMVar)
import Control.Exception (SomeException(..), catch, mask)
import Control.Monad (void)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control (control)
import Data.Maybe (isJust, fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Streamly.Internal.Data.Strict (Tuple'(..), Tuple3'(..), Either'(..))
import Streamly.Internal.Data.SVar (MonadAsync)
data Fold m a b =
forall s. Fold (s -> a -> m s) (m s) (s -> m b)
data Fold2 m c a b =
forall s. Fold2 (s -> a -> m s) (c -> m s) (s -> m b)
simplify :: Fold2 m c a b -> c -> Fold m a b
simplify (Fold2 step inject extract) c = Fold step (inject c) extract
instance Applicative m => Functor (Fold m a) where
{-# INLINE fmap #-}
fmap f (Fold step start done) = Fold step start done'
where
done' x = fmap f $! done x
{-# INLINE (<$) #-}
(<$) b = \_ -> pure b
instance Applicative m => Applicative (Fold m a) where
{-# INLINE pure #-}
pure b = Fold (\() _ -> pure ()) (pure ()) (\() -> pure b)
{-# INLINE (<*>) #-}
(Fold stepL beginL doneL) <*> (Fold stepR beginR doneR) =
let step (Tuple' xL xR) a = Tuple' <$> stepL xL a <*> stepR xR a
begin = Tuple' <$> beginL <*> beginR
done (Tuple' xL xR) = doneL xL <*> doneR xR
in Fold step begin done
{-# INLINE (<*) #-}
(<*) m = \_ -> m
{-# INLINE (*>) #-}
_ *> m = m
instance (Semigroup b, Monad m) => Semigroup (Fold m a b) where
{-# INLINE (<>) #-}
(<>) = liftA2 (<>)
instance (Semigroup b, Monoid b, Monad m) => Monoid (Fold m a b) where
{-# INLINE mempty #-}
mempty = pure mempty
{-# INLINE mappend #-}
mappend = (<>)
instance (Monad m, Num b) => Num (Fold m a b) where
{-# INLINE fromInteger #-}
fromInteger = pure . fromInteger
{-# INLINE negate #-}
negate = fmap negate
{-# INLINE abs #-}
abs = fmap abs
{-# INLINE signum #-}
signum = fmap signum
{-# INLINE (+) #-}
(+) = liftA2 (+)
{-# INLINE (*) #-}
(*) = liftA2 (*)
{-# INLINE (-) #-}
(-) = liftA2 (-)
instance (Monad m, Fractional b) => Fractional (Fold m a b) where
{-# INLINE fromRational #-}
fromRational = pure . fromRational
{-# INLINE recip #-}
recip = fmap recip
{-# INLINE (/) #-}
(/) = liftA2 (/)
instance (Monad m, Floating b) => Floating (Fold m a b) where
{-# INLINE pi #-}
pi = pure pi
{-# INLINE exp #-}
exp = fmap exp
{-# INLINE sqrt #-}
sqrt = fmap sqrt
{-# INLINE log #-}
log = fmap log
{-# INLINE sin #-}
sin = fmap sin
{-# INLINE tan #-}
tan = fmap tan
{-# INLINE cos #-}
cos = fmap cos
{-# INLINE asin #-}
asin = fmap asin
{-# INLINE atan #-}
atan = fmap atan
{-# INLINE acos #-}
acos = fmap acos
{-# INLINE sinh #-}
sinh = fmap sinh
{-# INLINE tanh #-}
tanh = fmap tanh
{-# INLINE cosh #-}
cosh = fmap cosh
{-# INLINE asinh #-}
asinh = fmap asinh
{-# INLINE atanh #-}
atanh = fmap atanh
{-# INLINE acosh #-}
acosh = fmap acosh
{-# INLINE (**) #-}
(**) = liftA2 (**)
{-# INLINE logBase #-}
logBase = liftA2 logBase
{-# INLINABLE toListRevF #-}
toListRevF :: Monad m => Fold m a [a]
toListRevF = Fold (\xs x -> return $ x:xs) (return []) return
{-# INLINABLE lmap #-}
lmap :: (a -> b) -> Fold m b r -> Fold m a r
lmap f (Fold step begin done) = Fold step' begin done
where
step' x a = step x (f a)
{-# INLINABLE lmapM #-}
lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r
lmapM f (Fold step begin done) = Fold step' begin done
where
step' x a = f a >>= step x
{-# INLINABLE lfilter #-}
lfilter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r
lfilter f (Fold step begin done) = Fold step' begin done
where
step' x a = if f a then step x a else return x
{-# INLINABLE lfilterM #-}
lfilterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r
lfilterM f (Fold step begin done) = Fold step' begin done
where
step' x a = do
use <- f a
if use then step x a else return x
{-# INLINE lcatMaybes #-}
lcatMaybes :: Monad m => Fold m a b -> Fold m (Maybe a) b
lcatMaybes = lfilter isJust . lmap fromJust
{-# INLINABLE ltake #-}
ltake :: Monad m => Int -> Fold m a b -> Fold m a b
ltake n (Fold step initial done) = Fold step' initial' done'
where
initial' = fmap (Tuple' 0) initial
step' (Tuple' i r) a = do
if i < n
then do
res <- step r a
return $ Tuple' (i + 1) res
else return $ Tuple' i r
done' (Tuple' _ r) = done r
{-# INLINABLE ltakeWhile #-}
ltakeWhile :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b
ltakeWhile predicate (Fold step initial done) = Fold step' initial' done'
where
initial' = fmap Left' initial
step' (Left' r) a = do
if predicate a
then fmap Left' $ step r a
else return (Right' r)
step' r _ = return r
done' (Left' r) = done r
done' (Right' r) = done r
{-# INLINABLE duplicate #-}
duplicate :: Applicative m => Fold m a b -> Fold m a (Fold m a b)
duplicate (Fold step begin done) =
Fold step begin (\x -> pure (Fold step (pure x) done))
{-# INLINABLE initialize #-}
initialize :: Monad m => Fold m a b -> m (Fold m a b)
initialize (Fold step initial extract) = do
i <- initial
return $ Fold step (return i) extract
{-# INLINABLE runStep #-}
runStep :: Monad m => Fold m a b -> a -> m (Fold m a b)
runStep (Fold step initial extract) a = do
i <- initial
r <- step i a
return $ (Fold step (return r) extract)
{-# INLINE lchunksOf #-}
lchunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c
lchunksOf n (Fold step1 initial1 extract1) (Fold step2 initial2 extract2) =
Fold step' initial' extract'
where
initial' = (Tuple3' 0) <$> initial1 <*> initial2
step' (Tuple3' i r1 r2) a = do
if i < n
then do
res <- step1 r1 a
return $ Tuple3' (i + 1) res r2
else do
res <- extract1 r1
acc2 <- step2 r2 res
i1 <- initial1
acc1 <- step1 i1 a
return $ Tuple3' 1 acc1 acc2
extract' (Tuple3' _ r1 r2) = do
res <- extract1 r1
acc2 <- step2 r2 res
extract2 acc2
{-# INLINE lchunksOf2 #-}
lchunksOf2 :: Monad m => Int -> Fold m a b -> Fold2 m x b c -> Fold2 m x a c
lchunksOf2 n (Fold step1 initial1 extract1) (Fold2 step2 inject2 extract2) =
Fold2 step' inject' extract'
where
inject' x = (Tuple3' 0) <$> initial1 <*> inject2 x
step' (Tuple3' i r1 r2) a = do
if i < n
then do
res <- step1 r1 a
return $ Tuple3' (i + 1) res r2
else do
res <- extract1 r1
acc2 <- step2 r2 res
i1 <- initial1
acc1 <- step1 i1 a
return $ Tuple3' 1 acc1 acc2
extract' (Tuple3' _ r1 r2) = do
res <- extract1 r1
acc2 <- step2 r2 res
extract2 acc2
{-# INLINE lsessionsOf #-}
lsessionsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c
lsessionsOf n (Fold step1 initial1 extract1) (Fold step2 initial2 extract2) =
Fold step' initial' extract'
where
initial' = do
i1 <- initial1
i2 <- initial2
mv1 <- liftIO $ newMVar i1
mv2 <- liftIO $ newMVar (Right i2)
t <- control $ \run ->
mask $ \restore -> do
tid <- forkIO $ catch (restore $ void $ run (timerThread mv1 mv2))
(handleChildException mv2)
run (return tid)
return $ Tuple3' t mv1 mv2
step' acc@(Tuple3' _ mv1 _) a = do
r1 <- liftIO $ takeMVar mv1
res <- step1 r1 a
liftIO $ putMVar mv1 res
return acc
extract' (Tuple3' tid _ mv2) = do
r2 <- liftIO $ takeMVar mv2
liftIO $ killThread tid
case r2 of
Left e -> throwM e
Right x -> extract2 x
timerThread mv1 mv2 = do
liftIO $ threadDelay (round $ n * 1000000)
r1 <- liftIO $ takeMVar mv1
i1 <- initial1
liftIO $ putMVar mv1 i1
res1 <- extract1 r1
r2 <- liftIO $ takeMVar mv2
res <- case r2 of
Left _ -> return r2
Right x -> fmap Right $ step2 x res1
liftIO $ putMVar mv2 res
timerThread mv1 mv2
handleChildException ::
MVar (Either SomeException a) -> SomeException -> IO ()
handleChildException mv2 e = do
r2 <- takeMVar mv2
let r = case r2 of
Left _ -> r2
Right _ -> Left e
putMVar mv2 r