module Data.Conduit.Process
(
sourceCmdWithConsumer
, sourceProcessWithConsumer
, sourceCmdWithStreams
, sourceProcessWithStreams
, withCheckedProcessCleanup
, module Data.Streaming.Process
) where
import Data.Streaming.Process
import Data.Streaming.Process.Internal
import System.Exit (ExitCode (..))
import Control.Monad.IO.Unlift (MonadIO, liftIO, MonadUnliftIO, withRunInIO, withUnliftIO, unliftIO)
import System.IO (hClose)
import Data.Conduit
import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHandleFlush)
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder)
import Control.Concurrent.Async (runConcurrently, Concurrently(..))
import Control.Exception (onException, throwIO, finally, bracket)
#if (__GLASGOW_HASKELL__ < 710)
import Control.Applicative ((<$>), (<*>))
#endif
instance (r ~ (), MonadIO m, i ~ ByteString) => InputSource (ConduitM i o m r) where
isStdStream = (\(Just h) -> return $ sinkHandle h, Just CreatePipe)
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, i ~ ByteString) => InputSource (ConduitM i o m r, n r') where
isStdStream = (\(Just h) -> return (sinkHandle h, liftIO $ hClose h), Just CreatePipe)
newtype BuilderInput o m r = BuilderInput (ConduitM Builder o m r)
newtype FlushInput o m r = FlushInput (ConduitM (Flush ByteString) o m r)
instance (MonadIO m, r ~ ()) => InputSource (BuilderInput o m r) where
isStdStream = (\(Just h) -> return $ BuilderInput $ sinkHandleBuilder h, Just CreatePipe)
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (BuilderInput o m r, n r') where
isStdStream = (\(Just h) -> return (BuilderInput $ sinkHandleBuilder h, liftIO $ hClose h), Just CreatePipe)
instance (MonadIO m, r ~ ()) => InputSource (FlushInput o m r) where
isStdStream = (\(Just h) -> return $ FlushInput $ sinkHandleFlush h, Just CreatePipe)
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (FlushInput o m r, n r') where
isStdStream = (\(Just h) -> return (FlushInput $ sinkHandleFlush h, liftIO $ hClose h), Just CreatePipe)
instance (r ~ (), MonadIO m, o ~ ByteString) => OutputSink (ConduitM i o m r) where
osStdStream = (\(Just h) -> return $ sourceHandle h, Just CreatePipe)
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, o ~ ByteString) => OutputSink (ConduitM i o m r, n r') where
osStdStream = (\(Just h) -> return (sourceHandle h, liftIO $ hClose h), Just CreatePipe)
sourceProcessWithConsumer :: MonadIO m
=> CreateProcess
-> ConduitT ByteString Void m a
-> m (ExitCode, a)
sourceProcessWithConsumer cp consumer = do
(ClosedStream, (source, close), ClosedStream, cph) <- streamingProcess cp
res <- runConduit $ source .| consumer
close
ec <- waitForStreamingProcess cph
return (ec, res)
sourceCmdWithConsumer :: MonadIO m
=> String
-> ConduitT ByteString Void m a
-> m (ExitCode, a)
sourceCmdWithConsumer cmd = sourceProcessWithConsumer (shell cmd)
sourceProcessWithStreams
:: MonadUnliftIO m
=> CreateProcess
-> ConduitT () ByteString m ()
-> ConduitT ByteString Void m a
-> ConduitT ByteString Void m b
-> m (ExitCode, a, b)
sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr =
withUnliftIO $ \u -> do
( (sinkStdin, closeStdin)
, (sourceStdout, closeStdout)
, (sourceStderr, closeStderr)
, sph) <- streamingProcess cp
(_, resStdout, resStderr) <-
runConcurrently (
(,,)
<$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| sinkStdin) `finally` closeStdin)
<*> Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout)
<*> Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr))
`finally` (closeStdout >> closeStderr)
`onException` terminateStreamingProcess sph
ec <- waitForStreamingProcess sph
return (ec, resStdout, resStderr)
sourceCmdWithStreams
:: MonadUnliftIO m
=> String
-> ConduitT () ByteString m ()
-> ConduitT ByteString Void m a
-> ConduitT ByteString Void m b
-> m (ExitCode, a, b)
sourceCmdWithStreams cmd = sourceProcessWithStreams (shell cmd)
withCheckedProcessCleanup
:: ( InputSource stdin
, OutputSink stderr
, OutputSink stdout
, MonadUnliftIO m
)
=> CreateProcess
-> (stdin -> stdout -> stderr -> m b)
-> m b
withCheckedProcessCleanup cp f = withRunInIO $ \run -> bracket
(streamingProcess cp)
(\(_, _, _, sph) -> closeStreamingProcessHandle sph)
$ \(x, y, z, sph) -> do
res <- run (f x y z) `onException` terminateStreamingProcess sph
ec <- waitForStreamingProcess sph
if ec == ExitSuccess
then return res
else throwIO $ ProcessExitedUnsuccessfully cp ec
terminateStreamingProcess :: MonadIO m => StreamingProcessHandle -> m ()
terminateStreamingProcess = liftIO . terminateProcess . streamingProcessHandleRaw