module Data.Conduit.Internal.Conduit
(
ConduitM (..)
, Source
, Producer
, Sink
, Consumer
, Conduit
, ResumableSource (..)
, ResumableConduit (..)
, Flush (..)
, ZipSource (..)
, ZipSink (..)
, ZipConduit (..)
, await
, awaitForever
, yield
, yieldM
, yieldOr
, leftover
, runConduit
, connectResume
, connectResumeConduit
, fuseLeftovers
, fuseReturnLeftovers
, ($$+)
, ($$++)
, ($$+-)
, ($=+)
, (=$$+)
, (=$$++)
, (=$$+-)
, ($$)
, ($=)
, (=$)
, (=$=)
, sourceToPipe
, sinkToPipe
, conduitToPipe
, toProducer
, toConsumer
, bracketP
, addCleanup
, catchC
, handleC
, tryC
, Data.Conduit.Internal.Conduit.transPipe
, Data.Conduit.Internal.Conduit.mapOutput
, Data.Conduit.Internal.Conduit.mapOutputMaybe
, Data.Conduit.Internal.Conduit.mapInput
, Data.Conduit.Internal.Conduit.closeResumableSource
, unwrapResumable
, unwrapResumableConduit
, newResumableSource
, newResumableConduit
, zipSinks
, zipSources
, zipSourcesApp
, zipConduitApp
, mergeSource
, passthroughSink
, sourceToList
, fuseBoth
, fuseBothMaybe
, fuseUpstream
, sequenceSources
, sequenceSinks
, sequenceConduits
) where
import Prelude hiding (catch)
import Control.Applicative (Applicative (..))
import Control.Exception.Lifted as E (Exception)
import qualified Control.Exception.Lifted as E (catch)
import Control.Monad (liftM, when, liftM2, ap)
import Control.Monad.Error.Class(MonadError(..))
import Control.Monad.Reader.Class(MonadReader(..))
import Control.Monad.RWS.Class(MonadRWS())
import Control.Monad.Writer.Class(MonadWriter(..))
import Control.Monad.State.Class(MonadState(..))
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Base (MonadBase (liftBase))
import Data.Void (Void, absurd)
import Data.Monoid (Monoid (mappend, mempty))
import Control.Monad.Trans.Resource
import qualified Data.IORef as I
import Control.Monad.Morph (MFunctor (..))
import Data.Conduit.Internal.Pipe hiding (yield, mapOutput, leftover, yieldM, yieldOr, await, awaitForever, addCleanup, bracketP)
import qualified Data.Conduit.Internal.Pipe as CI
import Control.Monad (forever)
import Data.Traversable (Traversable (..))
import Control.Monad.Catch (MonadCatch, catch)
newtype ConduitM i o m r = ConduitM
{ unConduitM :: forall b.
(r -> Pipe i i o () m b) -> Pipe i i o () m b
}
instance Functor (ConduitM i o m) where
fmap f (ConduitM c) = ConduitM $ \rest -> c (rest . f)
instance Applicative (ConduitM i o m) where
pure x = ConduitM ($ x)
(<*>) = ap
instance Monad (ConduitM i o m) where
return = pure
ConduitM f >>= g = ConduitM $ \h -> f $ \a -> unConduitM (g a) h
instance MonadThrow m => MonadThrow (ConduitM i o m) where
throwM = lift . throwM
instance MFunctor (ConduitM i o) where
hoist f (ConduitM c0) = ConduitM $ \rest -> let
go (HaveOutput p c o) = HaveOutput (go p) (f c) o
go (NeedInput p c) = NeedInput (go . p) (go . c)
go (Done r) = rest r
go (PipeM mp) =
PipeM (f $ liftM go $ collapse mp)
where
collapse mpipe = do
pipe' <- mpipe
case pipe' of
PipeM mpipe' -> collapse mpipe'
_ -> return pipe'
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
instance MonadCatch m => MonadCatch (ConduitM i o m) where
catch (ConduitM p0) onErr = ConduitM $ \rest -> let
go (Done r) = rest r
go (PipeM mp) = PipeM $ catch (liftM go mp) (return . flip unConduitM rest . onErr)
go (Leftover p i) = Leftover (go p) i
go (NeedInput x y) = NeedInput (go . x) (go . y)
go (HaveOutput p c o) = HaveOutput (go p) c o
in go (p0 Done)
instance MonadIO m => MonadIO (ConduitM i o m) where
liftIO = lift . liftIO
instance MonadReader r m => MonadReader r (ConduitM i o m) where
ask = lift ask
local f (ConduitM c0) = ConduitM $ \rest ->
let go (HaveOutput p c o) = HaveOutput (go p) c o
go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u))
go (Done x) = rest x
go (PipeM mp) = PipeM (liftM go $ local f mp)
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
#ifndef MIN_VERSION_mtl
#define MIN_VERSION_mtl(x, y, z) 0
#endif
instance MonadWriter w m => MonadWriter w (ConduitM i o m) where
#if MIN_VERSION_mtl(2, 1, 0)
writer = lift . writer
#endif
tell = lift . tell
listen (ConduitM c0) = ConduitM $ \rest ->
let go front (HaveOutput p c o) = HaveOutput (go front p) c o
go front (NeedInput p c) = NeedInput (\i -> go front (p i)) (\u -> go front (c u))
go front (Done x) = rest (x, front)
go front (PipeM mp) = PipeM $ do
(p,w) <- listen mp
return $ go (front `mappend` w) p
go front (Leftover p i) = Leftover (go front p) i
in go mempty (c0 Done)
pass (ConduitM c0) = ConduitM $ \rest ->
let go (HaveOutput p c o) = HaveOutput (go p) c o
go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u))
go (PipeM mp) = PipeM $ mp >>= (return . go)
go (Done (x,_)) = rest x
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
instance MonadState s m => MonadState s (ConduitM i o m) where
get = lift get
put = lift . put
#if MIN_VERSION_mtl(2, 1, 0)
state = lift . state
#endif
instance MonadRWS r w s m => MonadRWS r w s (ConduitM i o m)
instance MonadError e m => MonadError e (ConduitM i o m) where
throwError = lift . throwError
catchError (ConduitM c0) f = ConduitM $ \rest ->
let go (HaveOutput p c o) = HaveOutput (go p) c o
go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u))
go (Done x) = rest x
go (PipeM mp) =
PipeM $ catchError (liftM go mp) $ \e -> do
return $ unConduitM (f e) rest
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
instance MonadBase base m => MonadBase base (ConduitM i o m) where
liftBase = lift . liftBase
instance MonadTrans (ConduitM i o) where
lift mr = ConduitM $ \rest -> PipeM (liftM rest mr)
instance MonadResource m => MonadResource (ConduitM i o m) where
liftResourceT = lift . liftResourceT
instance Monad m => Monoid (ConduitM i o m ()) where
mempty = return ()
mappend = (>>)
type Source m o = ConduitM () o m ()
type Producer m o = forall i. ConduitM i o m ()
type Sink i = ConduitM i Void
type Consumer i m r = forall o. ConduitM i o m r
type Conduit i m o = ConduitM i o m ()
data ResumableSource m o = ResumableSource (Pipe () () o () m ()) (m ())
instance MFunctor ResumableSource where
hoist nat (ResumableSource src m) = ResumableSource (hoist nat src) (nat m)
connectResume :: Monad m
=> ResumableSource m o
-> Sink o m r
-> m (ResumableSource m o, r)
connectResume (ResumableSource left0 leftFinal0) (ConduitM right0) =
goRight leftFinal0 left0 (right0 Done)
where
goRight leftFinal left right =
case right of
HaveOutput _ _ o -> absurd o
NeedInput rp rc -> goLeft rp rc leftFinal left
Done r2 -> return (ResumableSource left leftFinal, r2)
PipeM mp -> mp >>= goRight leftFinal left
Leftover p i -> goRight leftFinal (HaveOutput left leftFinal i) p
goLeft rp rc leftFinal left =
case left of
HaveOutput left' leftFinal' o -> goRight leftFinal' left' (rp o)
NeedInput _ lc -> recurse (lc ())
Done () -> goRight (return ()) (Done ()) (rc ())
PipeM mp -> mp >>= recurse
Leftover p () -> recurse p
where
recurse = goLeft rp rc leftFinal
sourceToPipe :: Monad m => Source m o -> Pipe l i o u m ()
sourceToPipe =
go . flip unConduitM Done
where
go (HaveOutput p c o) = HaveOutput (go p) c o
go (NeedInput _ c) = go $ c ()
go (Done ()) = Done ()
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover p ()) = go p
sinkToPipe :: Monad m => Sink i m r -> Pipe l i o u m r
sinkToPipe =
go . injectLeftovers . flip unConduitM Done
where
go (HaveOutput _ _ o) = absurd o
go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ())
go (Done r) = Done r
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover _ l) = absurd l
conduitToPipe :: Monad m => Conduit i m o -> Pipe l i o u m ()
conduitToPipe =
go . injectLeftovers . flip unConduitM Done
where
go (HaveOutput p c o) = HaveOutput (go p) c o
go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ())
go (Done ()) = Done ()
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover _ l) = absurd l
unwrapResumable :: MonadIO m => ResumableSource m o -> m (Source m o, m ())
unwrapResumable (ResumableSource src final) = do
ref <- liftIO $ I.newIORef True
let final' = do
x <- liftIO $ I.readIORef ref
when x final
return (liftIO (I.writeIORef ref False) >> (ConduitM (src >>=)), final')
newResumableSource :: Monad m => Source m o -> ResumableSource m o
newResumableSource (ConduitM s) = ResumableSource (s Done) (return ())
toProducer :: Monad m => Source m a -> Producer m a
toProducer (ConduitM c0) = ConduitM $ \rest -> let
go (HaveOutput p c o) = HaveOutput (go p) c o
go (NeedInput _ c) = go (c ())
go (Done r) = rest r
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover p ()) = go p
in go (c0 Done)
toConsumer :: Monad m => Sink a m b -> Consumer a m b
toConsumer (ConduitM c0) = ConduitM $ \rest -> let
go (HaveOutput _ _ o) = absurd o
go (NeedInput p c) = NeedInput (go . p) (go . c)
go (Done r) = rest r
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover p l) = Leftover (go p) l
in go (c0 Done)
catchC :: (MonadBaseControl IO m, Exception e)
=> ConduitM i o m r
-> (e -> ConduitM i o m r)
-> ConduitM i o m r
catchC (ConduitM p0) onErr = ConduitM $ \rest -> let
go (Done r) = rest r
go (PipeM mp) = PipeM $ E.catch (liftM go mp)
(return . flip unConduitM rest . onErr)
go (Leftover p i) = Leftover (go p) i
go (NeedInput x y) = NeedInput (go . x) (go . y)
go (HaveOutput p c o) = HaveOutput (go p) c o
in go (p0 Done)
handleC :: (MonadBaseControl IO m, Exception e)
=> (e -> ConduitM i o m r)
-> ConduitM i o m r
-> ConduitM i o m r
handleC = flip catchC
tryC :: (MonadBaseControl IO m, Exception e)
=> ConduitM i o m r
-> ConduitM i o m (Either e r)
tryC (ConduitM c0) = ConduitM $ \rest -> let
go (Done r) = rest (Right r)
go (PipeM mp) = PipeM $ E.catch (liftM go mp) (return . rest . Left)
go (Leftover p i) = Leftover (go p) i
go (NeedInput x y) = NeedInput (go . x) (go . y)
go (HaveOutput p c o) = HaveOutput (go p) c o
in go (c0 Done)
zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r')
zipSinks (ConduitM x0) (ConduitM y0) = ConduitM $ \rest -> let
Leftover _ i >< _ = absurd i
_ >< Leftover _ i = absurd i
HaveOutput _ _ o >< _ = absurd o
_ >< HaveOutput _ _ o = absurd o
PipeM mx >< y = PipeM (liftM (>< y) mx)
x >< PipeM my = PipeM (liftM (x ><) my)
Done x >< Done y = rest (x, y)
NeedInput px cx >< NeedInput py cy = NeedInput (\i -> px i >< py i) (\() -> cx () >< cy ())
NeedInput px cx >< y@Done{} = NeedInput (\i -> px i >< y) (\u -> cx u >< y)
x@Done{} >< NeedInput py cy = NeedInput (\i -> x >< py i) (\u -> x >< cy u)
in injectLeftovers (x0 Done) >< injectLeftovers (y0 Done)
zipSources :: Monad m => Source m a -> Source m b -> Source m (a, b)
zipSources (ConduitM left0) (ConduitM right0) = ConduitM $ \rest -> let
go (Leftover left ()) right = go left right
go left (Leftover right ()) = go left right
go (Done ()) (Done ()) = rest ()
go (Done ()) (HaveOutput _ close _) = PipeM (close >> return (rest ()))
go (HaveOutput _ close _) (Done ()) = PipeM (close >> return (rest ()))
go (Done ()) (PipeM _) = rest ()
go (PipeM _) (Done ()) = rest ()
go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my)
go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx)
go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my)
go (HaveOutput srcx closex x) (HaveOutput srcy closey y) = HaveOutput (go srcx srcy) (closex >> closey) (x, y)
go (NeedInput _ c) right = go (c ()) right
go left (NeedInput _ c) = go left (c ())
in go (left0 Done) (right0 Done)
zipSourcesApp :: Monad m => Source m (a -> b) -> Source m a -> Source m b
zipSourcesApp (ConduitM left0) (ConduitM right0) = ConduitM $ \rest -> let
go (Leftover left ()) right = go left right
go left (Leftover right ()) = go left right
go (Done ()) (Done ()) = rest ()
go (Done ()) (HaveOutput _ close _) = PipeM (close >> return (rest ()))
go (HaveOutput _ close _) (Done ()) = PipeM (close >> return (rest ()))
go (Done ()) (PipeM _) = rest ()
go (PipeM _) (Done ()) = rest ()
go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my)
go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx)
go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my)
go (HaveOutput srcx closex x) (HaveOutput srcy closey y) = HaveOutput (go srcx srcy) (closex >> closey) (x y)
go (NeedInput _ c) right = go (c ()) right
go left (NeedInput _ c) = go left (c ())
in go (left0 Done) (right0 Done)
zipConduitApp
:: Monad m
=> ConduitM i o m (x -> y)
-> ConduitM i o m x
-> ConduitM i o m y
zipConduitApp (ConduitM left0) (ConduitM right0) = ConduitM $ \rest -> let
go _ _ (Done f) (Done x) = rest (f x)
go finalX finalY (PipeM mx) y = PipeM (flip (go finalX finalY) y `liftM` mx)
go finalX finalY x (PipeM my) = PipeM (go finalX finalY x `liftM` my)
go _ finalY (HaveOutput x finalX o) y = HaveOutput
(go finalX finalY x y)
(finalX >> finalY)
o
go finalX _ x (HaveOutput y finalY o) = HaveOutput
(go finalX finalY x y)
(finalX >> finalY)
o
go _ _ (Leftover _ i) _ = absurd i
go _ _ _ (Leftover _ i) = absurd i
go finalX finalY (NeedInput px cx) (NeedInput py cy) = NeedInput
(\i -> go finalX finalY (px i) (py i))
(\u -> go finalX finalY (cx u) (cy u))
go finalX finalY (NeedInput px cx) (Done y) = NeedInput
(\i -> go finalX finalY (px i) (Done y))
(\u -> go finalX finalY (cx u) (Done y))
go finalX finalY (Done x) (NeedInput py cy) = NeedInput
(\i -> go finalX finalY (Done x) (py i))
(\u -> go finalX finalY (Done x) (cy u))
in go (return ()) (return ()) (injectLeftovers $ left0 Done) (injectLeftovers $ right0 Done)
fuseReturnLeftovers :: Monad m
=> ConduitM a b m ()
-> ConduitM b c m r
-> ConduitM a c m (r, [b])
fuseReturnLeftovers (ConduitM left0) (ConduitM right0) = ConduitM $ \rest -> let
goRight final bs left right =
case right of
HaveOutput p c o -> HaveOutput (recurse p) (c >> final) o
NeedInput rp rc ->
case bs of
[] -> goLeft rp rc final left
b:bs' -> goRight final bs' left (rp b)
Done r2 -> PipeM (final >> return (rest (r2, bs)))
PipeM mp -> PipeM (liftM recurse mp)
Leftover p b -> goRight final (b:bs) left p
where
recurse = goRight final bs left
goLeft rp rc final left =
case left of
HaveOutput left' final' o -> goRight final' [] left' (rp o)
NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc)
Done r1 -> goRight (return ()) [] (Done r1) (rc r1)
PipeM mp -> PipeM (liftM recurse mp)
Leftover left' i -> Leftover (recurse left') i
where
recurse = goLeft rp rc final
in goRight (return ()) [] (left0 Done) (right0 Done)
fuseLeftovers
:: Monad m
=> ([b] -> [a])
-> ConduitM a b m ()
-> ConduitM b c m r
-> ConduitM a c m r
fuseLeftovers f left right = do
(r, bs) <- fuseReturnLeftovers left right
mapM_ leftover $ reverse $ f bs
return r
data ResumableConduit i m o =
ResumableConduit (Pipe i i o () m ()) (m ())
connectResumeConduit
:: Monad m
=> ResumableConduit i m o
-> Sink o m r
-> Sink i m (ResumableConduit i m o, r)
connectResumeConduit (ResumableConduit left0 leftFinal0) (ConduitM right0) = ConduitM $ \rest -> let
goRight leftFinal left right =
case right of
HaveOutput _ _ o -> absurd o
NeedInput rp rc -> goLeft rp rc leftFinal left
Done r2 -> rest (ResumableConduit left leftFinal, r2)
PipeM mp -> PipeM (liftM (goRight leftFinal left) mp)
Leftover p i -> goRight leftFinal (HaveOutput left leftFinal i) p
goLeft rp rc leftFinal left =
case left of
HaveOutput left' leftFinal' o -> goRight leftFinal' left' (rp o)
NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc)
Done () -> goRight (return ()) (Done ()) (rc ())
PipeM mp -> PipeM (liftM recurse mp)
Leftover left' i -> Leftover (recurse left') i
where
recurse = goLeft rp rc leftFinal
in goRight leftFinal0 left0 (right0 Done)
unwrapResumableConduit :: MonadIO m => ResumableConduit i m o -> m (Conduit i m o, m ())
unwrapResumableConduit (ResumableConduit src final) = do
ref <- liftIO $ I.newIORef True
let final' = do
x <- liftIO $ I.readIORef ref
when x final
return (ConduitM ((liftIO (I.writeIORef ref False) >> src) >>=), final')
newResumableConduit :: Monad m => Conduit i m o -> ResumableConduit i m o
newResumableConduit (ConduitM c) = ResumableConduit (c Done) (return ())
mergeSource
:: Monad m
=> Source m i
-> Conduit a m (i, a)
mergeSource = loop . newResumableSource
where
loop :: Monad m => ResumableSource m i -> Conduit a m (i, a)
loop src0 = await >>= maybe (lift $ closeResumableSource src0) go
where
go a = do
(src1, mi) <- lift $ src0 $$++ await
case mi of
Nothing -> lift $ closeResumableSource src1
Just i -> yield (i, a) >> loop src1
passthroughSink :: Monad m
=> Sink i m r
-> (r -> m ())
-> Conduit i m i
passthroughSink (ConduitM sink0) final = ConduitM $ \rest -> let
go _ (Done r) = do
lift $ final r
unConduitM (awaitForever yield) rest
go is (Leftover sink i) = go (i:is) sink
go _ (HaveOutput _ _ o) = absurd o
go is (PipeM mx) = do
x <- lift mx
go is x
go (i:is) (NeedInput next _) = go is (next i)
go [] (NeedInput next done) = do
mx <- CI.await
case mx of
Nothing -> go [] (done ())
Just x -> do
CI.yield x
go [] (next x)
in go [] (sink0 Done)
sourceToList :: Monad m => Source m a -> m [a]
sourceToList =
go . flip unConduitM Done
where
go (Done _) = return []
go (HaveOutput src _ x) = liftM (x:) (go src)
go (PipeM msrc) = msrc >>= go
go (NeedInput _ c) = go (c ())
go (Leftover p _) = go p
infixr 0 $$
infixl 1 $=
infixr 2 =$
infixr 2 =$=
infixr 0 $$+
infixr 0 $$++
infixr 0 $$+-
infixl 1 $=+
($$) :: Monad m => Source m a -> Sink a m b -> m b
src $$ sink = do
(rsrc, res) <- src $$+ sink
rsrc $$+- return ()
return res
($=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r
($=) = (=$=)
(=$) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r
(=$) = (=$=)
(=$=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r
ConduitM left0 =$= ConduitM right0 = ConduitM $ \rest ->
let goRight final left right =
case right of
HaveOutput p c o -> HaveOutput (recurse p) (c >> final) o
NeedInput rp rc -> goLeft rp rc final left
Done r2 -> PipeM (final >> return (rest r2))
PipeM mp -> PipeM (liftM recurse mp)
Leftover right' i -> goRight final (HaveOutput left final i) right'
where
recurse = goRight final left
goLeft rp rc final left =
case left of
HaveOutput left' final' o -> goRight final' left' (rp o)
NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc)
Done r1 -> goRight (return ()) (Done r1) (rc r1)
PipeM mp -> PipeM (liftM recurse mp)
Leftover left' i -> Leftover (recurse left') i
where
recurse = goLeft rp rc final
in goRight (return ()) (left0 Done) (right0 Done)
where
await :: Monad m => Consumer i m (Maybe i)
await = ConduitM $ \f -> NeedInput (f . Just) (const $ f Nothing)
await' :: Monad m
=> ConduitM i o m r
-> (i -> ConduitM i o m r)
-> ConduitM i o m r
await' f g = ConduitM $ \rest -> NeedInput
(\i -> unConduitM (g i) rest)
(const $ unConduitM f rest)
yield :: Monad m
=> o
-> ConduitM i o m ()
yield o = yieldOr o (return ())
yieldM :: Monad m => m o -> ConduitM i o m ()
yieldM mo = lift mo >>= yield
leftover :: i -> ConduitM i o m ()
leftover i = ConduitM $ \rest -> Leftover (rest ()) i
runConduit :: Monad m => ConduitM () Void m r -> m r
runConduit (ConduitM p) = runPipe $ injectLeftovers $ p Done
bracketP :: MonadResource m
=> IO a
-> (a -> IO ())
-> (a -> ConduitM i o m r)
-> ConduitM i o m r
bracketP alloc free inside = ConduitM $ \rest -> PipeM $ do
(key, seed) <- allocate alloc free
return $ unConduitM (addCleanup (const $ release key) (inside seed)) rest
addCleanup :: Monad m
=> (Bool -> m ())
-> ConduitM i o m r
-> ConduitM i o m r
addCleanup cleanup (ConduitM c0) = ConduitM $ \rest -> let
go (Done r) = PipeM (cleanup True >> return (rest r))
go (HaveOutput src close x) = HaveOutput
(go src)
(cleanup False >> close)
x
go (PipeM msrc) = PipeM (liftM (go) msrc)
go (NeedInput p c) = NeedInput
(go . p)
(go . c)
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
yieldOr :: Monad m
=> o
-> m ()
-> ConduitM i o m ()
yieldOr o m = ConduitM $ \rest -> HaveOutput (rest ()) m o
awaitForever :: Monad m => (i -> ConduitM i o m r) -> ConduitM i o m ()
awaitForever f = ConduitM $ \rest ->
let go = NeedInput (\i -> unConduitM (f i) (const go)) rest
in go
transPipe :: Monad m => (forall a. m a -> n a) -> ConduitM i o m r -> ConduitM i o n r
transPipe = hoist
mapOutput :: Monad m => (o1 -> o2) -> ConduitM i o1 m r -> ConduitM i o2 m r
mapOutput f (ConduitM c0) = ConduitM $ \rest -> let
go (HaveOutput p c o) = HaveOutput (go p) c (f o)
go (NeedInput p c) = NeedInput (go . p) (go . c)
go (Done r) = rest r
go (PipeM mp) = PipeM (liftM (go) mp)
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> ConduitM i o1 m r -> ConduitM i o2 m r
mapOutputMaybe f (ConduitM c0) = ConduitM $ \rest -> let
go (HaveOutput p c o) = maybe id (\o' p' -> HaveOutput p' c o') (f o) (go p)
go (NeedInput p c) = NeedInput (go . p) (go . c)
go (Done r) = rest r
go (PipeM mp) = PipeM (liftM (go) mp)
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
mapInput :: Monad m
=> (i1 -> i2)
-> (i2 -> Maybe i1)
-> ConduitM i2 o m r
-> ConduitM i1 o m r
mapInput f f' (ConduitM c0) = ConduitM $ \rest -> let
go (HaveOutput p c o) = HaveOutput (go p) c o
go (NeedInput p c) = NeedInput (go . p . f) (go . c)
go (Done r) = rest r
go (PipeM mp) = PipeM $ liftM go mp
go (Leftover p i) = maybe id (flip Leftover) (f' i) (go p)
in go (c0 Done)
($$+) :: Monad m => Source m a -> Sink a m b -> m (ResumableSource m a, b)
ConduitM src $$+ sink =
connectResume (ResumableSource (src Done) (return ())) sink
($$++) :: Monad m => ResumableSource m a -> Sink a m b -> m (ResumableSource m a, b)
($$++) = connectResume
($$+-) :: Monad m => ResumableSource m a -> Sink a m b -> m b
rsrc $$+- sink = do
(ResumableSource _ final, res) <- connectResume rsrc sink
final
return res
($=+) :: Monad m => ResumableSource m a -> Conduit a m b -> ResumableSource m b
ResumableSource src final $=+ ConduitM sink =
ResumableSource (src `pipeL` sink Done) final
closeResumableSource :: Monad m => ResumableSource m a -> m ()
closeResumableSource = ($$+- return ())
data Flush a = Chunk a | Flush
deriving (Show, Eq, Ord)
instance Functor Flush where
fmap _ Flush = Flush
fmap f (Chunk a) = Chunk (f a)
newtype ZipSource m o = ZipSource { getZipSource :: Source m o }
instance Monad m => Functor (ZipSource m) where
fmap f = ZipSource . mapOutput f . getZipSource
instance Monad m => Applicative (ZipSource m) where
pure = ZipSource . forever . yield
(ZipSource f) <*> (ZipSource x) = ZipSource $ zipSourcesApp f x
sequenceSources :: (Traversable f, Monad m) => f (Source m o) -> Source m (f o)
sequenceSources = getZipSource . sequenceA . fmap ZipSource
newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r }
instance Monad m => Functor (ZipSink i m) where
fmap f (ZipSink x) = ZipSink (liftM f x)
instance Monad m => Applicative (ZipSink i m) where
pure = ZipSink . return
(ZipSink f) <*> (ZipSink x) =
ZipSink $ liftM (uncurry ($)) $ zipSinks f x
sequenceSinks :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r)
sequenceSinks = getZipSink . sequenceA . fmap ZipSink
(=$$+) :: Monad m => Conduit a m b -> Sink b m r -> Sink a m (ResumableConduit a m b, r)
(=$$+) (ConduitM conduit) = connectResumeConduit (ResumableConduit (conduit Done) (return ()))
(=$$++) :: Monad m => ResumableConduit i m o -> Sink o m r -> Sink i m (ResumableConduit i m o, r)
(=$$++) = connectResumeConduit
(=$$+-) :: Monad m => ResumableConduit i m o -> Sink o m r -> Sink i m r
rsrc =$$+- sink = do
(ResumableConduit _ final, res) <- connectResumeConduit rsrc sink
lift final
return res
infixr 0 =$$+
infixr 0 =$$++
infixr 0 =$$+-
newtype ZipConduit i o m r = ZipConduit { getZipConduit :: ConduitM i o m r }
deriving Functor
instance Monad m => Applicative (ZipConduit i o m) where
pure = ZipConduit . pure
ZipConduit left <*> ZipConduit right = ZipConduit (zipConduitApp left right)
sequenceConduits :: (Traversable f, Monad m) => f (ConduitM i o m r) -> ConduitM i o m (f r)
sequenceConduits = getZipConduit . sequenceA . fmap ZipConduit
fuseBoth :: Monad m => ConduitM a b m r1 -> ConduitM b c m r2 -> ConduitM a c m (r1, r2)
fuseBoth (ConduitM up) (ConduitM down) =
ConduitM (pipeL (up Done) (withUpstream $ generalizeUpstream $ down Done) >>=)
fuseBothMaybe
:: Monad m
=> ConduitM a b m r1
-> ConduitM b c m r2
-> ConduitM a c m (Maybe r1, r2)
fuseBothMaybe (ConduitM up) (ConduitM down) =
ConduitM (pipeL (up Done) (go Nothing $ down Done) >>=)
where
go mup (Done r) = Done (mup, r)
go mup (PipeM mp) = PipeM $ liftM (go mup) mp
go mup (HaveOutput p c o) = HaveOutput (go mup p) c o
go _ (NeedInput p c) = NeedInput
(\i -> go Nothing (p i))
(\u -> go (Just u) (c ()))
go mup (Leftover p i) = Leftover (go mup p) i
fuseUpstream :: Monad m => ConduitM a b m r -> Conduit b m c -> ConduitM a c m r
fuseUpstream up down = fmap fst (fuseBoth up down)