module Network.XmlPush.HttpPull.Client.Common (
HttpPullClArgs(..),
talkC,
) where
import Control.Monad
import "monads-tf" Control.Monad.Trans
import Control.Monad.Base
import Control.Monad.Trans.Control
import Control.Concurrent hiding (yield)
import Control.Concurrent.STM
import Data.Maybe
import Data.HandleLike
import Data.Pipe
import Data.Pipe.TChan
import System.IO
import Text.XML.Pipe
import Network.TigHTTP.Client
import Network.TigHTTP.Types
import qualified Data.ByteString.Lazy as LBS
data HttpPullClArgs h = HttpPullClArgs {
hostName :: String,
portNumber :: Int,
basePath :: FilePath,
getPath :: XmlNode -> FilePath,
poll :: HandleMonad h XmlNode,
isPending :: XmlNode -> Bool,
duration :: Maybe Int,
getDuration :: XmlNode -> Maybe Int
}
talkC :: (HandleLike h, MonadBaseControl IO (HandleMonad h)) =>
h -> String -> Int -> FilePath -> (XmlNode -> FilePath) ->
HandleMonad h XmlNode -> (XmlNode -> Bool) ->
TVar (Maybe Int) -> (XmlNode -> Maybe Int) ->
HandleMonad h (TChan XmlNode, TChan XmlNode)
talkC h addr pn pth gp pl ip dr gdr = do
lock <- liftBase $ atomically newTChan
liftBase . atomically $ writeTChan lock ()
inc <- liftBase $ atomically newTChan
inc' <- liftBase $ atomically newTChan
otc <- liftBase $ atomically newTChan
otc' <- liftBase $ atomically newTChan
void . liftBaseDiscard forkIO . runPipe_ $ fromTChan otc
=$= conversation lock h addr pn pth gp dr gdr
=$= toTChan inc
void . liftBaseDiscard forkIO . runPipe_ $ fromTChan otc'
=$= conversation lock h addr pn pth gp dr gdr
=$= toTChan inc'
void . liftBaseDiscard forkIO . forever $ do
d <- liftBase . atomically $ do
md <- readTVar dr
case md of
Just d -> return d
_ -> retry
liftBase $ threadDelay d
p <- pl
liftBase $ polling p ip inc' inc otc'
return (inc, otc)
conversation :: (HandleLike h, MonadBase IO (HandleMonad h)) =>
TChan () -> h -> String -> Int ->
FilePath -> (XmlNode -> FilePath) ->
TVar (Maybe a) -> (XmlNode -> Maybe a) ->
Pipe XmlNode XmlNode (HandleMonad h) ()
conversation lock h addr pn pth gp dr gdr =
talk lock h addr pn pth gp =$= setDuration dr gdr
setDuration :: MonadBase IO m => TVar (Maybe a) -> (o -> Maybe a) -> Pipe o o m ()
setDuration dr gdr = (await >>=) . maybe (return ()) $ \n -> case gdr n of
Just d -> do
lift . liftBase . atomically $ writeTVar dr (Just d)
yield n >> setDuration dr gdr
_ -> yield n >> setDuration dr gdr
polling :: XmlNode -> (XmlNode -> Bool) ->
TChan XmlNode -> TChan XmlNode -> TChan XmlNode -> IO ()
polling pl ip i i' o = do
atomically $ writeTChan o pl
r <- atomically $ readTChan i
hFlush stdout
when (ip r) $ atomically (writeTChan i' r) >> polling pl ip i i' o
talk :: (MonadBase IO (HandleMonad h), HandleLike h) =>
TChan () -> h -> String -> Int ->
FilePath -> (XmlNode -> FilePath) ->
Pipe XmlNode XmlNode (HandleMonad h) ()
talk lock h addr pn pth gp = (await >>=) . maybe (return ()) $ \n -> do
let m = LBS.fromChunks [xmlString [n]]
lift . liftBase . atomically $ readTChan lock
r <- lift . request h $ post addr pn (pth ++ "/" ++ gp n) (Nothing, m)
void $ return ()
=$= responseBody r
=$= xmlEvent
=$= convert fromJust
=$= xmlNode []
lift . liftBase . atomically $ writeTChan lock ()
talk lock h addr pn pth gp