module Web.Scotty.Comet
( connect
, kCometPlugin
, send
, Document
, Options(..)
, getReply
, eventQueue
, debugDocument
, debugReplyDocument
, defaultOptions
) where
import Web.Scotty (ScottyM, text, post, capture, param, setHeader, get, ActionM, jsonData)
import Data.Aeson hiding ((.=))
import Control.Monad
import Control.Concurrent.STM as STM
import Control.Concurrent.MVar as STM
import Control.Monad.IO.Class
import Paths_kansas_comet
import qualified Data.Map as Map
import Control.Concurrent
import Data.Default
import Data.Maybe ( fromJust )
import qualified Data.HashMap.Strict as HashMap
import System.Exit
import qualified Data.Text.Lazy as LT
import qualified Data.Text as T
import Data.Time.Calendar
import Data.Time.Clock
import Numeric
connect :: Options
-> (Document -> IO ())
-> ScottyM ()
connect opt callback = do
if not rtsSupportsBoundThreads
then liftIO $ do putStrLn "Application needs to be re-compiled with -threaded flag"
exitFailure
else return ()
when (verbose opt >= 1) $ liftIO $ putStrLn $ "kansas-comet connect with prefix=" ++ show (prefix opt)
uniqVar <- liftIO $ newMVar 0
let getUniq :: IO Int
getUniq = do
u <- takeMVar uniqVar
putMVar uniqVar (u + 1)
return u
tm :: UTCTime <- liftIO $ getCurrentTime
let server_id
= Numeric.showHex (toModifiedJulianDay (utctDay tm))
$ ("-" ++)
$ Numeric.showHex (floor (utctDayTime tm * 1000) :: Integer)
$ ""
contextDB <- liftIO $ atomically $ newTVar $ (Map.empty :: Map.Map Int Document)
let newContext :: IO Int
newContext = do
uq <- getUniq
picture <- atomically $ newEmptyTMVar
callbacks <- atomically $ newTVar $ Map.empty
queue <- atomically $ newTChan
let cxt = Document picture callbacks queue uq
liftIO $ atomically $ do
db <- readTVar contextDB
writeTVar contextDB $ Map.insert uq cxt db
_ <- forkIO $ callback cxt
return uq
post (capture $ prefix opt ++ "/") $ do
uq <- liftIO $ newContext
text (LT.pack $ "$.kc.session(" ++ show server_id ++ "," ++ show uq ++ ");")
get (capture $ prefix opt ++ "/act/" ++ server_id ++ "/:id/:act") $ do
setHeader "Cache-Control" "max-age=0, no-cache, private, no-store, must-revalidate"
num <- param "id"
when (verbose opt >= 2) $ liftIO $ putStrLn $
"Kansas Comet: get .../act/" ++ show num
let tryPushAction :: TMVar T.Text -> Int -> ActionM ()
tryPushAction var n = do
ping <- liftIO $ registerDelay (3 * 1000 * 1000)
res <- liftIO $ atomically $ do
b <- readTVar ping
if b then return Nothing else do
liftM Just (takeTMVar var)
when (verbose opt >= 2) $ liftIO $ putStrLn $
"Kansas Comet (sending to " ++ show n ++ "):\n" ++ show res
case res of
Just js -> do
text $ LT.fromChunks [js]
Nothing ->
text LT.empty
db <- liftIO $ atomically $ readTVar contextDB
case Map.lookup num db of
Nothing -> text (LT.pack $ "console.warn('Can not find act #" ++ show num ++ "');")
Just doc -> tryPushAction (sending doc) num
post (capture $ prefix opt ++ "/reply/" ++ server_id ++ "/:id/:uq") $ do
setHeader "Cache-Control" "max-age=0, no-cache, private, no-store, must-revalidate"
num <- param "id"
uq :: Int <- param "uq"
when (verbose opt >= 2) $ liftIO $ putStrLn $
"Kansas Comet: post .../reply/" ++ show num ++ "/" ++ show uq
wrappedVal :: Value <- jsonData
let val = fromJust $ let (Object m) = wrappedVal
in HashMap.lookup (T.pack "data") m
db <- liftIO $ atomically $ readTVar contextDB
case Map.lookup num db of
Nothing -> do
text (LT.pack $ "console.warn('Ignore reply for session #" ++ show num ++ "');")
Just doc -> do
liftIO $ do
atomically $ do
m <- readTVar (replies doc)
writeTVar (replies doc) $ Map.insert uq val m
text $ LT.pack ""
post (capture $ prefix opt ++ "/event/" ++ server_id ++ "/:id") $ do
setHeader "Cache-Control" "max-age=0, no-cache, private, no-store, must-revalidate"
num <- param "id"
when (verbose opt >= 2) $ liftIO $ putStrLn $
"Kansas Comet: post .../event/" ++ show num
wrappedVal :: Value <- jsonData
let val = fromJust $ let (Object m) = wrappedVal
in HashMap.lookup (T.pack "data") m
db <- liftIO $ atomically $ readTVar contextDB
case Map.lookup num db of
Nothing -> do
text (LT.pack $ "console.warn('Ignore reply for session #" ++ show num ++ "');")
Just doc -> do
liftIO $ atomically $ do
writeTChan (eventQueue doc) val
text $ LT.pack ""
return ()
kCometPlugin :: IO String
kCometPlugin = do
dataDir <- getDataDir
return $ dataDir ++ "/static/js/kansas-comet.js"
send :: Document -> T.Text -> IO ()
send doc js = atomically $ putTMVar (sending doc) $! js
getReply :: Document -> Int -> IO Value
getReply doc num = do
atomically $ do
db <- readTVar (replies doc)
case Map.lookup num db of
Nothing -> retry
Just r -> do
writeTVar (replies doc) $ Map.delete num db
return r
data Document = Document
{ sending :: TMVar T.Text
, replies :: TVar (Map.Map Int Value)
, eventQueue :: TChan Value
, _secret :: Int
}
data Options = Options
{ prefix :: String
, verbose :: Int
}
instance Default Options where
def = Options
{ prefix = ""
, verbose = 0
}
defaultOptions :: Options
defaultOptions = def
debugDocument :: IO Document
debugDocument = do
picture <- atomically $ newEmptyTMVar
callbacks <- atomically $ newTVar $ Map.empty
_ <- forkIO $ forever $ do
res <- atomically $ takeTMVar $ picture
putStrLn $ "Sending: " ++ show res
q <- atomically $ newTChan
return $ Document picture callbacks q 0
debugReplyDocument :: Document -> Int -> Value -> IO ()
debugReplyDocument doc uq val = atomically $ do
m <- readTVar (replies doc)
writeTVar (replies doc) $ Map.insert uq val m