-- |
-- Module     : Simulation.Aivika.Distributed.Optimistic.Internal.ConnectionManager
-- Copyright  : Copyright (c) 2015-2018, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 7.10.3
--
-- This module is responsible for managing the connections.
--
module Simulation.Aivika.Distributed.Optimistic.Internal.ConnectionManager
       (ConnectionManager,
        ConnectionParams(..),
        newConnectionManager,
        tryAddMessageReceiver,
        addMessageReceiver,
        removeMessageReceiver,
        clearMessageReceivers,
        reconnectMessageReceivers,
        filterMessageReceivers,
        existsMessageReceiver,
        trySendKeepAlive,
        trySendKeepAliveUTC) where

import qualified Data.Map as M
import qualified Data.Set as S
import Data.Maybe
import Data.Either
import Data.IORef
import Data.Time.Clock
import Data.Word

import Control.Monad
import Control.Monad.Trans
import Control.Concurrent
import qualified Control.Distributed.Process as DP

import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.Message

-- | The connection parameters.
--
-- All delays and intervals are expressed in microseconds.
data ConnectionParams =
  ConnectionParams { connLoggingPriority :: Priority,
                     -- ^ the logging priority
                     connKeepAliveInterval :: Int,
                     -- ^ the interval in microseconds to send keep-alive messages
                     connReconnectingDelay :: Int,
                     -- ^ the reconnecting delay in microseconds; in
                     -- 'reconnectMessageReceivers' it is the pause between
                     -- unmonitoring the receivers and reconnecting to them
                     connMonitoringDelay :: Int
                     -- ^ the monitoring delay in microseconds; in
                     -- 'reconnectMessageReceivers' it is the pause between
                     -- reconnecting to the receivers and monitoring them again
                   }

-- | The connection manager.
--
-- It keeps its mutable state in 'IORef' cells: the time stamp of the last
-- keep-alive round and the table of the currently registered message
-- receivers, keyed by the receiver process identifier.
data ConnectionManager =
  ConnectionManager { connParams :: ConnectionParams,
                      -- ^ the manager parameter
                      connKeepAliveTimestamp :: IORef UTCTime,
                      -- ^ the keep alive timestamp
                      connReceivers :: IORef (M.Map DP.ProcessId ConnectionMessageReceiver)
                      -- ^ the receivers of messages
                    }

-- | The connection message receiver.
--
-- Each monitor cell holds 'Nothing' while the corresponding monitor is not
-- installed and @Just ref@ while it is active.
data ConnectionMessageReceiver =
  ConnectionMessageReceiver { connReceiverProcess :: DP.ProcessId,
                              -- ^ the receiver of messages
                              connReceiverNodeMonitor :: IORef (Maybe DP.MonitorRef),
                              -- ^ a monitor of the message receiver node
                              connReceiverMonitor :: IORef (Maybe DP.MonitorRef)
                              -- ^ a monitor of the message receiver
                            }

-- | Create a new connection manager.
--
-- The keep-alive time stamp is initialised with the current time and the
-- table of message receivers starts out empty.
newConnectionManager :: ConnectionParams -> IO ConnectionManager
newConnectionManager ps =
  do timestamp <- getCurrentTime >>= newIORef
     receivers <- newIORef M.empty
     return ConnectionManager { connParams = ps,
                                connKeepAliveTimestamp = timestamp,
                                connReceivers = receivers }

-- | Try to add the connection message receiver.
--
-- Returns 'False' and does nothing if the process is already registered;
-- otherwise registers it with 'addMessageReceiver' and returns 'True'.
tryAddMessageReceiver :: ConnectionManager -> DP.ProcessId -> DP.Process Bool
tryAddMessageReceiver manager pid =
  do exists <- liftIO $ existsMessageReceiver manager pid
     if exists
       then return False
       else do addMessageReceiver manager pid
               return True

-- | Add the connection message receiver.
--
-- A fresh receiver record is created with both monitor cells empty, the
-- node and process monitors are installed through 'monitorMessageReceiver',
-- and the record is then stored in the receivers table under the given
-- process identifier.
--
-- The insertion is unconditional: an entry already stored under the same
-- identifier is replaced without being unmonitored here. Use
-- 'tryAddMessageReceiver' when the process may already be registered.
addMessageReceiver :: ConnectionManager -> DP.ProcessId -> DP.Process ()
addMessageReceiver manager pid =
  do nodeMonitor <- liftIO $ newIORef Nothing
     procMonitor <- liftIO $ newIORef Nothing
     let receiver = ConnectionMessageReceiver { connReceiverProcess = pid,
                                                connReceiverNodeMonitor = nodeMonitor,
                                                connReceiverMonitor = procMonitor }
     monitorMessageReceiver manager receiver
     -- the strict modify keeps the table evaluated instead of
     -- accumulating a chain of pending inserts inside the reference
     liftIO $
       modifyIORef' (connReceivers manager) $
       M.insert pid receiver

-- | Remove the connection message receiver.
--
-- If the process is registered, its monitors are released through
-- 'unmonitorMessageReceiver' and its entry is deleted from the receivers
-- table. If it is not registered, only a warning is logged.
removeMessageReceiver :: ConnectionManager -> DP.ProcessId -> DP.Process ()
removeMessageReceiver manager pid =
  do rs <- liftIO $ readIORef (connReceivers manager)
     case M.lookup pid rs of
       Nothing ->
         logConnectionManager manager WARNING $
         "Could not find the monitored process " ++ show pid
       Just r ->
         do unmonitorMessageReceiver manager r
            -- the strict modify keeps the table evaluated instead of
            -- accumulating a chain of pending deletes inside the reference
            liftIO $
              modifyIORef' (connReceivers manager) $
              M.delete pid

-- | Clear the connection message receivers.
--
-- Takes a snapshot of the receivers table and removes every receiver found
-- in it, one by one, through 'removeMessageReceiver'.
clearMessageReceivers :: ConnectionManager -> DP.Process ()
clearMessageReceivers manager =
  do rs <- liftIO $ readIORef (connReceivers manager)
     forM_ (M.elems rs) $ \r ->
       removeMessageReceiver manager (connReceiverProcess r)

-- | Reconnect to the message receivers.
--
-- The receivers are obtained from the given process identifiers with
-- @messageReceivers@ (defined elsewhere in this module). If that list is
-- empty, nothing happens. Otherwise the procedure runs in three phases:
--
--   1. unmonitor every receiver, then sleep for 'connReconnectingDelay';
--
--   2. reconnect to every receiver, then sleep for 'connMonitoringDelay';
--
--   3. monitor every receiver again.
reconnectMessageReceivers :: ConnectionManager -> [DP.ProcessId] -> DP.Process ()
reconnectMessageReceivers manager pids =
  do rs <- messageReceivers manager pids
     unless (null rs) $
       do forM_ rs $
            unmonitorMessageReceiver manager
          liftIO $
            threadDelay (connReconnectingDelay params)
          logConnectionManager manager NOTICE "Begin reconnecting..."
          forM_ rs $
            reconnectToMessageReceiver manager
          liftIO $
            threadDelay (connMonitoringDelay params)
          logConnectionManager manager NOTICE "Begin remonitoring..."
          forM_ rs $
            monitorMessageReceiver manager
  where
    params = connParams manager

-- | Unmonitor the message receiver.
--
-- Releases the process monitor first and the node monitor second.
unmonitorMessageReceiver :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
unmonitorMessageReceiver manager r =
  do unmonitorMessageReceiverProcess manager r
     unmonitorMessageReceiverNode manager r

-- | Monitor the message receiver.
--
-- Installs the node monitor first and the process monitor second.
monitorMessageReceiver :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
monitorMessageReceiver manager r =
  do monitorMessageReceiverNode manager r
     monitorMessageReceiverProcess manager r

-- | Unmonitor the message receiver process.
--
-- If a process monitor reference is stored for the receiver, the monitor
-- is released and the stored reference is cleared. If none is stored, only
-- a warning is logged.
unmonitorMessageReceiverProcess :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
unmonitorMessageReceiverProcess manager r =
  do let pid = connReceiverProcess r
     ref <- liftIO $ readIORef (connReceiverMonitor r)
     case ref of
       Just m ->
         do logConnectionManager manager NOTICE $
              "Unmonitoring process " ++ show pid
            DP.unmonitor m
            liftIO $ writeIORef (connReceiverMonitor r) Nothing
       Nothing ->
         logConnectionManager manager WARNING $
         "Could not find the monitor reference for process " ++ show pid

-- | Monitor the message receiver process.
--
-- If no process monitor reference is stored for the receiver, a new
-- monitor is installed and remembered. If one is already stored, a warning
-- is logged, a new monitor is installed, the old one is released, and the
-- new reference replaces the old.
monitorMessageReceiverProcess :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
monitorMessageReceiverProcess manager r =
  do let pid = connReceiverProcess r
     ref <- liftIO $ readIORef (connReceiverMonitor r)
     case ref of
       Nothing ->
         do logConnectionManager manager NOTICE $
              "Monitoring process " ++ show pid
            x <- DP.monitor pid
            liftIO $ writeIORef (connReceiverMonitor r) (Just x)
       Just x0 ->
         do logConnectionManager manager WARNING $
              "Re-monitoring process " ++ show pid
            -- the new monitor is installed before the old one is released
            x <- DP.monitor pid
            DP.unmonitor x0
            liftIO $ writeIORef (connReceiverMonitor r) (Just x)

-- | Unmonitor the message receiver node.
--
-- If a node monitor reference is stored for the receiver, the monitor is
-- released and the stored reference is cleared. If none is stored, only a
-- warning is logged.
unmonitorMessageReceiverNode :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
unmonitorMessageReceiverNode manager r =
  do let nid = DP.processNodeId $ connReceiverProcess r
     ref <- liftIO $ readIORef (connReceiverNodeMonitor r)
     case ref of
       Just m ->
         do logConnectionManager manager NOTICE $
              "Unmonitoring node " ++ show nid
            DP.unmonitor m
            liftIO $ writeIORef (connReceiverNodeMonitor r) Nothing
       Nothing ->
         logConnectionManager manager WARNING $
         "Could not find the monitor reference for node " ++ show nid

-- | Monitor the message receiver node.
--
-- If no node monitor reference is stored for the receiver, a new node
-- monitor is installed and remembered. If one is already stored, a warning
-- is logged, a new monitor is installed, the old one is released, and the
-- new reference replaces the old.
monitorMessageReceiverNode :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
monitorMessageReceiverNode manager r =
  do let nid = DP.processNodeId $ connReceiverProcess r
     ref <- liftIO $ readIORef (connReceiverNodeMonitor r)
     case ref of
       Nothing ->
         do logConnectionManager manager NOTICE $
              "Monitoring node " ++ show nid
            x <- DP.monitorNode nid
            liftIO $ writeIORef (connReceiverNodeMonitor r) (Just x)
       Just x0 ->
         do logConnectionManager manager WARNING $
              "Re-monitoring node " ++ show nid
            -- the new monitor is installed before the old one is released
            x <- DP.monitorNode nid
            DP.unmonitor x0
            liftIO $ writeIORef (connReceiverNodeMonitor r) (Just x)

-- | Reconnect to the message receiver.
--
-- Logs the attempt and then calls 'DP.reconnect' on the receiver process.
reconnectToMessageReceiver :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
reconnectToMessageReceiver manager r =
  do let pid = connReceiverProcess r
     logConnectionManager manager NOTICE $
       "Direct reconnecting to " ++ show pid
     DP.reconnect pid

-- | Whether the connection message receiver exists.
--
-- Returns 'True' if the given process identifier is currently a key of
-- the receivers table.
existsMessageReceiver :: ConnectionManager -> DP.ProcessId -> IO Bool
existsMessageReceiver manager pid =
  fmap (M.member pid) $ readIORef (connReceivers manager)

-- | Try to send keep-alive messages.
--
-- Does nothing when no message receiver is registered. Otherwise reads the
-- current time and delegates to 'trySendKeepAliveUTC'.
trySendKeepAlive :: ConnectionManager -> DP.Process ()
trySendKeepAlive manager =
  do empty <- liftIO $ fmap M.null $ readIORef (connReceivers manager)
     unless empty $
       do utc <- liftIO getCurrentTime
          trySendKeepAliveUTC manager utc

-- | Try to send keep-alive messages by the specified current time.
--
-- Nothing happens when the manager has no receivers, or when
-- 'shouldSendKeepAlive' answers 'False' for the given time. Otherwise
-- the attempt is logged at the @INFO@ priority, the given time is stored
-- as the new keep-alive timestamp, and a 'KeepAliveMessage' is sent with
-- 'DP.usend' to the process of every registered receiver.
trySendKeepAliveUTC :: ConnectionManager -> UTCTime -> DP.Process ()
trySendKeepAliveUTC manager utc =
  do noReceivers <- liftIO $ fmap M.null $ readIORef (connReceivers manager)
     unless noReceivers $
       do due <- liftIO $ shouldSendKeepAlive manager utc
          when due $
            do logConnectionManager manager INFO "Sending keep-alive messages"
               -- Remember when this round was started, before sending.
               liftIO $ writeIORef (connKeepAliveTimestamp manager) utc
               -- The receivers are read again here, after the logging step,
               -- rather than reusing the value from the emptiness check.
               receivers <- liftIO $ readIORef (connReceivers manager)
               forM_ receivers $ \receiver ->
                 DP.usend (connReceiverProcess receiver) KeepAliveMessage

-- | Whether should send a keep-alive message.
--
-- Returns 'True' when the time elapsed between the stored keep-alive
-- timestamp and the given time, converted to microseconds, is strictly
-- greater than the 'connKeepAliveInterval' of the manager's parameters.
shouldSendKeepAlive :: ConnectionManager -> UTCTime -> IO Bool
shouldSendKeepAlive manager utc =
  do utc0 <- readIORef (connKeepAliveTimestamp manager)
     let -- elapsed time in seconds since the stored timestamp
         elapsed :: Double
         elapsed = fromRational (toRational (diffUTCTime utc utc0))
         -- configured interval, compared against microseconds below
         interval :: Int
         interval = connKeepAliveInterval (connParams manager)
     return (secondsToMicroseconds elapsed > interval)

-- | Convert seconds to microseconds.
--
-- The argument is multiplied by one million and rounded with 'round' to
-- an 'Integer', which is then narrowed to 'Int' with 'fromInteger'.
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds x = fromInteger (round (1000000 * x))

-- | Get the connection message receivers.
--
-- Looks up every given process identifier among the receivers registered
-- in the manager and returns the receivers that were found, in the order
-- of the input list. An identifier that is not registered contributes
-- nothing to the result; a message is logged for it at the @WARNING@
-- priority.
messageReceivers :: ConnectionManager -> [DP.ProcessId] -> DP.Process [ConnectionMessageReceiver]
messageReceivers manager pids =
  do rs <- liftIO $ readIORef (connReceivers manager)
     fmap catMaybes $
       forM pids $ \pid ->
         do let found = M.lookup pid rs
            when (isNothing found) $
              logConnectionManager manager WARNING $
                "Could not find the monitored process " ++ show pid
            return found

-- | Filter the message receivers.
--
-- For every monitor notification the process identifier is kept only if
-- it belongs to a registered receiver whose current monitor reference
-- equals the reference carried by the notification. Notifications for
-- unknown processes are logged at the @WARNING@ priority; notifications
-- with a different monitor reference are logged at the @NOTICE@ priority.
-- In both of those cases the identifier is dropped.
--
-- The result goes through a set, so it has no duplicates and is in
-- ascending order.
filterMessageReceivers :: ConnectionManager -> [DP.ProcessMonitorNotification] -> DP.Process [DP.ProcessId]
filterMessageReceivers manager ms =
  do rs <- liftIO $ readIORef (connReceivers manager)
     fmap (S.toList . S.fromList . catMaybes) $
       forM ms $ \(DP.ProcessMonitorNotification ref pid _) ->
         case M.lookup pid rs of
           Nothing ->
             do logConnectionManager manager WARNING $
                  "Could not find the monitored process " ++ show pid
                return Nothing
           Just receiver ->
             do ref0 <- liftIO $ readIORef (connReceiverMonitor receiver)
                if ref0 == Just ref
                  then return (Just pid)
                  else do logConnectionManager manager NOTICE $
                            "Received the old monitor reference for process " ++ show pid
                          return Nothing

-- | Log the message with the specified priority.
--
-- The message is emitted with 'DP.say' only when the logging priority
-- configured in the manager's parameters is @<=@ the given priority.
-- The emitted text is the result of 'embracePriority' for that priority,
-- a single space, and then the message.
logConnectionManager :: ConnectionManager -> Priority -> String -> DP.Process ()
{-# INLINE logConnectionManager #-}
logConnectionManager manager p message =
  when (threshold <= p) $
    DP.say (embracePriority p ++ " " ++ message)
  where
    -- priority threshold taken from the connection parameters
    threshold = connLoggingPriority (connParams manager)