module Control.Distributed.Process.Internal.Messaging
  ( sendPayload
  , sendBinary
  , sendMessage
  , disconnect
  , closeImplicitReconnections
  , impliesDeathOf
  , sendCtrlMsg
  ) where

import Data.Accessor ((^.), (^=))
import Data.Binary (Binary, encode)
import qualified Data.Map as Map (partitionWithKey, elems)
import qualified Data.ByteString.Lazy as BSL (toChunks)
import qualified Data.ByteString as BSS (ByteString)
import Control.Distributed.Process.Serializable ()

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (writeChan)
import Control.Exception (mask_)
import Control.Monad (unless)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
import qualified Network.Transport as NT
  ( Connection
  , send
  , defaultConnectHints
  , connect
  , Reliability(ReliableOrdered)
  , close
  )
import Control.Distributed.Process.Internal.Types
  ( LocalNode(localState, localEndPoint, localCtrlChan)
  , withValidLocalState
  , modifyValidLocalState
  , modifyValidLocalState_
  , Identifier
  , localConnections
  , localConnectionBetween
  , nodeAddress
  , nodeOf
  , messageToPayload
  , createMessage
  , NCMsg(..)
  , ProcessSignal(Died)
  , DiedReason(DiedDisconnect)
  , ImplicitReconnect(WithImplicitReconnect)
  , NodeId(..)
  , ProcessId(..)
  , LocalNode(..)
  , LocalProcess(..)
  , Process(..)
  , SendPortId(sendPortProcessId)
  , Identifier(NodeIdentifier, ProcessIdentifier, SendPortIdentifier)
  )
import Control.Distributed.Process.Serializable (Serializable)
import Data.Foldable (forM_)

--------------------------------------------------------------------------------
-- Message sending                                                            --
--------------------------------------------------------------------------------

sendPayload :: LocalNode
            -> Identifier
            -> Identifier
            -> ImplicitReconnect
            -> [BSS.ByteString]
            -> IO ()
sendPayload node from to implicitReconnect payload = do
  mConn <- connBetween node from to implicitReconnect
  didSend <- case mConn of
    Just conn -> do
      didSend <- NT.send conn payload
      case didSend of
        Left _err -> return False
        Right ()  -> return True
    Nothing -> return False
  unless didSend $ do
    writeChan (localCtrlChan node) NCMsg
      { ctrlMsgSender = to
      , ctrlMsgSignal = Died (NodeIdentifier $ nodeOf to) DiedDisconnect
      }

sendBinary :: Binary a
           => LocalNode
           -> Identifier
           -> Identifier
           -> ImplicitReconnect
           -> a
           -> IO ()
sendBinary node from to implicitReconnect
  = sendPayload node from to implicitReconnect . BSL.toChunks . encode

sendMessage :: Serializable a
            => LocalNode
            -> Identifier
            -> Identifier
            -> ImplicitReconnect
            -> a
            -> IO ()
sendMessage node from to implicitReconnect =
  sendPayload node from to implicitReconnect . messageToPayload . createMessage

setupConnBetween :: LocalNode
                 -> Identifier
                 -> Identifier
                 -> ImplicitReconnect
                 -> IO (Maybe NT.Connection)
setupConnBetween node from to implicitReconnect = do
    mConn <- NT.connect (localEndPoint node)
                        (nodeAddress . nodeOf $ to)
                        NT.ReliableOrdered
                        NT.defaultConnectHints
    case mConn of
      Right conn -> do
        didSend <- NT.send conn (BSL.toChunks . encode $ to)
        case didSend of
          Left _ ->
            return Nothing
          Right () -> do
            modifyValidLocalState_ node $
              return . (localConnectionBetween from to ^= Just (conn, implicitReconnect))
            return $ Just conn
      Left _ ->
        return Nothing

connBetween :: LocalNode
            -> Identifier
            -> Identifier
            -> ImplicitReconnect
            -> IO (Maybe NT.Connection)
connBetween node from to implicitReconnect = do
    mConn <- withValidLocalState node $
      return . (^. localConnectionBetween from to)
    case mConn of
      Just (conn, _) ->
        return $ Just conn
      Nothing ->
        setupConnBetween node from to implicitReconnect

disconnect :: LocalNode -> Identifier -> Identifier -> IO ()
disconnect node from to = mask_ $ do
  mio <- modifyValidLocalState node $ \vst ->
    case vst ^. localConnectionBetween from to of
      Nothing ->
        return (vst, return ())
      Just (conn, _) -> do
        return ( localConnectionBetween from to ^= Nothing $ vst
               , NT.close conn
               )
  forM_ mio forkIO

closeImplicitReconnections :: LocalNode -> Identifier -> IO ()
closeImplicitReconnections node to = mask_ $ do
  mconns <- modifyValidLocalState node $ \vst -> do
    let shouldClose (_, to') (_, WithImplicitReconnect) = to `impliesDeathOf` to'
        shouldClose _ _ = False
    let (affected, unaffected) =
          Map.partitionWithKey shouldClose (vst ^. localConnections)
    return ( localConnections ^= unaffected $ vst
           , map fst $ Map.elems affected
           )
  forM_ mconns $ forkIO . mapM_ NT.close

-- | @a `impliesDeathOf` b@ is true if the death of @a@ (for instance, a node)
-- implies the death of @b@ (for instance, a process on that node)
impliesDeathOf :: Identifier
               -> Identifier
               -> Bool
NodeIdentifier nid `impliesDeathOf` NodeIdentifier nid' =
  nid' == nid
NodeIdentifier nid `impliesDeathOf` ProcessIdentifier pid =
  processNodeId pid == nid
NodeIdentifier nid `impliesDeathOf` SendPortIdentifier cid =
  processNodeId (sendPortProcessId cid) == nid
ProcessIdentifier pid `impliesDeathOf` ProcessIdentifier pid' =
  pid' == pid
ProcessIdentifier pid `impliesDeathOf` SendPortIdentifier cid =
  sendPortProcessId cid == pid
SendPortIdentifier cid `impliesDeathOf` SendPortIdentifier cid' =
  cid' == cid
_ `impliesDeathOf` _ =
  False


-- Send a control message. Evaluates the message to HNF before sending it.
--
-- The message shouldn't produce more errors when further evaluated. If
-- evaluation threw errors the node controller or the receiver would crash when
-- inspecting it.
sendCtrlMsg :: Maybe NodeId  -- ^ Nothing for the local node
            -> ProcessSignal -- ^ Message to send
            -> Process ()
sendCtrlMsg mNid signal = do
  proc <- ask
  let msg = NCMsg { ctrlMsgSender = ProcessIdentifier (processId proc)
                  , ctrlMsgSignal = signal
                  }
  case mNid of
    Nothing -> do
      liftIO $ writeChan (localCtrlChan (processNode proc)) $! msg
    Just nid ->
      liftIO $ sendBinary (processNode proc)
                          (ProcessIdentifier (processId proc))
                          (NodeIdentifier nid)
                          WithImplicitReconnect
                          msg