module Transient.Move.Services.Executor where
import Transient.Internals
import Transient.Move.Internals
import Transient.Logged
import Transient.Move.Services
import Data.IORef
import System.IO.Unsafe
import qualified Data.Map as M
import qualified Data.ByteString.Lazy.Char8 as BS
import qualified Data.ByteString.Char8 as BSS
import Data.String
import Data.Typeable
import Control.Applicative
import Control.Monad
import Control.Monad.State (liftIO)
import Data.Maybe(mapMaybe)
-- | Description of the executor service as a list of key/value pairs: the
-- service name, the executable that implements it and the package that
-- provides it.  Every call to the executor in this module passes this value.
executorService =
  [ ("service", "executor")
  , ("executable", "executor")
  , ("package", "https://github.com/transient-haskell/transient-universe")
  ]
-- | Request @number@ instances of the executor service.
--
-- Thin wrapper that hands 'executorService' and @number@ to
-- 'requestInstance' and returns whatever that call returns.
initExecute number = executorService `requestInstance` number
-- | Call the executor service with a command line and the input for it.
--
-- The request sent through 'callService' is the triple
-- @(cmdline, input, ())@; the reply of the service is returned as a 'String'.
networkExecute :: String -> String -> Cloud String
networkExecute cmdline input = callService executorService request
  where
    request = (cmdline, input, ())
-- | Ask an executor node to run @cmdline@ as a stream and return the reply,
-- which this function treats as the name of the started process.
--
-- Steps, in order: obtain a node with 'initService', send it an
-- 'ExecuteStream' request, print the reply, record @name -> node@ in
-- 'rnodecmd' and register the remote job under that name with
-- 'setRemoteJob'.
networkExecuteStream' :: String -> Cloud String
networkExecuteStream' cmdline = do
  node <- initService executorService
  return () !> ("STORED NODE", node)
  name <- callService' node $ ExecuteStream cmdline
  -- NOTE(review): this prints to stdout on every call; kept because callers
  -- may rely on the output.
  localIO $ print ("NAME", name)
  -- Strict variant on purpose: the lazy 'atomicModifyIORef' would leave one
  -- more unevaluated 'M.insert' in the shared ref for every call.
  localIO $ atomicModifyIORef' rnodecmd $ \table -> (M.insert name node table, ())
  local $ setRemoteJob (BSS.pack name) node
  return name
-- | Send @cmdline@ (as a plain 'String' request) to an executor node obtained
-- with 'initService' and return the reply.
--
-- A per-call flag makes sure the bookkeeping is done only once: on the first
-- reply the remote job is registered under the reply text with
-- 'setRemoteJob' and @reply -> node@ is recorded in 'rnodecmd'.
--
-- NOTE(review): the flag only matters if the code after 'callService'' runs
-- more than once for the same call (streamed replies) -- confirm.
networkExecuteStream :: String -> Cloud String
networkExecuteStream cmdline = do
  node <- initService executorService
  flag <- onAll $ liftIO $ newIORef False
  r <- callService' node cmdline
  registered <- onAll $ liftIO $ readIORef flag
  unless registered $ do
    onAll $ liftIO $ writeIORef flag True
    local $ setRemoteJob (BSS.pack r) node
    -- Strict variant on purpose: the lazy 'atomicModifyIORef' would leave an
    -- unevaluated 'M.insert' in the shared ref.
    localIO $ atomicModifyIORef' rnodecmd $ \table -> (M.insert r node table, ())
  return r
-- | Global mutable table, initially empty.  'networkExecuteStream' and
-- 'networkExecuteStream'' insert @name -> node@ entries into it.
--
-- The cell is created with 'unsafePerformIO', so it must never be inlined:
-- without the pragma the optimiser is free to duplicate the 'newIORef' call
-- and different use sites could end up with different tables.
--
-- NOTE(review): with no type signature the inferred type is polymorphic in
-- key and value, which is the unsafe-reference hazard described in the
-- 'unsafePerformIO' documentation.  The visible uses store 'String' keys and
-- node values; consider pinning the type with a signature.
{-# NOINLINE rnodecmd #-}
rnodecmd = unsafePerformIO $ newIORef M.empty
-- | Send @msg@ to the executor node that is running @cmdline@.
--
-- The node is looked up with 'nodeForProcess'; the request passed to
-- 'callService'' is the pair @(cmdline, msg)@.  The two @!>@ lines attach
-- trace information before and after the lookup.
sendExecuteStream :: String -> String -> Cloud ()
sendExecuteStream cmdline msg = do
  return () !> ("SENDEXECUTE", cmdline)
  target <- nodeForProcess cmdline
  return () !> ("NODE", target)
  callService' target payload
  where
    payload = (cmdline, msg)
-- | Take interactive control of the process identified by @cmdline@.
--
-- The node running the process is looked up with 'nodeForProcess'.  Two
-- alternatives are then combined with '<|>': @send@, which offers the console
-- options and forwards typed input to the process, and @receive@, which
-- prints what the process sends back.  Both alternatives finish with 'empty',
-- so neither of them yields a result to the caller.
controlNodeProcess cmdline= do
exnode <- nodeForProcess cmdline
send exnode <|> receive exnode
where
-- Prepare the console for controlling the process, then offer the four
-- actions (end control, kill, log, input) as alternatives.
-- NOTE(review): the effect of `abduce` is not visible in this file; it
-- presumably continues the rest of `send` in another thread -- confirm.
send exnode= do
local abduce
local $ do
liftIO $ writeIORef lineprocessmode True
-- New prompt: the command line up to its first space, followed by "> ".
-- The previous prompt is handed back so it can be restored later.
oldprompt <- liftIO $ atomicModifyIORef rprompt $ \oldp -> ( takeWhile (/= ' ') cmdline++ "> ",oldp)
-- Replace the contents of rcb with an empty list, keeping the old contents.
cbs <- liftIO $ atomicModifyIORef rcb $ \cbs -> ([],cbs)
-- Stash both old values; `endcontrol` reads them back with getState.
setState (oldprompt,cbs)
endcontrolop exnode <|> kill exnode <|> log exnode <|> inputs exnode
empty
-- "kill" option: kill the remote job named after the command line and leave
-- control mode.  The message is printed before the kill request is issued.
kill exnode= do
local $ option "kill" "kill the process"
localIO $ putStrLn "process terminated"
killRemoteJob exnode $ fromString cmdline
endcontrol exnode
-- "endcontrol" option: leave control mode without killing the process itself.
endcontrolop exnode= do
local $ option "endcontrol" "end controlling the process"
localIO $ putStrLn "end controlling the process"
endcontrol exnode
-- Shared exit path: reset lineprocessmode, kill the remote job registered
-- under controlToken, and restore the prompt and callbacks saved by `send`.
endcontrol exnode= do
localIO $ writeIORef lineprocessmode False
killRemoteJob exnode controlToken
local $ do
(oldprompt,cbs) <- getState
liftIO $ writeIORef rcb cbs
liftIO $ writeIORef rprompt oldprompt
-- "log" option: fetch the log with getLogCmd and print it line by line
-- between a header and a footer.  Inside this body the name `log` is rebound
-- to the fetched log text.
log exnode = do
local $ option "log" "display the log of the node"
log <- getLogCmd cmdline exnode
localIO $ do
putStr "\n\n------------- LOG OF PROCESS: ">> print cmdline >> putStrLn ""
mapM_ BS.putStrLn $ BS.lines log
putStrLn "------------- END OF LOG"
-- Read a line with inputf and forward it to the process.
inputs exnode= do
line <- local $ inputf False "input" "" Nothing (const True)
sendExecuteStream cmdline line
-- Print every non-empty reply received for this process.
receive exnode= do
r <- receiveExecuteStream cmdline exnode
when (not $ null r) $ localIO $ putStrLn r
empty
-- Register a remote job under controlToken for `node`, then send the node a
-- ReceiveExecuteStream request carrying the command and that same token.
-- Everything it uses arrives through its arguments or from names defined
-- outside this function.
receiveExecuteStream cmd node=do
local $ setRemoteJob controlToken node
callService' node $ ReceiveExecuteStream cmd controlToken
-- | Fetch the log kept for the command @cmd@ from @node@.
--
-- Sends a 'GetLogCmd' request through 'callService'' and returns the reply
-- as a lazy 'BS.ByteString'.
getLogCmd :: String -> Node -> Cloud BS.ByteString
getLogCmd cmd node = callService' node request
  where
    request = GetLogCmd cmd
-- | Request sent by 'getLogCmd'; the wrapped 'String' is the command whose
-- log is being asked for.
newtype GetLogCmd = GetLogCmd String
  deriving (Read, Show, Typeable)

instance Loggable GetLogCmd
-- | Request sent by 'networkExecuteStream''; the wrapped 'String' is the
-- command line to run.
newtype ExecuteStream = ExecuteStream String
  deriving (Read, Show, Typeable)

instance Loggable ExecuteStream
-- | Request used to receive the output of a running command.  The first
-- field is the command; the second is the token under which the sender
-- registered its remote job.
data ReceiveExecuteStream
  = ReceiveExecuteStream String BSS.ByteString
  deriving (Read, Show, Typeable)

instance Loggable ReceiveExecuteStream
-- | Request asking a node for its list of processes; the reply is read as a
-- list of 'String's by the callers in this module.
data GetProcesses = GetProcesses
  deriving (Read, Show, Typeable)

instance Loggable GetProcesses
-- | Ask @node@ for its processes by sending it a 'GetProcesses' request
-- through 'callService''.
getProcesses :: Node -> Cloud [String]
getProcesses = flip callService' GetProcesses
-- | Find an executor node that reports @process@ among its processes.
--
-- The monitor service is called first, then the search starts at
-- 'monitorNode': each monitor is asked for its nodes, nested monitors are
-- searched recursively and every executor is asked for its process list.
-- The first matching executor is returned.
--
-- Calls 'error' with @"no node running: " ++ process@ when no executor
-- reports the process.
nodeForProcess :: String -> Cloud Node
nodeForProcess process = do
  callService monitorService () :: Cloud ()
  nods <- squeezeMonitor [] monitorNode
  case nods of
    [] -> error $ "no node running: " ++ process
    nod : _ -> return nod
  where
    -- Collect the executors reachable from monitor @nod@ that run @process@.
    -- @exc@ holds the monitors already visited, so that monitors referring
    -- to each other do not make the search loop.
    squeezeMonitor :: [Node] -> Node -> Cloud [Node]
    squeezeMonitor exc nod
      | nod `elem` exc = return []
      | otherwise = do
          nodes <- callService' nod GetNodes :: Cloud [Node]
          -- The first entry of the reply is skipped.  'drop 1' instead of
          -- 'tail' so that an empty reply gives no results instead of
          -- throwing.
          concat <$> mapM squeeze (drop 1 nodes)
      where
        -- Classify one node by its "service" entry.
        squeeze :: Node -> Cloud [Node]
        squeeze node =
          case lookup2 "service" $ nodeServices node of
            Just "monitor" -> squeezeMonitor (nod : exc) node
            Just "executor" -> do
              procs <- callService' node GetProcesses :: Cloud [String]
              return [node | process `elem` procs]
            _ -> return []