module ListT.Libcurl
(
  Session,
  Error,
  runSession,
  consumeURL,
)
where

import BasePrelude hiding (cons, uncons)
import Foreign hiding (Pool, void, unsafePerformIO)
import MTLPrelude hiding (Error)
import ListT (ListT)
import Data.ByteString (ByteString)
import Control.Concurrent.STM.TMVar
import qualified Data.ByteString as B
import qualified Data.ByteString.Unsafe as BU
import qualified Data.Pool as P
import qualified Network.CURL720 as C
import qualified ListT as L


-- |
-- A global sessions pool.
--
-- Due to how the \"libcurl\" library integration is handled,
-- there may only exist one per application,
-- hence the API provides no way to establish another pool.
{-# NOINLINE pool #-}
pool :: P.Pool C.CURL
pool =
  -- 'unsafePerformIO' is deliberate here: the pool is a process-wide
  -- singleton created on first use. The NOINLINE pragma above is what keeps
  -- GHC from duplicating this action (and hence the pool) at use sites.
  unsafePerformIO $ do
    -- The library is loaded before any handle is created.
    C.loadlib C.CURL720
    -- Handles are created with 'C.curl_easy_init' and destroyed with
    -- 'C.curl_easy_cleanup'. The numeric arguments are, per
    -- @resource-pool@'s 'P.createPool': stripe count (1), idle time in
    -- seconds before an unused handle is destroyed (30), and maximum
    -- handles per stripe (100) -- NOTE(review): confirm against the
    -- resource-pool version in use.
    P.createPool C.curl_easy_init C.curl_easy_cleanup 1 30 100

-- |
-- A monad for sequential execution of \"libcurl\" operations.
--
-- To execute multiple requests concurrently you need to run multiple sessions.
newtype Session a =
  Session (ReaderT C.CURL IO a)
  deriving (Functor, Applicative, Monad, MonadIO)

-- |
-- The error reported by 'runSession':
-- the exception type of the underlying \"libcurl\" binding.
type Error = C.CURLE

-- |
-- Execute a session on a handle borrowed from the global pool.
--
-- Any 'Error' thrown while the session runs is caught and returned as 'Left'.
-- Exceptions of other types propagate to the caller.
runSession :: Session a -> IO (Either Error a)
runSession (Session m) =
  try $ P.withResource pool $ runReaderT m

-- |
-- Fetch the given URL (following redirects) and feed the response body,
-- chunk by chunk, to the supplied consumer as a 'ListT' stream.
--
-- The consumer runs in a separate thread while the transfer is performed on
-- the calling thread. The result of the consumer becomes the result of the
-- session; an exception thrown by the consumer is rethrown here.
--
-- The consumer may stop reading early: the transfer is then aborted and the
-- resulting write error is not treated as a failure.
--
-- If the transfer itself fails, the stream is terminated (so the consumer
-- thread is not left blocked waiting for more data) and the transfer error
-- is rethrown; the consumer's result is discarded in that case.
consumeURL :: String -> (ListT IO ByteString -> IO a) -> Session a
consumeURL url consumer =
  Session $ ReaderT $ \h -> do
    syncState@(active, chunk) <- atomically newSyncState
    C.curl_easy_setopt h
      [
        C.CURLOPT_WRITEFUNCTION $ Just (syncWriteFunction syncState),
        C.CURLOPT_URL url,
        C.CURLOPT_FOLLOWLOCATION True
      ]
    result <- newEmptyMVar :: IO (MVar (Either SomeException a))
    _ <- forkIO $ do
      r <- try $ consumer $ fix $ \loop ->
        join $ lift $ atomically $
          -- A pending chunk is always delivered before the "finished" flag
          -- is consulted, so the final chunk of a transfer is never lost.
          tryTakeTMVar chunk >>= \case
            Just bytes -> return $ L.cons bytes loop
            _ -> readTVar active >>= \case
              False -> return mzero
              _ -> retry
      -- Tell the write callback that nobody is listening anymore,
      -- which makes it abort the transfer.
      atomically $ writeTVar active False
      putMVar result r
    -- The flag is cleared in a 'finally' so that it also happens when the
    -- transfer throws. Otherwise the consumer thread would block in 'retry'
    -- forever, waiting for a chunk that can never arrive.
    flip finally (atomically $ writeTVar active False) $
      catch (C.curl_easy_perform h) $ \case
        -- A write error is how the callback aborts the transfer
        -- after the consumer has stopped, so it is expected.
        C.CURLE _ _ _ C.CURLE_WRITE_ERROR -> return ()
        e -> throwIO e
    either (throwIO :: SomeException -> IO a) return =<< takeMVar result

-- |
-- State shared between the transfer and the consumer:
-- a flag telling whether the exchange is still active,
-- and a single-slot buffer holding the chunk in flight.
type SyncState = (TVar Bool, TMVar ByteString)

-- |
-- A fresh state: active, with no chunk pending.
newSyncState :: STM SyncState
newSyncState = (,) <$> newTVar True <*> newEmptyTMVar

-- |
-- A write callback which hands each received chunk over to the consumer.
--
-- It blocks while the previous chunk has not been taken yet,
-- and reports failure once the exchange is no longer active.
syncWriteFunction :: SyncState -> C.CURL_write_callback
syncWriteFunction (active, chunk) b =
  atomically $
    readTVar active >>= \case
      False -> return C.CURL_WRITEFUNC_FAIL
      True -> putTMVar chunk b >> return C.CURL_WRITEFUNC_OK