module Network.JobQueue.JobQueue (
JobQueue
, FailureHandleFn
, AfterExecuteHandleFn
, Session
, Settings (..)
, openSession
, newSession
, closeSession
, openJobQueue
, closeJobQueue
, countJobQueue
, resumeJobQueue
, suspendJobQueue
, executeJob
, scheduleJob
, deleteJob
, clearJobs
, peekJob
) where
import Control.Applicative
import Control.Concurrent
import Control.Monad
import qualified Data.ByteString.Char8 as BS
import Data.Maybe
import Network.JobQueue.Class
import Network.JobQueue.Types
import Network.JobQueue.Action
import Network.JobQueue.Job
import Network.JobQueue.Backend
import Network.JobQueue.Backend.Class
import Network.JobQueue.Backend.Types
import Network.JobQueue.JobQueue.Internal
-- | A handle on a job queue backend.
--
-- The constructor fields are, in order:
--
-- * an ownership flag: when 'True', 'closeSession' closes the backend;
-- * the locator string the session was created with;
-- * the 'Backend' itself.
data Session = Session Bool String Backend
-- | Open the backend identified by the given locator and wrap it in a
-- 'Session' that owns it, so that 'closeSession' will close the backend.
openSession :: String
            -> IO (Session)
openSession locator = do
  backend <- openBackend locator
  return (Session True locator backend)
-- | Wrap an existing 'Backend' in a 'Session' that does not own it:
-- 'closeSession' leaves the backend of such a session untouched.
-- The locator string is stored in the session as given.
newSession :: String
           -> Backend -> Session
newSession = Session False
-- | Close a session. The backend is closed only when the session owns it
-- (see the ownership flag of 'Session'); otherwise this is a no-op.
closeSession :: Session -> IO ()
closeSession (Session True _locator backend) = bClose backend
closeSession (Session False _locator _backend) = return ()
-- | Open the queue called @name@ on the session's backend and bundle it
-- with the action state built from the given 'JobM' definition and the two
-- handlers carried by the 'Settings'.
openJobQueue :: (Env e, Unit a)
                => Session
                -> String
                -> Settings a
                -> JobM e a ()
                -> IO (JobQueue e a)
openJobQueue (Session _ _ Backend { bOpenQueue = openQueue }) name (Settings failureFn afterFn) jobm = do
  -- Same effect order as before: open the backend queue first, then build
  -- the action state.
  queue <- openQueue name
  actionState <- buildActionState jobm
  return (JobQueue queue actionState failureFn afterFn)
-- | Close the backend queue underlying the given job queue.
closeJobQueue :: (Env e, Unit a) => JobQueue e a -> IO ()
closeJobQueue jobqueue = case jobqueue of
  JobQueue { jqBackendQueue = queue } -> closeQueue queue
-- | Return the count reported by the backend queue underlying the given
-- job queue.
countJobQueue :: (Env e, Unit a) => JobQueue e a -> IO (Int)
countJobQueue jobqueue = case jobqueue of
  JobQueue { jqBackendQueue = queue } -> countQueue queue
-- | Resume a suspended job queue.
--
-- Looks at the entry returned by 'peekJob''. If the action for that entry is
-- to execute a 'StopTheWorld' job, the entry is deleted from the backend
-- queue and the result of that deletion is returned. In every other case
-- (including an empty peek) nothing is changed and 'True' is returned.
resumeJobQueue :: (Env e, Unit a) => JobQueue e a -> IO (Bool)
resumeJobQueue jobqueue = peekJob' jobqueue >>= \peeked ->
  case peeked of
    Just (job, nodeName, idName, _version)
      | Execute StopTheWorld <- actionForJob job idName ->
          -- 'deleteJob' removes the node from this queue's backend queue.
          deleteJob jobqueue nodeName
    _ -> return True
-- | Suspend a job queue by enqueueing a 'StopTheWorld' marker job.
--
-- Looks at the entry returned by 'peekJob''. If the action for that entry is
-- already to execute a 'StopTheWorld' job, nothing is written and 'False' is
-- returned. Otherwise a 'StopTheWorld' job is written to the backend queue
-- and 'True' is returned.
suspendJobQueue :: forall e. forall a. (Env e, Unit a) => JobQueue e a -> IO (Bool)
suspendJobQueue jobqueue = do
  r <- peekJob' jobqueue
  case r of
    Just (job, _nodeName, idName, _version)
      | Execute StopTheWorld <- actionForJob job idName -> return False
    _ -> suspend jobqueue >> return True
  where
    -- The marker only has an effect when it is the entry 'peekJob'' returns:
    -- this function, 'resumeJobQueue' and 'executeJob' all inspect nothing
    -- else. It is therefore written with priority (-1) rather than 1 so that
    -- it does not queue up behind ordinary jobs scheduled via 'getPriority'.
    -- NOTE(review): assumes the backend serves lower priority values first
    -- and accepts negative priorities -- confirm against the backends.
    suspend JobQueue { jqBackendQueue = bq } =
      writeQueue bq (pack (StopTheWorld :: Job a)) (-1)
-- | Process the entry returned by 'peekJob'', if there is one.
--
-- What happens depends on the action 'actionForJob' yields for that entry:
--
-- * executing a 'StopTheWorld' job: wait one second and return;
-- * executing any other job: store the updated job in the queue; if that
--   update succeeded and the job moved from 'Runnable' to 'Running', run it,
--   pass the result to 'afterExecuteJob' and finally call the queue's
--   after-execute handler;
-- * 'Delete': remove the entry and try again with the next one;
-- * 'Skip', or nothing to peek at: do nothing.
executeJob :: (Env e, Unit a) => JobQueue e a -> e -> IO ()
executeJob jobqueue env = peekJob' jobqueue >>= \peeked ->
  case peeked of
    Nothing -> return ()
    Just (job, nodeName, idName, version) ->
      case actionForJob job idName of
        -- The queue is suspended: back off for one second (in microseconds).
        Execute StopTheWorld -> threadDelay 1000000
        Execute next -> do
          updated <- updateJob jobqueue nodeName next version
          when (updated && jobState job == Runnable && jobState next == Running) $ do
            result <- executeJob' jobqueue env nodeName next version
            afterExecuteJob jobqueue nodeName next version result
            jqAfterExecuteFn jobqueue next
        Delete -> do
          void $ deleteJob jobqueue nodeName
          executeJob jobqueue env
        Skip -> return ()
-- | Create a job in the 'Initialized' state from the given unit and write it
-- to the backend queue with the unit's own priority ('getPriority').
-- The result of the backend write is discarded.
scheduleJob :: (Unit a)
               => JobQueue e a
               -> a
               -> IO ()
scheduleJob JobQueue { jqBackendQueue = queue } unit =
  createJob Initialized unit >>= \job ->
    void (writeQueue queue (pack job) (getPriority unit))
-- | Delete the entry with the given node name from the backend queue,
-- returning whatever the backend's 'deleteQueue' reports.
deleteJob :: (Unit a)
             => JobQueue e a
             -> String
             -> IO Bool
deleteJob jobqueue nodeName = case jobqueue of
  JobQueue { jqBackendQueue = queue } -> deleteQueue queue nodeName
-- | Drain the backend queue and return the jobs that were removed.
--
-- Entries are read with 'readQueue' until it yields 'Nothing'. Each payload
-- is parsed with 'reads'; successfully parsed jobs are returned paired with
-- their node name, most recently dequeued first.
--
-- An entry whose payload cannot be parsed is skipped and draining continues.
-- Previously the first such entry ended the loop, leaving the rest of the
-- queue uncleared.
clearJobs :: (Unit a)
             => JobQueue e a
             -> IO [(String, Job a)]
clearJobs JobQueue { jqBackendQueue = bq } = loop []
  where
    loop dequeued = do
      obj <- readQueue bq
      case obj of
        Nothing -> return dequeued
        Just (bs, nodeName) -> case parseJob bs of
          -- The entry has already been taken off the queue by 'readQueue';
          -- there is nothing to report for it, so carry on with the rest.
          Nothing -> loop dequeued
          Just job -> loop ((nodeName, job) : dequeued)

    -- Take the first parse 'reads' offers, if any.
    parseJob raw = fmap fst (listToMaybe (reads (BS.unpack raw)))
-- | Return the job of the entry 'peekJob'' yields, or 'Nothing' when there
-- is no such entry. The node name, id name and version are dropped.
peekJob :: (Unit a)
           => JobQueue e a
           -> IO (Maybe (Job a))
peekJob jobqueue = fmap (\(job, _nodeName, _idName, _version) -> job) <$> peekJob' jobqueue