module Transient.Move.Services.Executor where

import Transient.Internals
import Transient.Move.Internals
import Transient.Logged
import Transient.Move.Services
import Data.IORef
import System.IO.Unsafe
import qualified Data.Map as M
import qualified Data.ByteString.Lazy.Char8   as BS
import qualified Data.ByteString.Char8 as BSS
import Data.String
import Data.Typeable
import Control.Applicative
import Control.Monad
import Control.Monad.State (liftIO)
import Data.Maybe(mapMaybe)

executorService = [("service","executor")
                  ,("executable", "executor")
                  ,("package","https://github.com/transient-haskell/transient-universe")]



-- initialize N instances, of the executor service. The monitor would spread them among the nodes available.
-- the number N should be less of equal than the number of phisical machines.
-- Since the executor serivice can execute any number of processes, it sould be at most one per machine. 

initExecute  number=  requestInstance   executorService  number

-- | execute a command in some node by an executor service, and return the result when the program finishes
networkExecute ::  String -> String ->  Cloud String
networkExecute  cmdline input=
     callService  executorService (cmdline, input,())



-- | execute a process in some machine trough the local monitor and the executor.
-- This call return a process identifier
-- The process can be controlled with other services like `controlNodeProcess`
networkExecuteStream'  :: String -> Cloud String
networkExecuteStream'  cmdline= do
    -- callService  executorService cmdline
     node <- initService  executorService
     return () !> ("STORED NODE", node)
     name <- callService' node $ ExecuteStream cmdline
     localIO $ print ("NAME", name)
     localIO $ atomicModifyIORef rnodecmd $ \map -> (M.insert name node map,())
     local $ setRemoteJob (BSS.pack name) node         -- so it can be stopped by `killRemoteJob`
     return name

-- | execute a shell command in some node using the executor service. 
-- The response is received as an stream of responses, one per line
networkExecuteStream :: String -> Cloud String      -- '[Multithreaded,Streaming]
networkExecuteStream  cmdline= do
         node <- initService  executorService
         flag <- onAll $ liftIO $ newIORef False
         r <- callService'  node cmdline
         init <- onAll $ liftIO $ readIORef flag
         when (not init) $ do
            onAll $ liftIO $ writeIORef flag True   -- get the first line (header) as the name of the process
            local $ setRemoteJob  (BSS.pack r) node         -- so it can be stopped by `killRemoteJob`
            localIO $ atomicModifyIORef rnodecmd $ \map -> (M.insert r node map,())
         return r

rnodecmd= unsafePerformIO $ newIORef M.empty

-- | send a message that will be read by the standard input of the program initiated by `networkExecuteStream`, identified by the command line.
-- the stream of responses is returned by that primitive. `sendExecuteStream` never return anything, since it is asynchronous
sendExecuteStream :: String -> String -> Cloud  ()  -- '[Asynchronous]
sendExecuteStream  cmdline msg=  do

     return () !> ("SENDEXECUTE", cmdline)
     node <- nodeForProcess cmdline
      --localIO $ do
      -- map <- readIORef rnodecmd 
      -- let mn = M.lookup cmdline map 
      -- case mn of
      --   Nothing ->  error $ "sendExecuteStream: no node executing the command: "++ cmdline
      --   Just n -> return n
     return () !> ("NODE", node)
     callService'  node (cmdline, msg)


controlNodeProcess cmdline= do
      exnode <- nodeForProcess cmdline
        --local $ do
        --    map <-  readIORef rinput 
        --    let mn = M.lookup cmdline map 
        --    return $ case mn of
        --           Nothing ->  error $ "sendExecuteStream: no node executing the command: "++ cmdline
        --           Just n ->   n

      send exnode  <|>  receive exnode
      where

      send exnode= do
         local abduce
         local $ do
            liftIO $ writeIORef lineprocessmode True
            oldprompt <- liftIO $ atomicModifyIORef rprompt $ \oldp -> ( takeWhile (/= ' ') cmdline++ "> ",oldp)
            cbs <- liftIO $ atomicModifyIORef rcb $ \cbs -> ([],cbs) -- remove local node options
            setState (oldprompt,cbs)                                             -- store them


         endcontrolop exnode <|> kill exnode <|> log exnode <|> inputs exnode
         empty

      kill exnode= do
         local $ option "kill" "kill the process"
         localIO $ putStrLn "process terminated"
         killRemoteJob exnode $ fromString cmdline
         endcontrol exnode

      endcontrolop exnode= do
         local $ option "endcontrol"  "end controlling the process"
         localIO $ putStrLn "end controlling the process"
         endcontrol exnode

      endcontrol exnode= do
         localIO $ writeIORef lineprocessmode False
         killRemoteJob exnode  controlToken
         local $ do

            (oldprompt,cbs) <- getState
            liftIO $ writeIORef rcb cbs -- restore local node options
            liftIO $ writeIORef rprompt  oldprompt

      log exnode = do
              local $ option "log" "display the log of the node"
              log <- getLogCmd cmdline exnode
              localIO $ do

                 putStr "\n\n------------- LOG OF PROCESS: ">> print cmdline >> putStrLn ""
                 mapM_ BS.putStrLn $ BS.lines log
                 putStrLn   "------------- END OF LOG"

      inputs exnode= do

          line <- local $ inputf False "input"  "" Nothing (const True)
          sendExecuteStream cmdline line


      receive exnode=  do
         r <- receiveExecuteStream cmdline exnode
         when (not $ null r) $ localIO $ putStrLn  r
         empty

      receiveExecuteStream cmd node=do
          local $ setRemoteJob  controlToken node
          callService'  node $ ReceiveExecuteStream cmd controlToken

      getLogCmd :: String -> Node -> Cloud BS.ByteString
      getLogCmd cmd node= callService' node (GetLogCmd cmd)

newtype GetLogCmd= GetLogCmd String  deriving (Read, Show, Typeable)
instance Loggable GetLogCmd

newtype ExecuteStream= ExecuteStream String deriving (Read, Show, Typeable)
instance Loggable ExecuteStream

data    ReceiveExecuteStream= ReceiveExecuteStream String BSS.ByteString deriving (Read, Show, Typeable)
instance Loggable ReceiveExecuteStream

data    GetProcesses= GetProcesses deriving (Read, Show, Typeable)
instance Loggable GetProcesses

getProcesses ::  Node -> Cloud [String]
getProcesses node= callService' node GetProcesses





-- | get the executor that executes a process


nodeForProcess :: String -> Cloud Node
nodeForProcess process= do

  callService monitorService () :: Cloud ()  -- start/ping monitor if not started

  nods <- squeezeMonitor [] monitorNode
  case nods of
    []    -> error $ "no node running: "++ process
    nod:_ -> return nod
  where
  squeezeMonitor :: [Node] -> Node -> Cloud [Node]
  squeezeMonitor  exc nod= do
    if  nod `elem` exc then return [] else do
      nodes <- callService' nod GetNodes :: Cloud [Node]
      return . concat =<< mapM squeeze (tail nodes)

      where
      squeeze :: Node -> Cloud [Node]
      squeeze  node= do

          case lookup2 "service" $ nodeServices node of

                Just "monitor" -> squeezeMonitor (nod:exc)  node

                Just "executor" -> do

                    procs <- callService' node GetProcesses :: Cloud [String]
                    if process `elem` procs then return [node] else return []

                _ -> return []