module Pipes.IOStream (
IStream,
OStream,
mkIStream,
mkOStream,
fromIStream,
fromOStream,
readOne,
readWith,
wrapIStream,
write,
writeProducer,
isEnded,
(>->+),
Producer,
Consumer,
Parser
) where
import Control.Monad
import Data.IORef
import Pipes
import Pipes.Internal
import Pipes.Parse
newtype IStream a = IS (IORef (Producer a IO ()))
newtype OStream a = OS (IORef (Consumer a IO ()))
mkIStream :: Producer a IO () -> IO (IStream a)
mkIStream p = liftM IS $ newIORef p
mkOStream :: Consumer a IO () -> IO (OStream a)
mkOStream c = liftM OS $ newIORef c
fromIStream :: IStream a -> IO (Producer a IO ())
fromIStream (IS rp) = readIORef rp
fromOStream :: OStream a -> IO (Consumer a IO ())
fromOStream (OS rc) = readIORef rc
wrapIStream :: IStream a -> (Producer a IO () -> Producer b IO ()) ->
IO (IStream b)
wrapIStream i f = fromIStream i >>= mkIStream . f
readOne :: IStream a -> IO (Maybe a)
readOne (IS rp) = do
p <- readIORef rp
next p >>= \case
Left () -> return Nothing
Right (a, p') -> writeIORef rp p' >> return (Just a)
readWith :: IStream a -> Parser a IO r -> IO r
readWith (IS rp) parser = do
p <- readIORef rp
(r, p') <- runStateT parser p
writeIORef rp p'
return r
write :: OStream a -> a -> IO ()
write o a = writeProducer o (yield a)
writeProducer :: OStream a -> Producer a IO () -> IO ()
writeProducer (OS rc) p = readIORef rc >>= (p >->+) >>= writeIORef rc
isEnded :: OStream a -> IO Bool
isEnded (OS rc) = readIORef rc >>= \case
Pure _ -> return True
_ -> return False
(>->+) :: Monad m => Producer a m x -> Consumer a m y -> m (Consumer a m y)
p0 >->+ c0 = go p0 c0
where
go p c = case c of
Request _ f -> go1 p
where
go1 = \case
Respond a g -> g () `go` f a
M p' -> go1 =<< p'
Pure _ -> return c
Request v _ -> closed v
M c' -> go p =<< c'
Pure _ -> return c
Respond v _ -> closed v