{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE Trustworthy #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Data.Conduit.Internal.Fusion
(
Step (..)
, Stream (..)
, ConduitWithStream
, StreamConduitT
, StreamConduit
, StreamSource
, StreamProducer
, StreamSink
, StreamConsumer
, streamConduit
, streamSource
, streamSourcePure
, unstream
) where
import Data.Conduit.Internal.Conduit
import Data.Conduit.Internal.Pipe (Pipe (..))
import Data.Functor.Identity (Identity (runIdentity))
import Data.Void (Void, absurd)
import Control.Monad.Trans.Resource (runResourceT)
data Step s o r
= Emit s o
| Skip s
| Stop r
deriving Functor
data Stream m o r = forall s. Stream
(s -> m (Step s o r))
(m s)
data ConduitWithStream i o m r = ConduitWithStream
(ConduitT i o m r)
(StreamConduitT i o m r)
type StreamConduitT i o m r = Stream m i () -> Stream m o r
type StreamConduit i m o = StreamConduitT i o m ()
type StreamSource m o = StreamConduitT () o m ()
type StreamProducer m o = forall i. StreamConduitT i o m ()
type StreamSink i m r = StreamConduitT i Void m r
type StreamConsumer i m r = forall o. StreamConduitT i o m r
unstream :: ConduitWithStream i o m r -> ConduitT i o m r
unstream (ConduitWithStream c _) = c
{-# INLINE [0] unstream #-}
fuseStream :: Monad m
=> ConduitWithStream a b m ()
-> ConduitWithStream b c m r
-> ConduitWithStream a c m r
fuseStream (ConduitWithStream a x) (ConduitWithStream b y) =
ConduitWithStream (a .| b) (y . x)
{-# INLINE fuseStream #-}
{-# RULES "conduit: fuseStream (.|)" forall left right.
unstream left .| unstream right = unstream (fuseStream left right)
#-}
{-# RULES "conduit: fuseStream (fuse)" forall left right.
fuse (unstream left) (unstream right) = unstream (fuseStream left right)
#-}
{-# RULES "conduit: fuseStream (=$=)" forall left right.
unstream left =$= unstream right = unstream (fuseStream left right)
#-}
runStream :: Monad m
=> ConduitWithStream () Void m r
-> m r
runStream (ConduitWithStream _ f) =
run $ f $ Stream emptyStep (return ())
where
emptyStep _ = return $ Stop ()
run (Stream step ms0) =
ms0 >>= loop
where
loop s = do
res <- step s
case res of
Stop r -> return r
Skip s' -> loop s'
Emit _ o -> absurd o
{-# INLINE runStream #-}
{-# RULES "conduit: runStream" forall stream.
runConduit (unstream stream) = runStream stream
#-}
{-# RULES "conduit: runStream (pure)" forall stream.
runConduitPure (unstream stream) = runIdentity (runStream stream)
#-}
{-# RULES "conduit: runStream (ResourceT)" forall stream.
runConduitRes (unstream stream) = runResourceT (runStream stream)
#-}
connectStream :: Monad m
=> ConduitWithStream () i m ()
-> ConduitWithStream i Void m r
-> m r
connectStream (ConduitWithStream _ stream) (ConduitWithStream _ f) =
run $ f $ stream $ Stream emptyStep (return ())
where
emptyStep _ = return $ Stop ()
run (Stream step ms0) =
ms0 >>= loop
where
loop s = do
res <- step s
case res of
Stop r -> return r
Skip s' -> loop s'
Emit _ o -> absurd o
{-# INLINE connectStream #-}
{-# RULES "conduit: connectStream ($$)" forall left right.
unstream left $$ unstream right = connectStream left right
#-}
connectStream1 :: Monad m
=> ConduitWithStream () i m ()
-> ConduitT i Void m r
-> m r
connectStream1 (ConduitWithStream _ fstream) (ConduitT sink0) =
case fstream $ Stream (const $ return $ Stop ()) (return ()) of
Stream step ms0 ->
let loop _ (Done r) _ = return r
loop ls (PipeM mp) s = mp >>= flip (loop ls) s
loop ls (Leftover p l) s = loop (l:ls) p s
loop _ (HaveOutput _ o) _ = absurd o
loop (l:ls) (NeedInput p _) s = loop ls (p l) s
loop [] (NeedInput p c) s = do
res <- step s
case res of
Stop () -> loop [] (c ()) s
Skip s' -> loop [] (NeedInput p c) s'
Emit s' i -> loop [] (p i) s'
in ms0 >>= loop [] (sink0 Done)
{-# INLINE connectStream1 #-}
{-# RULES "conduit: connectStream1 ($$)" forall left right.
unstream left $$ right = connectStream1 left right
#-}
{-# RULES "conduit: connectStream1 (runConduit/.|)" forall left right.
runConduit (unstream left .| right) = connectStream1 left right
#-}
{-# RULES "conduit: connectStream1 (runConduit/=$=)" forall left right.
runConduit (unstream left =$= right) = connectStream1 left right
#-}
{-# RULES "conduit: connectStream1 (runConduit/fuse)" forall left right.
runConduit (fuse (unstream left) right) = connectStream1 left right
#-}
{-# RULES "conduit: connectStream1 (runConduitPure/.|)" forall left right.
runConduitPure (unstream left .| right) = runIdentity (connectStream1 left right)
#-}
{-# RULES "conduit: connectStream1 (runConduitPure/=$=)" forall left right.
runConduitPure (unstream left =$= right) = runIdentity (connectStream1 left right)
#-}
{-# RULES "conduit: connectStream1 (runConduitPure/fuse)" forall left right.
runConduitPure (fuse (unstream left) right) = runIdentity (connectStream1 left right)
#-}
{-# RULES "conduit: connectStream1 (runConduitRes/.|)" forall left right.
runConduitRes (unstream left .| right) = runResourceT (connectStream1 left right)
#-}
{-# RULES "conduit: connectStream1 (runConduitRes/=$=)" forall left right.
runConduitRes (unstream left =$= right) = runResourceT (connectStream1 left right)
#-}
{-# RULES "conduit: connectStream1 (runConduitRes/fuse)" forall left right.
runConduitRes (fuse (unstream left) right) = runResourceT (connectStream1 left right)
#-}
connectStream2 :: forall i m r. Monad m
=> ConduitT () i m ()
-> ConduitWithStream i Void m r
-> m r
connectStream2 (ConduitT src0) (ConduitWithStream _ fstream) =
run $ fstream $ Stream step' $ return (src0 Done)
where
step' :: Pipe () () i () m () -> m (Step (Pipe () () i () m ()) i ())
step' (Done ()) = return $ Stop ()
step' (HaveOutput pipe o) = return $ Emit pipe o
step' (NeedInput _ c) = return $ Skip $ c ()
step' (PipeM mp) = Skip <$> mp
step' (Leftover p ()) = return $ Skip p
{-# INLINE step' #-}
run (Stream step ms0) =
ms0 >>= loop
where
loop s = do
res <- step s
case res of
Stop r -> return r
Emit _ o -> absurd o
Skip s' -> loop s'
{-# INLINE connectStream2 #-}
{-# RULES "conduit: connectStream2 ($$)" forall left right.
left $$ unstream right = connectStream2 left right
#-}
{-# RULES "conduit: connectStream2 (runConduit/.|)" forall left right.
runConduit (left .| unstream right) = connectStream2 left right
#-}
{-# RULES "conduit: connectStream2 (runConduit/fuse)" forall left right.
runConduit (fuse left (unstream right)) = connectStream2 left right
#-}
{-# RULES "conduit: connectStream2 (runConduit/=$=)" forall left right.
runConduit (left =$= unstream right) = connectStream2 left right
#-}
{-# RULES "conduit: connectStream2 (runConduitPure/.|)" forall left right.
runConduitPure (left .| unstream right) = runIdentity (connectStream2 left right)
#-}
{-# RULES "conduit: connectStream2 (runConduitPure/fuse)" forall left right.
runConduitPure (fuse left (unstream right)) = runIdentity (connectStream2 left right)
#-}
{-# RULES "conduit: connectStream2 (runConduitPure/=$=)" forall left right.
runConduitPure (left =$= unstream right) = runIdentity (connectStream2 left right)
#-}
{-# RULES "conduit: connectStream2 (runConduitRes/.|)" forall left right.
runConduitRes (left .| unstream right) = runResourceT (connectStream2 left right)
#-}
{-# RULES "conduit: connectStream2 (runConduitRes/fuse)" forall left right.
runConduitRes (fuse left (unstream right)) = runResourceT (connectStream2 left right)
#-}
{-# RULES "conduit: connectStream2 (runConduitRes/=$=)" forall left right.
runConduitRes (left =$= unstream right) = runResourceT (connectStream2 left right)
#-}
streamConduit :: ConduitT i o m r
-> (Stream m i () -> Stream m o r)
-> ConduitWithStream i o m r
streamConduit = ConduitWithStream
{-# INLINE CONLIKE streamConduit #-}
streamSource
:: Monad m
=> Stream m o ()
-> ConduitWithStream i o m ()
streamSource str@(Stream step ms0) =
ConduitWithStream con (const str)
where
con = ConduitT $ \rest -> PipeM $ do
s0 <- ms0
let loop s = do
res <- step s
case res of
Stop () -> return $ rest ()
Emit s' o -> return $ HaveOutput (PipeM $ loop s') o
Skip s' -> loop s'
loop s0
{-# INLINE streamSource #-}
streamSourcePure
:: Monad m
=> Stream Identity o ()
-> ConduitWithStream i o m ()
streamSourcePure (Stream step ms0) =
ConduitWithStream con (const $ Stream (return . runIdentity . step) (return s0))
where
s0 = runIdentity ms0
con = ConduitT $ \rest ->
let loop s =
case runIdentity $ step s of
Stop () -> rest ()
Emit s' o -> HaveOutput (loop s') o
Skip s' -> loop s'
in loop s0
{-# INLINE streamSourcePure #-}