{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
module Data.Conduit.Async ( module Data.Conduit.Async.Composition
, gatherFrom
, drainTo
) where
import Control.Monad.IO.Class
import Control.Monad.Loops
import Control.Monad.Trans.Class
import Data.Conduit
import Data.Conduit.Async.Composition
import Data.Foldable
import UnliftIO
gatherFrom :: (MonadIO m, MonadUnliftIO m)
=> Int
-> (TBQueue o -> m ())
-> ConduitT () o m ()
gatherFrom size scatter = do
chan <- liftIO $ newTBQueueIO (fromIntegral size)
worker <- lift $ async (scatter chan)
gather worker chan
where
gather worker chan = do
(xs, mres) <- liftIO $ atomically $ do
xs <- whileM (not <$> isEmptyTBQueue chan) (readTBQueue chan)
(xs,) <$> pollSTM worker
traverse_ yield xs
case mres of
Just (Left e) -> liftIO $ throwIO (e :: SomeException)
Just (Right r) -> return r
Nothing -> gather worker chan
drainTo :: (MonadIO m, MonadUnliftIO m)
=> Int
-> (TBQueue (Maybe i) -> m r)
-> ConduitT i Void m r
drainTo size gather = do
chan <- liftIO $ newTBQueueIO (fromIntegral size)
worker <- lift $ async (gather chan)
scatter worker chan
where
scatter worker chan = do
mval <- await
(mx, action) <- liftIO $ atomically $ do
mres <- pollSTM worker
case mres of
Just (Left e) ->
return (Nothing, liftIO $ throwIO (e :: SomeException))
Just (Right r) ->
return (Just r, return ())
Nothing -> do
writeTBQueue chan mval
return (Nothing, return ())
action
case mx of
Just x -> return x
Nothing -> scatter worker chan