module Data.Conduit.Internal
(
Pipe (..)
, ConduitM (..)
, Source
, Producer
, Sink
, Consumer
, Conduit
, ResumableSource (..)
, ResumableConduit (..)
, await
, awaitE
, awaitForever
, yield
, yieldM
, yieldOr
, leftover
, bracketP
, addCleanup
, idP
, pipe
, pipeL
, connectResume
, connectResumeConduit
, runPipe
, injectLeftovers
, (>+>)
, (<+<)
, fuseLeftovers
, fuseReturnLeftovers
, sourceToPipe
, sinkToPipe
, conduitToPipe
, toProducer
, toConsumer
, catchP
, handleP
, tryP
, catchC
, handleC
, tryC
, transPipe
, mapOutput
, mapOutputMaybe
, mapInput
, sourceList
, withUpstream
, unwrapResumable
, unwrapResumableConduit
, newResumableSource
, newResumableConduit
, Data.Conduit.Internal.enumFromTo
, zipSinks
, zipSources
, zipSourcesApp
, zipConduitApp
, passthroughSink
, generalizeUpstream
) where
import Control.Applicative (Applicative (..))
import Control.Exception.Lifted as E (Exception, catch)
import Control.Monad ((>=>), liftM, ap, when, liftM2)
import Control.Monad.Error.Class(MonadError(..))
import Control.Monad.Reader.Class(MonadReader(..))
import Control.Monad.RWS.Class(MonadRWS())
import Control.Monad.Writer.Class(MonadWriter(..))
import Control.Monad.State.Class(MonadState(..))
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Base (MonadBase (liftBase))
import Data.Void (Void, absurd)
import Data.Monoid (Monoid (mappend, mempty))
import Control.Monad.Trans.Resource
import qualified GHC.Exts
import qualified Data.IORef as I
import Control.Monad.Morph (MFunctor (..))
#if MIN_VERSION_exceptions(0, 6, 0)
import qualified Control.Monad.Catch as Catch
#endif
-- | The core streaming type on which everything else in this module is built.
--
-- Type parameters, as used by the constructors below:
--
-- * @l@ – values pushed back with 'Leftover'
-- * @i@ – input values delivered to the first 'NeedInput' callback
-- * @o@ – output values carried by 'HaveOutput'
-- * @u@ – the value delivered to the second 'NeedInput' callback
-- * @m@ – the base monad whose actions are embedded with 'PipeM'
-- * @r@ – the final result carried by 'Done'
data Pipe l i o u m r =
    -- | An output value, together with the pipe to continue with and a
    -- finalizer action in the base monad.
    HaveOutput (Pipe l i o u m r) (m ()) o
    -- | A request for input.  The first callback continues with an input
    -- value; the second continues with the @u@ value when no input is given.
  | NeedInput (i -> Pipe l i o u m r) (u -> Pipe l i o u m r)
    -- | Processing has finished with the given result.
  | Done r
    -- | A base-monad action that computes the pipe to continue with.
  | PipeM (m (Pipe l i o u m r))
    -- | A pushed-back value, together with the pipe to continue with.
  | Leftover (Pipe l i o u m r) l
-- | Mapping over the result is derived from the 'Monad' instance.
instance Monad m => Functor (Pipe l i o u m) where
    fmap f p = liftM f p
-- | Applicative operations are derived from the 'Monad' instance.
instance Monad m => Applicative (Pipe l i o u m) where
    pure r = return r
    pf <*> px = ap pf px
-- | Sequencing walks down the first pipe and grafts the continuation onto
-- every place where that pipe finishes with 'Done'.
instance Monad m => Monad (Pipe l i o u m) where
    return r = Done r

    pipe0 >>= k =
        case pipe0 of
            HaveOutput next fin out -> HaveOutput (next >>= k) fin out
            NeedInput onIn onUp ->
                NeedInput (\i -> onIn i >>= k) (\u -> onUp u >>= k)
            Done r -> k r
            PipeM act -> PipeM (liftM (>>= k) act)
            Leftover next l -> Leftover (next >>= k) l
-- | Base-monad actions are lifted through the underlying monad @m@.
instance MonadBase base m => MonadBase base (Pipe l i o u m) where
    liftBase act = lift (liftBase act)
-- | A lifted action becomes a single 'PipeM' step that finishes with the
-- action's result.
instance MonadTrans (Pipe l i o u) where
    lift act = PipeM (liftM Done act)
-- | 'IO' actions are lifted through the underlying monad @m@.
instance MonadIO m => MonadIO (Pipe l i o u m) where
    liftIO io = lift (liftIO io)
-- | Exceptions are thrown in the underlying monad @m@.
instance MonadThrow m => MonadThrow (Pipe l i o u m) where
    throwM e = lift (throwM e)
#if MIN_VERSION_exceptions(0, 6, 0)
-- | Catches exceptions raised by the base-monad steps of a pipe.
--
-- Every 'PipeM' action of the guarded pipe is wrapped in 'Catch.catch'.
-- The pipe returned by the handler is used as-is and is not itself guarded.
instance Catch.MonadCatch m => Catch.MonadCatch (Pipe l i o u m) where
    catch body handler = guarded body
      where
        guarded pipe0 =
            case pipe0 of
                Done r -> Done r
                PipeM act ->
                    PipeM (Catch.catch (liftM guarded act) (\e -> return (handler e)))
                Leftover next l -> Leftover (guarded next) l
                NeedInput onIn onUp -> NeedInput (guarded . onIn) (guarded . onUp)
                HaveOutput next fin out -> HaveOutput (guarded next) fin out
#endif
-- | Pipes returning @()@ form a monoid under sequencing: the identity is the
-- pipe that finishes immediately, and combining runs one pipe after another.
instance Monad m => Monoid (Pipe l i o u m ()) where
    mempty = Done ()
    mappend first second = first >> second
-- | Resource actions are lifted through the underlying monad @m@.
instance MonadResource m => MonadResource (Pipe l i o u m) where
    liftResourceT act = lift (liftResourceT act)
-- | Reader operations are forwarded to the base monad.
--
-- 'local' walks the whole pipe and runs every base-monad step ('PipeM') in
-- the modified environment.  Finalizers stored in 'HaveOutput' are left
-- untouched.
instance MonadReader r m => MonadReader r (Pipe l i o u m) where
    ask = lift ask
    local f (HaveOutput p c o) = HaveOutput (local f p) c o
    local f (NeedInput p c) = NeedInput (\i -> local f (p i)) (\u -> local f (c u))
    local _ (Done x) = Done x
    -- Run the action itself in the modified environment AND keep applying
    -- 'local' to the pipe it returns.  Without the outer 'liftM (local f)',
    -- only the first monadic step would see the modified environment and
    -- every later step would silently run in the original one.
    local f (PipeM mp) = PipeM (liftM (local f) (local f mp))
    local f (Leftover p i) = Leftover (local f p) i
-- If the build does not provide a MIN_VERSION_mtl macro, define one that
-- evaluates to 0 so that the version checks using it are treated as false.
#ifndef MIN_VERSION_mtl
#define MIN_VERSION_mtl(x, y, z) 0
#endif
-- | Writer operations are forwarded to the base monad.
--
-- 'listen' returns the combined output of every base-monad step of the pipe
-- alongside the pipe's result.
--
-- 'pass' withholds the output of the pipe's base-monad steps while the pipe
-- runs and, once the pipe finishes, writes that accumulated output a single
-- time after transforming it with the function returned next to the result.
-- Output written by 'HaveOutput' finalizers is not intercepted, and if the
-- pipe never reaches 'Done' the withheld output is never written.
instance MonadWriter w m => MonadWriter w (Pipe l i o u m) where
#if MIN_VERSION_mtl(2, 1, 0)
    writer = lift . writer
#endif
    tell = lift . tell

    listen (HaveOutput p c o) = HaveOutput (listen p) c o
    listen (NeedInput p c) = NeedInput (\i -> listen (p i)) (\u -> listen (c u))
    listen (Done x) = Done (x, mempty)
    listen (PipeM mp) =
        PipeM $ do
            -- Output of this step ...
            (p, w) <- listen mp
            return $ do
                -- ... followed by the output of everything after it.
                (x, w') <- listen p
                return (x, w `mappend` w')
    listen (Leftover p i) = Leftover (listen p) i

    pass = go mempty
      where
        -- @seen@ is the output accumulated (and withheld) so far.
        go seen (HaveOutput p c o) = HaveOutput (go seen p) c o
        go seen (NeedInput p c) = NeedInput (\i -> go seen (p i)) (\u -> go seen (c u))
        go seen (PipeM mp) = PipeM $ do
            -- Run the step in the base monad, capture what it wrote with
            -- 'listen', and suppress that output for now by replacing it
            -- with 'mempty' via the base monad's own 'pass'.
            (p, w) <- pass $ do
                stepped <- listen mp
                return (stepped, const mempty)
            return (go (seen `mappend` w) p)
        go seen (Done (x, f)) = PipeM $ do
            -- The pipe is finished: emit the transformed output exactly once.
            tell (f seen)
            return (Done x)
        go seen (Leftover p i) = Leftover (go seen p) i
-- | State operations are forwarded to the base monad.
instance MonadState s m => MonadState s (Pipe l i o u m) where
    get = lift get
    put s = lift (put s)
#if MIN_VERSION_mtl(2, 1, 0)
    state f = lift (state f)
#endif
-- | No methods are defined here; the instance only combines the reader,
-- writer and state instances for 'Pipe'.
instance MonadRWS r w s m => MonadRWS r w s (Pipe l i o u m)
-- | Errors are thrown and caught in the base monad.
--
-- 'catchError' guards every 'PipeM' step of the pipe.  The pipe produced by
-- the handler is used as-is and is not itself guarded.
instance MonadError e m => MonadError e (Pipe l i o u m) where
    throwError err = lift (throwError err)

    catchError body handler = guarded body
      where
        guarded pipe0 =
            case pipe0 of
                HaveOutput next fin out -> HaveOutput (guarded next) fin out
                NeedInput onIn onUp ->
                    NeedInput (\i -> guarded (onIn i)) (\u -> guarded (onUp u))
                Done r -> Done r
                PipeM act ->
                    PipeM (catchError (liftM guarded act) (\err -> return (handler err)))
                Leftover next l -> Leftover (guarded next) l
-- | A wrapper around 'Pipe' in which the leftover type is the same as the
-- input type @i@ and the upstream result is fixed to @()@.
--
-- The listed instances are derived from the underlying 'Pipe'; the
-- 'Catch.MonadCatch' instance is only derived when the @exceptions@ version
-- check succeeds.
newtype ConduitM i o m r = ConduitM { unConduitM :: Pipe i i o () m r }
    deriving (Functor, Applicative, Monad, MonadIO, MonadTrans, MonadThrow, MFunctor
#if MIN_VERSION_exceptions(0, 6, 0)
    , Catch.MonadCatch
#endif
    )
-- | Reader operations delegate to the instance for the wrapped 'Pipe'.
instance MonadReader r m => MonadReader r (ConduitM i o m) where
    ask = ConduitM ask
    local f = ConduitM . local f . unConduitM
-- | Writer operations delegate to the instance for the wrapped 'Pipe'.
instance MonadWriter w m => MonadWriter w (ConduitM i o m) where
#if MIN_VERSION_mtl(2, 1, 0)
    writer aw = ConduitM (writer aw)
#endif
    tell w = ConduitM (tell w)
    listen = ConduitM . listen . unConduitM
    pass = ConduitM . pass . unConduitM
-- | State operations delegate to the instance for the wrapped 'Pipe'.
instance MonadState s m => MonadState s (ConduitM i o m) where
    get = ConduitM get
    put s = ConduitM (put s)
#if MIN_VERSION_mtl(2, 1, 0)
    state f = ConduitM (state f)
#endif
-- | No methods are defined here; the instance only combines the reader,
-- writer and state instances for 'ConduitM'.
instance MonadRWS r w s m => MonadRWS r w s (ConduitM i o m)
-- | Error operations delegate to the instance for the wrapped 'Pipe'.
instance MonadError e m => MonadError e (ConduitM i o m) where
    throwError err = ConduitM (throwError err)
    catchError body handler =
        ConduitM (catchError (unConduitM body) (\err -> unConduitM (handler err)))
-- | Base-monad actions are lifted through the underlying monad @m@.
instance MonadBase base m => MonadBase base (ConduitM i o m) where
    liftBase act = lift (liftBase act)
-- | Resource actions are lifted through the underlying monad @m@.
instance MonadResource m => MonadResource (ConduitM i o m) where
    liftResourceT act = lift (liftResourceT act)
-- | Conduits returning @()@ form a monoid under sequencing: the identity
-- finishes immediately, and combining runs one conduit after another.
instance Monad m => Monoid (ConduitM i o m ()) where
    mempty = ConduitM (Done ())
    mappend first second = first >> second
-- | A 'ConduitM' whose input type is @()@, which outputs @o@ values and
-- returns @()@.
type Source m o = ConduitM () o m ()
-- | Like 'Source', but usable with any input type @i@.
type Producer m o = forall i. ConduitM i o m ()
-- | A 'ConduitM' that takes @i@ values and whose output type is 'Void'.
-- The monad and result type are supplied as further arguments.
type Sink i = ConduitM i Void
-- | Like 'Sink', but usable with any output type @o@.
type Consumer i m r = forall o. ConduitM i o m r
-- | A 'ConduitM' that takes @i@ values, outputs @o@ values and returns @()@.
type Conduit i m o = ConduitM i o m ()
-- | A 'Source' paired with an action in the base monad.  Within this module
-- that action is treated as the source's current finalizer (see
-- 'connectResume' and 'unwrapResumable').
data ResumableSource m o = ResumableSource (Source m o) (m ())
-- | Changing the base monad applies the transformation to both the wrapped
-- source and its finalizer.
instance MFunctor ResumableSource where
    hoist nat (ResumableSource src final) =
        let src' = hoist nat src
            final' = nat final
         in ResumableSource src' final'
-- | Ask for one input value.  Yields @Just@ the value when one is supplied,
-- and @Nothing@ when there is no more input (the upstream result is dropped).
await :: Pipe l i o u m (Maybe i)
await = NeedInput gotInput noInput
  where
    gotInput i = Done (Just i)
    noInput _ = Done Nothing
-- | Like 'await', but keeps the upstream result: @Right@ carries an input
-- value, @Left@ carries the value delivered when there is no more input.
awaitE :: Pipe l i o u m (Either u i)
awaitE = NeedInput gotInput noInput
  where
    gotInput i = Done (Right i)
    noInput u = Done (Left u)
-- | Run the given pipe once for every input value, and finish with the
-- upstream result as soon as no more input is available.
awaitForever :: Monad m => (i -> Pipe l i o r m r') -> Pipe l i o r m r
awaitForever inner = loop
  where
    loop = do
        next <- awaitE
        case next of
            Left upstreamResult -> return upstreamResult
            Right i -> inner i >> loop
-- | Send one value downstream, with a finalizer that does nothing.
yield :: Monad m
      => o -- ^ output value
      -> Pipe l i o u m ()
yield out = HaveOutput (Done ()) (return ()) out
-- | Run an action in the base monad and send its result downstream, with a
-- finalizer that does nothing.
yieldM :: Monad m => m o -> Pipe l i o u m ()
yieldM produce = PipeM (liftM emit produce)
  where
    emit out = HaveOutput (Done ()) (return ()) out
-- | Like 'yield', but attaches the given action as the finalizer of the
-- output.
yieldOr :: Monad m
        => o    -- ^ output value
        -> m () -- ^ finalizer stored alongside the output
        -> Pipe l i o u m ()
yieldOr out finalizer = HaveOutput finished finalizer out
  where
    finished = Done ()
-- | Push a single value back as a leftover and finish.
leftover :: l -> Pipe l i o u m ()
leftover l = Leftover (Done ()) l
-- | Allocate a resource, build a pipe from it, and release the resource
-- when that pipe is cleaned up.
--
-- The resource is registered with 'allocate', and its release key is
-- released through 'addCleanup' regardless of how the pipe ends.
bracketP :: MonadResource m
         => IO a                    -- ^ allocation
         -> (a -> IO ())            -- ^ release
         -> (a -> Pipe l i o u m r) -- ^ pipe using the resource
         -> Pipe l i o u m r
bracketP alloc free inside = PipeM $ do
    (key, seed) <- allocate alloc free
    return (addCleanup (\_ -> release key) (inside seed))
-- | Attach a cleanup action to a pipe.
--
-- The action is called with 'True' when the pipe finishes with 'Done', and
-- it is prepended (called with 'False') to the finalizer of every
-- 'HaveOutput' the pipe produces.
addCleanup :: Monad m
           => (Bool -> m ()) -- ^ cleanup; 'True' means the pipe ran to completion
           -> Pipe l i o u m r
           -> Pipe l i o u m r
addCleanup cleanup = go
  where
    go pipe0 =
        case pipe0 of
            Done r -> PipeM (cleanup True >> return (Done r))
            HaveOutput next close out ->
                HaveOutput (go next) (cleanup False >> close) out
            PipeM act -> PipeM (liftM go act)
            NeedInput onIn onUp -> NeedInput (go . onIn) (go . onUp)
            Leftover next l -> Leftover (go next) l
-- | The identity pipe: every input is sent downstream unchanged, and the
-- upstream result becomes the final result.
idP :: Monad m => Pipe l a a r m r
idP = NeedInput (\a -> HaveOutput idP (return ()) a) (\r -> Done r)
-- | Compose an upstream (left) pipe with a downstream (right) pipe.
--
-- The downstream pipe drives: the upstream pipe is only advanced when the
-- downstream pipe asks for input.  The downstream pipe cannot produce
-- leftovers (its leftover type is 'Void').  When the downstream pipe
-- finishes, the most recent upstream finalizer is run.
pipe :: Monad m => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2
pipe = goRight (return ())
  where
    -- Advance the downstream pipe; @final@ is the current upstream finalizer.
    goRight final up (HaveOutput next close out) =
        HaveOutput (goRight final up next) (close >> final) out
    goRight final up (NeedInput onIn onUp) = goLeft onIn onUp final up
    goRight final _ (Done r2) = PipeM (final >> return (Done r2))
    goRight final up (PipeM act) = PipeM (liftM (goRight final up) act)
    goRight _ _ (Leftover _ l) = absurd l

    -- Advance the upstream pipe until it can satisfy the downstream request.
    goLeft onIn _ _ (HaveOutput up' final' out) = goRight final' up' (onIn out)
    goLeft onIn onUp final (NeedInput more done) =
        NeedInput (goLeft onIn onUp final . more) (goLeft onIn onUp final . done)
    -- Upstream is finished: no finalizer is left to run.
    goLeft _ onUp _ (Done r1) = goRight (return ()) (Done r1) (onUp r1)
    goLeft onIn onUp final (PipeM act) = PipeM (liftM (goLeft onIn onUp final) act)
    goLeft onIn onUp final (Leftover up' l) = Leftover (goLeft onIn onUp final up') l
-- | Like 'pipe', but the downstream pipe may produce leftovers.
--
-- A leftover from the downstream pipe is placed in front of the upstream
-- pipe as an output, so it is the next value the downstream pipe receives.
pipeL :: Monad m => Pipe l a b r0 m r1 -> Pipe b b c r1 m r2 -> Pipe l a c r0 m r2
pipeL = goRight (return ())
  where
    -- Advance the downstream pipe; @final@ is the current upstream finalizer.
    goRight final up (HaveOutput next close out) =
        HaveOutput (goRight final up next) (close >> final) out
    goRight final up (NeedInput onIn onUp) = goLeft onIn onUp final up
    goRight final _ (Done r2) = PipeM (final >> return (Done r2))
    goRight final up (PipeM act) = PipeM (liftM (goRight final up) act)
    goRight final up (Leftover next l) = goRight final (HaveOutput up final l) next

    -- Advance the upstream pipe until it can satisfy the downstream request.
    goLeft onIn _ _ (HaveOutput up' final' out) = goRight final' up' (onIn out)
    goLeft onIn onUp final (NeedInput more done) =
        NeedInput (goLeft onIn onUp final . more) (goLeft onIn onUp final . done)
    -- Upstream is finished: no finalizer is left to run.
    goLeft _ onUp _ (Done r1) = goRight (return ()) (Done r1) (onUp r1)
    goLeft onIn onUp final (PipeM act) = PipeM (liftM (goLeft onIn onUp final) act)
    goLeft onIn onUp final (Leftover up' l) = Leftover (goLeft onIn onUp final up') l
-- | Run a resumable source into a sink in the base monad.
--
-- Returns the sink's result together with the remainder of the source and
-- its current finalizer, so the source can be connected again later.  The
-- finalizer is not run here.  Leftovers from the sink are placed back in
-- front of the source.
connectResume :: Monad m
              => ResumableSource m o
              -> Sink o m r
              -> m (ResumableSource m o, r)
connectResume (ResumableSource (ConduitM left0) leftFinal0) (ConduitM right0) =
    goRight leftFinal0 left0 right0
  where
    -- Advance the sink; @fin@ is the source's current finalizer.
    goRight _ _ (HaveOutput _ _ o) = absurd o
    goRight fin src (NeedInput onIn onUp) = goLeft onIn onUp fin src
    goRight fin src (Done r2) = return (ResumableSource (ConduitM src) fin, r2)
    goRight fin src (PipeM act) = act >>= goRight fin src
    goRight fin src (Leftover next l) = goRight fin (HaveOutput src fin l) next

    -- Advance the source until it can satisfy the sink's request.
    goLeft onIn _ _ (HaveOutput src' fin' out) = goRight fin' src' (onIn out)
    goLeft onIn onUp fin (NeedInput _ done) = goLeft onIn onUp fin (done ())
    goLeft _ onUp _ (Done ()) = goRight (return ()) (Done ()) (onUp ())
    goLeft onIn onUp fin (PipeM act) = act >>= goLeft onIn onUp fin
    goLeft onIn onUp fin (Leftover next ()) = goLeft onIn onUp fin next
-- | Run a closed pipe (no leftovers, no outputs, @()@ input and upstream
-- result) in the base monad and return its result.  Any request for input
-- is answered with the upstream result @()@.
runPipe :: Monad m => Pipe Void () Void () m r -> m r
runPipe pipe0 =
    case pipe0 of
        HaveOutput _ _ o -> absurd o
        NeedInput _ onUp -> runPipe (onUp ())
        Done r -> return r
        PipeM act -> act >>= runPipe
        Leftover _ l -> absurd l
-- | Resolve all leftovers inside a pipe by feeding them back as input.
--
-- Leftovers are kept on a stack; a request for input is served from the
-- stack first and only reaches upstream when the stack is empty.  Leftovers
-- still on the stack when the pipe finishes are discarded.
injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
injectLeftovers = go []
  where
    go stack pipe0 =
        case pipe0 of
            HaveOutput next fin out -> HaveOutput (go stack next) fin out
            NeedInput onIn onUp ->
                case stack of
                    l : rest -> go rest (onIn l)
                    [] -> NeedInput (go [] . onIn) (go [] . onUp)
            Done r -> Done r
            PipeM act -> PipeM (liftM (go stack) act)
            Leftover next l -> go (l : stack) next
-- | Change the base monad of a pipe using the given transformation.
--
-- The transformation is applied to each finalizer and to each 'PipeM' step.
-- Directly nested 'PipeM' steps are first merged into a single action of the
-- original monad, so the transformation is applied once per merged run.
transPipe :: Monad m => (forall a. m a -> n a) -> Pipe l i o u m r -> Pipe l i o u n r
transPipe f = go
  where
    go (HaveOutput next fin out) = HaveOutput (go next) (f fin) out
    go (NeedInput onIn onUp) = NeedInput (go . onIn) (go . onUp)
    go (Done r) = Done r
    go (PipeM act) = PipeM (f (liftM go (collapse act)))
    go (Leftover next l) = Leftover (go next) l

    -- Keep running actions until the result is no longer a 'PipeM'.
    collapse act = do
        step <- act
        case step of
            PipeM act' -> collapse act'
            _ -> return step
-- | Apply a function to every output value of a pipe.
mapOutput :: Monad m => (o1 -> o2) -> Pipe l i o1 u m r -> Pipe l i o2 u m r
mapOutput f = go
  where
    go pipe0 =
        case pipe0 of
            HaveOutput next fin out -> HaveOutput (go next) fin (f out)
            NeedInput onIn onUp -> NeedInput (go . onIn) (go . onUp)
            Done r -> Done r
            PipeM act -> PipeM (liftM go act)
            Leftover next l -> Leftover (go next) l
-- | Like 'mapOutput', but outputs mapped to 'Nothing' are dropped (together
-- with their finalizer).
mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> Pipe l i o1 u m r -> Pipe l i o2 u m r
mapOutputMaybe f = go
  where
    go pipe0 =
        case pipe0 of
            HaveOutput next fin out ->
                case f out of
                    Nothing -> go next
                    Just out' -> HaveOutput (go next) fin out'
            NeedInput onIn onUp -> NeedInput (go . onIn) (go . onUp)
            Done r -> Done r
            PipeM act -> PipeM (liftM go act)
            Leftover next l -> Leftover (go next) l
-- | Apply a function to every input value of a pipe.
--
-- Leftovers produced by the pipe are converted back with the second
-- function; leftovers mapped to 'Nothing' are dropped.
mapInput :: Monad m
         => (i1 -> i2)       -- ^ map incoming input
         -> (l2 -> Maybe l1) -- ^ map leftovers back
         -> Pipe l2 i2 o u m r
         -> Pipe l1 i1 o u m r
mapInput f f' = go
  where
    go (HaveOutput next fin out) = HaveOutput (go next) fin out
    go (NeedInput onIn onUp) = NeedInput (go . onIn . f) (go . onUp)
    go (Done r) = Done r
    go (PipeM act) = PipeM (liftM go act)
    go (Leftover next l) =
        case f' l of
            Nothing -> go next
            Just l' -> Leftover (go next) l'
-- | Output every value from @start@ up to and including @stop@, stepping
-- with 'succ'.
--
-- Termination relies on an equality test only: if @stop@ is not reachable
-- from @start@ by repeated 'succ', the stream does not stop on its own.
enumFromTo :: (Enum o, Eq o, Monad m)
           => o -- ^ first value
           -> o -- ^ last value (inclusive)
           -> Pipe l i o u m ()
enumFromTo start stop = loop start
  where
    loop current =
        if current == stop
            then emit current (Done ())
            else emit current (loop (succ current))

    emit out next = HaveOutput next (return ()) out
-- | Output every element of a list in order, each with a finalizer that
-- does nothing.
sourceList :: Monad m => [a] -> Pipe l i a u m ()
sourceList = foldr emit (Done ())
  where
    emit out rest = HaveOutput rest (return ()) out
-- | Turn a fold-style (Church-encoded) list into a pipe that outputs its
-- elements, each with a finalizer that does nothing.
build :: Monad m => (forall b. (o -> b -> b) -> b -> b) -> Pipe l i o u m ()
build g = g cons nil
  where
    cons out rest = HaveOutput rest (return ()) out
    nil = Done ()
-- | Convert a 'Source' into a 'Pipe' with arbitrary leftover, input and
-- upstream-result types.  Requests for input are answered immediately as
-- "no more input", and leftovers are dropped.
sourceToPipe :: Monad m => Source m o -> Pipe l i o u m ()
sourceToPipe (ConduitM src) = go src
  where
    go pipe0 =
        case pipe0 of
            HaveOutput next fin out -> HaveOutput (go next) fin out
            NeedInput _ onUp -> go (onUp ())
            Done () -> Done ()
            PipeM act -> PipeM (liftM go act)
            Leftover next () -> go next
-- | Convert a 'Sink' into a 'Pipe' with arbitrary leftover, output and
-- upstream-result types.  Leftovers are resolved with 'injectLeftovers'
-- first, and the upstream result is ignored.
sinkToPipe :: Monad m => Sink i m r -> Pipe l i o u m r
sinkToPipe (ConduitM sink) = go (injectLeftovers sink)
  where
    go pipe0 =
        case pipe0 of
            HaveOutput _ _ o -> absurd o
            NeedInput onIn onUp -> NeedInput (go . onIn) (\_ -> go (onUp ()))
            Done r -> Done r
            PipeM act -> PipeM (liftM go act)
            Leftover _ l -> absurd l
-- | Convert a 'Conduit' into a 'Pipe' with arbitrary leftover and
-- upstream-result types.  Leftovers are resolved with 'injectLeftovers'
-- first, and the upstream result is ignored.
conduitToPipe :: Monad m => Conduit i m o -> Pipe l i o u m ()
conduitToPipe (ConduitM cond) = go (injectLeftovers cond)
  where
    go pipe0 =
        case pipe0 of
            HaveOutput next fin out -> HaveOutput (go next) fin out
            NeedInput onIn onUp -> NeedInput (go . onIn) (\_ -> go (onUp ()))
            Done () -> Done ()
            PipeM act -> PipeM (liftM go act)
            Leftover _ l -> absurd l
-- | Run a pipe and then return its result paired with the upstream result.
--
-- After the pipe finishes, all remaining input is consumed and discarded
-- until upstream delivers its result.
withUpstream :: Monad m
             => Pipe l i o u m r
             -> Pipe l i o u m (u, r)
withUpstream down = do
    r <- down
    let drain = do
            next <- awaitE
            case next of
                Left u -> return (u, r)
                Right _ -> drain
    drain
-- | Split a 'ResumableSource' into a plain 'Source' and a finalizer action.
--
-- The returned finalizer runs the original finalizer only if the returned
-- source has not started running yet; starting the source clears the flag
-- that enables it.
unwrapResumable :: MonadIO m => ResumableSource m o -> m (Source m o, m ())
unwrapResumable (ResumableSource src final) = do
    pending <- liftIO (I.newIORef True)
    let markStarted = liftIO (I.writeIORef pending False)
        finalIfUnused =
            liftIO (I.readIORef pending) >>= \stillPending ->
                when stillPending final
    return (markStarted >> src, finalIfUnused)
-- | Wrap a fresh 'Source' as a 'ResumableSource' with a finalizer that does
-- nothing.
newResumableSource :: Monad m => Source m o -> ResumableSource m o
newResumableSource src = ResumableSource src noFinalizer
  where
    noFinalizer = return ()
-- Fixity of the pipe composition operators: both bind at precedence 9;
-- '<+<' associates to the right and '>+>' to the left.
infixr 9 <+<
infixl 9 >+>
-- | Operator form of 'pipe': upstream on the left, downstream on the right.
(>+>) :: Monad m => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2
up >+> down = pipe up down
-- | 'pipe' with its arguments swapped: downstream on the left, upstream on
-- the right.
(<+<) :: Monad m => Pipe Void b c r1 m r2 -> Pipe l a b r0 m r1 -> Pipe l a c r0 m r2
down <+< up = pipe up down
-- | Generalize a 'Source' to a 'Producer'.  Requests for input are answered
-- immediately as "no more input", and leftovers are dropped.
toProducer :: Monad m => Source m a -> Producer m a
toProducer (ConduitM src) = ConduitM (go src)
  where
    go pipe0 =
        case pipe0 of
            HaveOutput next fin out -> HaveOutput (go next) fin out
            NeedInput _ onUp -> go (onUp ())
            Done r -> Done r
            PipeM act -> PipeM (liftM go act)
            Leftover next () -> go next
-- | Generalize a 'Sink' to a 'Consumer'.  The structure of the sink is kept
-- as-is; only the (impossible) output case is eliminated.
toConsumer :: Monad m => Sink a m b -> Consumer a m b
toConsumer (ConduitM sink) = ConduitM (go sink)
  where
    go pipe0 =
        case pipe0 of
            HaveOutput _ _ o -> absurd o
            NeedInput onIn onUp -> NeedInput (go . onIn) (go . onUp)
            Done r -> Done r
            PipeM act -> PipeM (liftM go act)
            Leftover next l -> Leftover (go next) l
-- | Changing the base monad of a 'Pipe' is implemented by 'transPipe'.
instance MFunctor (Pipe l i o u) where
    hoist nat p = transPipe nat p
-- | Catch exceptions raised by the base-monad steps of a pipe.
--
-- Every 'PipeM' action of the guarded pipe is wrapped in 'E.catch'.  The
-- pipe returned by the handler is used as-is and is not itself guarded.
catchP :: (MonadBaseControl IO m, Exception e)
       => Pipe l i o u m r        -- ^ pipe to guard
       -> (e -> Pipe l i o u m r) -- ^ handler
       -> Pipe l i o u m r
catchP p0 onErr = guarded p0
  where
    guarded pipe0 =
        case pipe0 of
            Done r -> Done r
            PipeM act -> PipeM (E.catch (liftM guarded act) (\e -> return (onErr e)))
            Leftover next l -> Leftover (guarded next) l
            NeedInput onIn onUp -> NeedInput (guarded . onIn) (guarded . onUp)
            HaveOutput next fin out -> HaveOutput (guarded next) fin out
-- | 'catchP' with the handler given first.
handleP :: (MonadBaseControl IO m, Exception e)
        => (e -> Pipe l i o u m r) -- ^ handler
        -> Pipe l i o u m r        -- ^ pipe to guard
        -> Pipe l i o u m r
handleP onErr p = catchP p onErr
-- | Run a pipe, returning @Right@ its result, or @Left@ the exception if one
-- of its base-monad steps throws an exception of type @e@.
tryP :: (MonadBaseControl IO m, Exception e)
     => Pipe l i o u m r
     -> Pipe l i o u m (Either e r)
tryP = attempt
  where
    attempt pipe0 =
        case pipe0 of
            Done r -> Done (Right r)
            PipeM act ->
                PipeM (E.catch (liftM attempt act) (\e -> return (Done (Left e))))
            Leftover next l -> Leftover (attempt next) l
            NeedInput onIn onUp -> NeedInput (attempt . onIn) (attempt . onUp)
            HaveOutput next fin out -> HaveOutput (attempt next) fin out
-- | 'catchP' for 'ConduitM'.
catchC :: (MonadBaseControl IO m, Exception e)
       => ConduitM i o m r        -- ^ conduit to guard
       -> (e -> ConduitM i o m r) -- ^ handler
       -> ConduitM i o m r
catchC body handler =
    ConduitM (catchP (unConduitM body) (\e -> unConduitM (handler e)))
-- | 'catchC' with the handler given first.
handleC :: (MonadBaseControl IO m, Exception e)
        => (e -> ConduitM i o m r) -- ^ handler
        -> ConduitM i o m r        -- ^ conduit to guard
        -> ConduitM i o m r
handleC handler body = catchC body handler
-- | 'tryP' for 'ConduitM'.
tryC :: (MonadBaseControl IO m, Exception e)
     => ConduitM i o m r
     -> ConduitM i o m (Either e r)
tryC (ConduitM p) = ConduitM (tryP p)
-- | Feed the same input to two sinks and return both results.
--
-- Leftovers of each sink are resolved with 'injectLeftovers' first.  The
-- combined sink keeps asking for input until both sinks have finished.
zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r')
zipSinks (ConduitM x0) (ConduitM y0) =
    ConduitM (both (injectLeftovers x0) (injectLeftovers y0))
  where
    both :: Monad m => Pipe Void i Void () m r1 -> Pipe Void i Void () m r2 -> Pipe l i o () m (r1, r2)
    -- Leftovers and outputs are impossible here (their types are 'Void').
    both (Leftover _ l) _ = absurd l
    both _ (Leftover _ l) = absurd l
    both (HaveOutput _ _ o) _ = absurd o
    both _ (HaveOutput _ _ o) = absurd o
    -- Run pending base-monad steps, left sink first.
    both (PipeM mx) y = PipeM (liftM (\x -> both x y) mx)
    both x (PipeM my) = PipeM (liftM (both x) my)
    both (Done x) (Done y) = Done (x, y)
    -- Share each input value between the sinks that still want input.
    both (NeedInput px cx) (NeedInput py cy) =
        NeedInput (\i -> both (px i) (py i)) (\() -> both (cx ()) (cy ()))
    both (NeedInput px cx) y@Done{} =
        NeedInput (\i -> both (px i) y) (\u -> both (cx u) y)
    both x@Done{} (NeedInput py cy) =
        NeedInput (\i -> both x (py i)) (\u -> both x (cy u))
-- | Pair up the outputs of two sources.
--
-- The result ends as soon as either source is finished.  If the other
-- source is holding an output at that point, its finalizer is run.
zipSources :: Monad m => Source m a -> Source m b -> Source m (a, b)
zipSources (ConduitM left0) (ConduitM right0) =
    ConduitM (zipped left0 right0)
  where
    -- Leftovers of a source are dropped.
    zipped (Leftover l ()) r = zipped l r
    zipped l (Leftover r ()) = zipped l r
    zipped (Done ()) (Done ()) = Done ()
    -- One side is finished while the other holds an output: finalize it.
    zipped (Done ()) (HaveOutput _ close _) = PipeM (close >> return (Done ()))
    zipped (HaveOutput _ close _) (Done ()) = PipeM (close >> return (Done ()))
    -- One side is finished while the other has a pending action: stop
    -- without running that action.
    zipped (Done ()) (PipeM _) = Done ()
    zipped (PipeM _) (Done ()) = Done ()
    zipped (PipeM ml) (PipeM mr) = PipeM (liftM2 zipped ml mr)
    zipped (PipeM ml) r@HaveOutput{} = PipeM (liftM (\l -> zipped l r) ml)
    zipped l@HaveOutput{} (PipeM mr) = PipeM (liftM (zipped l) mr)
    zipped (HaveOutput nextL closeL a) (HaveOutput nextR closeR b) =
        HaveOutput (zipped nextL nextR) (closeL >> closeR) (a, b)
    -- Sources get no input: answer requests with "no more input".
    zipped (NeedInput _ done) r = zipped (done ()) r
    zipped l (NeedInput _ done) = zipped l (done ())
-- | Combine a source of functions with a source of arguments by applying
-- each function to the argument produced at the same position.
--
-- The result ends as soon as either source is finished.  If the other
-- source is holding an output at that point, its finalizer is run.
zipSourcesApp :: Monad m => Source m (a -> b) -> Source m a -> Source m b
zipSourcesApp (ConduitM left0) (ConduitM right0) =
    ConduitM (applied left0 right0)
  where
    -- Leftovers of a source are dropped.
    applied (Leftover fs ()) xs = applied fs xs
    applied fs (Leftover xs ()) = applied fs xs
    applied (Done ()) (Done ()) = Done ()
    -- One side is finished while the other holds an output: finalize it.
    applied (Done ()) (HaveOutput _ close _) = PipeM (close >> return (Done ()))
    applied (HaveOutput _ close _) (Done ()) = PipeM (close >> return (Done ()))
    -- One side is finished while the other has a pending action: stop
    -- without running that action.
    applied (Done ()) (PipeM _) = Done ()
    applied (PipeM _) (Done ()) = Done ()
    applied (PipeM mfs) (PipeM mxs) = PipeM (liftM2 applied mfs mxs)
    applied (PipeM mfs) xs@HaveOutput{} = PipeM (liftM (\fs -> applied fs xs) mfs)
    applied fs@HaveOutput{} (PipeM mxs) = PipeM (liftM (applied fs) mxs)
    applied (HaveOutput nextF closeF f) (HaveOutput nextX closeX x) =
        HaveOutput (applied nextF nextX) (closeF >> closeX) (f x)
    -- Sources get no input: answer requests with "no more input".
    applied (NeedInput _ done) xs = applied (done ()) xs
    applied fs (NeedInput _ done) = applied fs (done ())
-- | Run two conduits on the same input, interleaving their outputs, and
-- apply the function returned by the first to the value returned by the
-- second.
--
-- Leftovers of each conduit are resolved with 'injectLeftovers' first.
-- Every output carries a finalizer that runs the latest finalizers of both
-- conduits.
zipConduitApp
    :: Monad m
    => ConduitM i o m (x -> y)
    -> ConduitM i o m x
    -> ConduitM i o m y
zipConduitApp (ConduitM left0) (ConduitM right0) =
    ConduitM (go (return ()) (return ()) (injectLeftovers left0) (injectLeftovers right0))
  where
    -- @finF@ / @finX@ are the latest finalizers seen for each side.
    go _ _ (Done f) (Done x) = Done (f x)
    go _ finX (HaveOutput nextF finF out) px =
        HaveOutput (go finF finX nextF px) (finF >> finX) out
    go finF _ pf (HaveOutput nextX finX out) =
        HaveOutput (go finF finX pf nextX) (finF >> finX) out
    -- Leftovers are impossible after 'injectLeftovers'.
    go _ _ (Leftover _ l) _ = absurd l
    go _ _ _ (Leftover _ l) = absurd l
    go finF finX (PipeM mf) px = PipeM (liftM (\pf -> go finF finX pf px) mf)
    go finF finX pf (PipeM mx) = PipeM (liftM (go finF finX pf) mx)
    -- Share each input value between the sides that still want input.
    go finF finX (NeedInput inF upF) (NeedInput inX upX) =
        NeedInput
            (\i -> go finF finX (inF i) (inX i))
            (\u -> go finF finX (upF u) (upX u))
    go finF finX (NeedInput inF upF) (Done x) =
        NeedInput
            (\i -> go finF finX (inF i) (Done x))
            (\u -> go finF finX (upF u) (Done x))
    go finF finX (Done f) (NeedInput inX upX) =
        NeedInput
            (\i -> go finF finX (Done f) (inX i))
            (\u -> go finF finX (Done f) (upX u))
-- | Fuse two conduits like 'pipeL', but collect the downstream conduit's
-- unconsumed leftovers and return them with its result.
--
-- Downstream leftovers are kept on a stack and served back to downstream
-- before upstream is consulted.  Whenever upstream supplies a fresh value
-- or finishes, the stack is reset to empty.
fuseReturnLeftovers :: Monad m
                    => ConduitM a b m ()
                    -> ConduitM b c m r
                    -> ConduitM a c m (r, [b])
fuseReturnLeftovers (ConduitM left0) (ConduitM right0) =
    ConduitM (goRight (return ()) [] left0 right0)
  where
    -- Advance downstream; @final@ is the current upstream finalizer and
    -- @stack@ holds the pending downstream leftovers.
    goRight final stack up (HaveOutput next close out) =
        HaveOutput (goRight final stack up next) (close >> final) out
    goRight final stack up (NeedInput onIn onUp) =
        case stack of
            [] -> goLeft onIn onUp final up
            b : rest -> goRight final rest up (onIn b)
    goRight final stack _ (Done r2) = PipeM (final >> return (Done (r2, stack)))
    goRight final stack up (PipeM act) = PipeM (liftM (goRight final stack up) act)
    goRight final stack up (Leftover next b) = goRight final (b : stack) up next

    -- Advance upstream until it can satisfy the downstream request.
    goLeft onIn _ _ (HaveOutput up' final' out) = goRight final' [] up' (onIn out)
    goLeft onIn onUp final (NeedInput more done) =
        NeedInput (goLeft onIn onUp final . more) (goLeft onIn onUp final . done)
    goLeft _ onUp _ (Done r1) = goRight (return ()) [] (Done r1) (onUp r1)
    goLeft onIn onUp final (PipeM act) = PipeM (liftM (goLeft onIn onUp final) act)
    goLeft onIn onUp final (Leftover up' l) = Leftover (goLeft onIn onUp final up') l
-- | Fuse two conduits, converting the downstream conduit's unconsumed
-- leftovers with the given function and pushing the converted values back
-- as leftovers of the fused conduit.
fuseLeftovers
    :: Monad m
    => ([b] -> [a])
    -> ConduitM a b m ()
    -> ConduitM b c m r
    -> ConduitM a c m r
fuseLeftovers f left right =
    fuseReturnLeftovers left right >>= \(r, bs) ->
        -- Push in reverse order so that the head of @f bs@ is pushed last.
        ConduitM (mapM_ leftover (reverse (f bs))) >> return r
-- | A 'Conduit' paired with an action in the base monad.  Within this module
-- that action is treated as the conduit's current finalizer (see
-- 'connectResumeConduit' and 'unwrapResumableConduit').
data ResumableConduit i m o =
    ResumableConduit (Conduit i m o) (m ())
-- | Connect a resumable conduit to a sink, producing a sink on the
-- conduit's input.
--
-- The result carries the sink's result together with the remainder of the
-- conduit and its current finalizer, so the conduit can be connected again
-- later.  The finalizer is not run here.  Leftovers from the sink are
-- placed back in front of the conduit's output.
connectResumeConduit
    :: Monad m
    => ResumableConduit i m o
    -> Sink o m r
    -> Sink i m (ResumableConduit i m o, r)
connectResumeConduit (ResumableConduit (ConduitM left0) leftFinal0) (ConduitM right0) =
    ConduitM (goRight leftFinal0 left0 right0)
  where
    -- Advance the sink; @fin@ is the conduit's current finalizer.
    goRight _ _ (HaveOutput _ _ o) = absurd o
    goRight fin cond (NeedInput onIn onUp) = goLeft onIn onUp fin cond
    goRight fin cond (Done r2) = Done (ResumableConduit (ConduitM cond) fin, r2)
    goRight fin cond (PipeM act) = PipeM (liftM (goRight fin cond) act)
    goRight fin cond (Leftover next o) = goRight fin (HaveOutput cond fin o) next

    -- Advance the conduit until it can satisfy the sink's request.
    goLeft onIn _ _ (HaveOutput cond' fin' out) = goRight fin' cond' (onIn out)
    goLeft onIn onUp fin (NeedInput more done) =
        NeedInput (goLeft onIn onUp fin . more) (goLeft onIn onUp fin . done)
    goLeft _ onUp _ (Done ()) = goRight (return ()) (Done ()) (onUp ())
    goLeft onIn onUp fin (PipeM act) = PipeM (liftM (goLeft onIn onUp fin) act)
    goLeft onIn onUp fin (Leftover cond' l) = Leftover (goLeft onIn onUp fin cond') l
-- | Split a 'ResumableConduit' into a plain 'Conduit' and a finalizer action.
--
-- The returned finalizer runs the original finalizer only if the returned
-- conduit has not started running yet; starting the conduit clears the flag
-- that enables it.
unwrapResumableConduit :: MonadIO m => ResumableConduit i m o -> m (Conduit i m o, m ())
unwrapResumableConduit (ResumableConduit src final) = do
    pending <- liftIO (I.newIORef True)
    let markStarted = liftIO (I.writeIORef pending False)
        finalIfUnused =
            liftIO (I.readIORef pending) >>= \stillPending ->
                when stillPending final
    return (markStarted >> src, finalIfUnused)
-- | Wrap a fresh 'Conduit' as a 'ResumableConduit' with a finalizer that
-- does nothing.
newResumableConduit :: Monad m => Conduit i m o -> ResumableConduit i m o
newResumableConduit cond = ResumableConduit cond noFinalizer
  where
    noFinalizer = return ()
-- | Turn a sink into a conduit that forwards every input value downstream
-- unchanged while also feeding it to the sink.
--
-- When the sink finishes, its result is given to the supplied action and
-- all further input is forwarded unchanged.  Leftovers produced by the sink
-- are fed back to the sink only; any still pending when it finishes are
-- discarded.
passthroughSink :: Monad m
                => Sink i m r
                -> (r -> m ()) -- ^ called with the sink's result
                -> Conduit i m i
passthroughSink (ConduitM sink0) final = ConduitM (go [] sink0)
  where
    -- @stack@ holds leftovers produced by the sink.
    go stack sink =
        case sink of
            Done r -> lift (final r) >> awaitForever yield
            Leftover next l -> go (l : stack) next
            HaveOutput _ _ o -> absurd o
            PipeM act -> lift act >>= go stack
            NeedInput onIn onUp ->
                case stack of
                    l : rest -> go rest (onIn l)
                    [] -> do
                        mi <- await
                        case mi of
                            Nothing -> go [] (onUp ())
                            -- Forward the value before handing it to the sink.
                            Just i -> yield i >> go [] (onIn i)
-- | Generalize a pipe expecting a @()@ upstream result to accept any
-- upstream result; the actual value is ignored.
generalizeUpstream :: Monad m => Pipe l i o () m r -> Pipe l i o u m r
generalizeUpstream = go
  where
    go pipe0 =
        case pipe0 of
            HaveOutput next fin out -> HaveOutput (go next) fin out
            NeedInput onIn onUp -> NeedInput (go . onIn) (\_ -> go (onUp ()))
            Done r -> Done r
            PipeM act -> PipeM (liftM go act)
            Leftover next l -> Leftover (go next) l