{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Periodic.Server.Client
( ClientT
, handleSessionT
) where
import Control.Monad (unless, when)
import Control.Monad.Trans.Class (lift)
import Data.Binary (encode)
import Data.Byteable (toBytes)
import qualified Data.ByteString.Char8 as B (intercalate)
import Data.ByteString.Lazy (toStrict)
import Metro.Class (Transport)
import Metro.Conn (fromConn)
import qualified Metro.Conn as Conn
import Metro.Session (env, getSessionEnv1, receive,
send)
import Periodic.IOList (delete, elem, insert)
import Periodic.Node
import Periodic.Server.Persist (Persist)
import Periodic.Server.Scheduler
import Periodic.Server.Types
import qualified Periodic.Types.ClientCommand as CC
import Periodic.Types.Internal (ConfigKey (..))
import Periodic.Types.Job (getFuncName, initJob)
import Periodic.Types.Packet (getPacketData, packetRES)
import qualified Periodic.Types.WorkerCommand as WC
import Prelude hiding (elem)
import System.Log.Logger (errorM)
import UnliftIO
type ClientT db tp m = NodeT ClientConfig Command tp (SchedT db tp m)
handleClientSessionT
:: (MonadUnliftIO m, Persist db, Transport tp)
=> CC.ClientCommand -> SessionT ClientConfig Command tp (SchedT db tp m) ()
handleClientSessionT (CC.SubmitJob job) = do
lift $ pushJob job
send $ packetRES Success
handleClientSessionT (CC.RunJob job) = do
preR <- lift $ lookupPrevResult job
case preR of
Just v -> send $ packetRES $ Data v
Nothing -> do
c <- lift . canRun $ getFuncName job
if c then do
lift $ prepareWait job
lift $ pushJob job
state <- fromConn Conn.statusTVar
w <- lift $ waitResult state job
send . packetRES $ Data w
else send $ packetRES NoWorker
handleClientSessionT CC.Status = do
stats <- lift $ map toBytes <$> status
send . packetRES . Data $ B.intercalate "\n" stats
handleClientSessionT CC.Ping = send $ packetRES Pong
handleClientSessionT (CC.DropFunc fn) = do
lift $ dropFunc fn
send $ packetRES Success
handleClientSessionT (CC.RemoveJob fn jn) = do
lift $ removeJob $ initJob fn jn
send $ packetRES Success
handleClientSessionT CC.Shutdown = lift shutdown
handleClientSessionT (CC.ConfigGet (ConfigKey key)) = do
v <- lift $ getConfigInt key
send $ packetRES $ Config v
handleClientSessionT (CC.ConfigSet (ConfigKey key) v) = do
lift $ setConfigInt key v
send $ packetRES Success
handleClientSessionT CC.Dump = send =<< lift (packetRES . Data . toStrict . encode <$> dumpJob)
handleClientSessionT (CC.Load jobs) = do
lift $ mapM_ pushJob jobs
send $ packetRES Success
handleWorkerSessionT
:: (MonadUnliftIO m, Persist db, Transport tp)
=> ClientConfig -> WC.WorkerCommand -> SessionT ClientConfig Command tp (SchedT db tp m) ()
handleWorkerSessionT ClientConfig {..} WC.GrabJob = do
env0 <- getSessionEnv1
lift $ pushGrab wFuncList wJobQueue env0
handleWorkerSessionT ClientConfig {..} (WC.WorkDone jh w) = do
lift $ doneJob jh w
delete wJobQueue jh
handleWorkerSessionT ClientConfig {..} (WC.WorkFail jh) = do
lift $ failJob jh
delete wJobQueue jh
handleWorkerSessionT ClientConfig {..} (WC.SchedLater jh l s) = do
lift $ schedLaterJob jh l s
delete wJobQueue jh
handleWorkerSessionT ClientConfig {..} WC.Sleep = send $ packetRES Noop
handleWorkerSessionT ClientConfig {..} WC.Ping = send $ packetRES Pong
handleWorkerSessionT ClientConfig {..} (WC.CanDo fn) = do
has <- elem wFuncList fn
unless has $ do
lift $ addFunc fn
insert wFuncList fn
handleWorkerSessionT ClientConfig {..} (WC.CantDo fn) = do
has <- elem wFuncList fn
when has $ do
lift $ removeFunc fn
delete wFuncList fn
handleWorkerSessionT ClientConfig {..} (WC.Broadcast fn) = do
has <- elem wFuncList fn
unless has $ do
lift $ broadcastFunc fn True
insert wFuncList fn
handleWorkerSessionT _ (WC.Acquire n c jh) = do
r <- lift $ acquireLock n c jh
send $ packetRES $ Acquired r
handleWorkerSessionT _ (WC.Release n jh) = lift $ releaseLock n jh
handleSessionT
:: (MonadUnliftIO m, Persist db, Transport tp)
=> SessionT ClientConfig Command tp (SchedT db tp m) ()
handleSessionT = do
mcmd <- receive
case mcmd of
Nothing -> do
liftIO $ errorM "Periodic.Server.Client" "Client error"
fromConn Conn.close
Just pkt ->
case getPacketData pkt of
CC cmd -> handleClientSessionT cmd
WC cmd -> do
env0 <- env
handleWorkerSessionT env0 cmd