module Control.Distributed.Process.Node
( LocalNode
, newLocalNode
, closeLocalNode
, forkProcess
, runProcess
, initRemoteTable
, localNodeId
) where
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif
import System.IO (fixIO, hPutStrLn, stderr)
import System.Mem.Weak (Weak, deRefWeak)
import qualified Data.ByteString.Lazy as BSL (fromChunks)
import Data.Binary (decode)
import Data.Map (Map)
import qualified Data.Map as Map
( empty
, toList
, fromList
, filter
, partitionWithKey
, elems
, size
, filterWithKey
, foldlWithKey
)
import Data.Set (Set)
import qualified Data.Set as Set
( empty
, insert
, delete
, member
, toList
)
import Data.Foldable (forM_)
import Data.Maybe (isJust, fromJust, isNothing, catMaybes)
import Data.Typeable (Typeable)
import Control.Category ((>>>))
import Control.Applicative (Applicative, (<$>))
import Control.Monad (void, when, join)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.State.Strict (MonadState, StateT, evalStateT, gets)
import qualified Control.Monad.State.Strict as StateT (get, put)
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask)
import Control.Exception
( throwIO
, AsyncException(ThreadKilled)
, SomeException
, Exception
, throwTo
, uninterruptibleMask_
)
import qualified Control.Exception as Exception
( Handler(..)
, catch
, catches
, finally
)
import Control.Concurrent (forkIO, forkIOWithUnmask, killThread, myThreadId)
import Control.Distributed.Process.Internal.StrictMVar
( newMVar
, withMVar
, modifyMVarMasked
, modifyMVar_
, modifyMVar
, newEmptyMVar
, putMVar
, takeMVar
)
import Control.Concurrent.Chan (newChan, writeChan, readChan)
import qualified Control.Concurrent.MVar as MVar (newEmptyMVar, takeMVar)
import Control.Concurrent.STM
( atomically
)
import Control.Distributed.Process.Internal.CQueue
( CQueue
, enqueue
, newCQueue
, mkWeakCQueue
, queueSize
)
import qualified Network.Transport as NT
( Transport
, EndPoint
, newEndPoint
, receive
, Event(..)
, EventErrorCode(..)
, TransportError(..)
, address
, closeEndPoint
, Connection
, ConnectionId
, close
, EndPointAddress
, Reliability(ReliableOrdered)
)
import Data.Accessor (Accessor, accessor, (^.), (^=), (^:))
import System.Random (randomIO)
import Control.Distributed.Static (RemoteTable, Closure)
import qualified Control.Distributed.Static as Static
( unclosure
, initRemoteTable
)
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, LocalProcessId(..)
, ProcessId(..)
, LocalNode(..)
, MxEventBus(..)
, LocalNodeState(..)
, ValidLocalNodeState(..)
, withValidLocalState
, modifyValidLocalState
, LocalProcess(..)
, LocalProcessState(..)
, Process(..)
, DiedReason(..)
, NCMsg(..)
, ProcessSignal(..)
, localPidCounter
, localPidUnique
, localProcessWithId
, localProcesses
, localConnections
, forever'
, MonitorRef(..)
, NodeClosedException(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, PortMonitorNotification(..)
, ProcessExitException(..)
, ProcessLinkException(..)
, NodeLinkException(..)
, PortLinkException(..)
, DidUnmonitor(..)
, DidUnlinkProcess(..)
, DidUnlinkNode(..)
, DidUnlinkPort(..)
, SpawnRef
, DidSpawn(..)
, Message(..)
, TypedChannel(..)
, Identifier(..)
, nodeOf
, ProcessInfo(..)
, ProcessInfoNone(..)
, NodeStats(..)
, SendPortId(..)
, typedChannelWithId
, RegisterReply(..)
, WhereIsReply(..)
, payloadToMessage
, messageToPayload
, createUnencodedMessage
, unsafeCreateUnencodedMessage
, runLocalProcess
, firstNonReservedProcessId
, ImplicitReconnect(WithImplicitReconnect,NoImplicitReconnect)
)
import Control.Distributed.Process.Management.Internal.Agent
( mxAgentController
)
import qualified Control.Distributed.Process.Management.Internal.Table as Table
( mxTableCoordinator
, startTableCoordinator
)
import qualified Control.Distributed.Process.Management.Internal.Trace.Remote as Trace
( remoteTable
)
import Control.Distributed.Process.Management.Internal.Trace.Tracer
( defaultTracer
)
import Control.Distributed.Process.Management.Internal.Trace.Types
( TraceArg(..)
, traceEvent
, traceLogFmt
, enableTrace
)
import Control.Distributed.Process.Management.Internal.Types
( MxEvent(..)
)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Internal.Messaging
( sendBinary
, sendPayload
, closeImplicitReconnections
, impliesDeathOf
)
import Control.Distributed.Process.Internal.Primitives
( register
, receiveWait
, match
, sendChan
, try
, unwrapMessage
)
import Control.Distributed.Process.Internal.Types (SendPort, Tracer(..))
import qualified Control.Distributed.Process.Internal.Closure.BuiltIn as BuiltIn (remoteTable)
import Control.Distributed.Process.Internal.WeakTQueue (TQueue, writeTQueue)
import qualified Control.Distributed.Process.Internal.StrictContainerAccessors as DAC
( mapMaybe
, mapDefault
)
import Unsafe.Coerce
initRemoteTable :: RemoteTable
initRemoteTable = Trace.remoteTable $ BuiltIn.remoteTable Static.initRemoteTable
newLocalNode :: NT.Transport -> RemoteTable -> IO LocalNode
newLocalNode transport rtable = do
mEndPoint <- NT.newEndPoint transport
case mEndPoint of
Left ex -> throwIO ex
Right endPoint -> do
localNode <- createBareLocalNode endPoint rtable
startServiceProcesses localNode
return localNode
createBareLocalNode :: NT.EndPoint -> RemoteTable -> IO LocalNode
createBareLocalNode endPoint rtable = do
unq <- randomIO
state <- newMVar $ LocalNodeValid $ ValidLocalNodeState
{ _localProcesses = Map.empty
, _localPidCounter = firstNonReservedProcessId
, _localPidUnique = unq
, _localConnections = Map.empty
}
ctrlChan <- newChan
let node = LocalNode { localNodeId = NodeId $ NT.address endPoint
, localEndPoint = endPoint
, localState = state
, localCtrlChan = ctrlChan
, localEventBus = MxEventBusInitialising
, remoteTable = rtable
}
tracedNode <- startMxAgent node
void $ forkIO $ Exception.finally (runNodeController tracedNode)
(NT.closeEndPoint (localEndPoint node))
void $ forkIO $ Exception.finally (handleIncomingMessages tracedNode)
(stopNC node)
return tracedNode
where
stopNC node =
writeChan (localCtrlChan node) NCMsg
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
, ctrlMsgSignal = SigShutdown
}
startMxAgent :: LocalNode -> IO LocalNode
startMxAgent node = do
let fork = forkProcess node
mv <- MVar.newEmptyMVar
pid <- fork $ mxAgentController fork mv
(tracer', wqRef, mxNew') <- MVar.takeMVar mv
return node { localEventBus = (MxEventBus pid tracer' wqRef mxNew') }
startDefaultTracer :: LocalNode -> IO ()
startDefaultTracer node' = do
let t = localEventBus node'
case t of
MxEventBus _ (Tracer pid _) _ _ -> do
runProcess node' $ register "trace.controller" pid
pid' <- forkProcess node' defaultTracer
enableTrace (localEventBus node') pid'
runProcess node' $ register "tracer.initial" pid'
_ -> return ()
startServiceProcesses :: LocalNode -> IO ()
startServiceProcesses node = do
startDefaultTracer node
tableCoordinatorPid <- fork $ Table.startTableCoordinator fork
runProcess node $ register Table.mxTableCoordinator tableCoordinatorPid
logger <- forkProcess node loop
runProcess node $ do
register "logger" logger
register "trace.logger" logger
where
fork = forkProcess node
loop = do
receiveWait
[ match $ \((time, pid, string) ::(String, ProcessId, String)) -> do
liftIO . hPutStrLn stderr $ time ++ " " ++ show pid ++ ": " ++ string
loop
, match $ \((time, string) :: (String, String)) -> do
liftIO . hPutStrLn stderr $ time ++ " [trace] " ++ string
loop
, match $ \(ch :: SendPort ()) ->
sendChan ch ()
]
closeLocalNode :: LocalNode -> IO ()
closeLocalNode node = do
join $ modifyMVar (localState node) $ \st -> case st of
LocalNodeValid vst -> do
return ( LocalNodeClosed
, forM_ (vst ^. localProcesses) $ \lproc ->
killThread (processThread lproc)
)
LocalNodeClosed -> return (LocalNodeClosed, return ())
NT.closeEndPoint (localEndPoint node)
runProcess :: LocalNode -> Process () -> IO ()
runProcess node proc = do
done <- newEmptyMVar
void $ forkProcess node $ try proc >>= liftIO . putMVar done
takeMVar done >>= either (throwIO :: SomeException -> IO a) return
forkProcess :: LocalNode -> Process () -> IO ProcessId
forkProcess node proc =
modifyMVarMasked (localState node) startProcess
where
startProcess :: LocalNodeState -> IO (LocalNodeState, ProcessId)
startProcess (LocalNodeValid vst) = do
let lpid = LocalProcessId { lpidCounter = vst ^. localPidCounter
, lpidUnique = vst ^. localPidUnique
}
let pid = ProcessId { processNodeId = localNodeId node
, processLocalId = lpid
}
pst <- newMVar LocalProcessState { _monitorCounter = 0
, _spawnCounter = 0
, _channelCounter = 0
, _typedChannels = Map.empty
}
queue <- newCQueue
weakQueue <- mkWeakCQueue queue (return ())
(_, lproc) <- fixIO $ \ ~(tid, _) -> do
let lproc = LocalProcess { processQueue = queue
, processWeakQ = weakQueue
, processId = pid
, processState = pst
, processThread = tid
, processNode = node
}
tid' <- uninterruptibleMask_ $ forkIOWithUnmask $ \unmask -> do
reason <- Exception.catches
(unmask $ runLocalProcess lproc proc >> return DiedNormal)
[ (Exception.Handler (\ex@(ProcessExitException from msg) -> do
mMsg <- unwrapMessage msg :: IO (Maybe String)
case mMsg of
Nothing -> return $ DiedException $ show ex
Just m -> return $ DiedException ("exit-from=" ++ (show from) ++ ",reason=" ++ m)))
, (Exception.Handler
(return . DiedException . (show :: SomeException -> String)))]
mconns <- modifyValidLocalState node (cleanupProcess pid)
forM_ mconns $ forkIO . mapM_ NT.close
writeChan (localCtrlChan node) NCMsg
{ ctrlMsgSender = ProcessIdentifier pid
, ctrlMsgSignal = Died (ProcessIdentifier pid) reason
}
return (tid', lproc)
trace node (MxSpawned pid)
if lpidCounter lpid == maxBound
then do
newUnique <- randomIO
return ( LocalNodeValid
$ (localProcessWithId lpid ^= Just lproc)
. (localPidCounter ^= firstNonReservedProcessId)
. (localPidUnique ^= newUnique)
$ vst
, pid
)
else
return ( LocalNodeValid
$ (localProcessWithId lpid ^= Just lproc)
. (localPidCounter ^: (+ 1))
$ vst
, pid
)
startProcess LocalNodeClosed =
throwIO $ NodeClosedException $ localNodeId node
cleanupProcess :: ProcessId
-> ValidLocalNodeState
-> IO (ValidLocalNodeState, [NT.Connection])
cleanupProcess pid vst = do
let pid' = ProcessIdentifier pid
let (affected, unaffected) = Map.partitionWithKey (\(fr, _to) !_v -> impliesDeathOf pid' fr) (vst ^. localConnections)
return ( (localProcessWithId (processLocalId pid) ^= Nothing)
. (localConnections ^= unaffected)
$ vst
, map fst $ Map.elems affected
)
type IncomingConnection = (NT.EndPointAddress, IncomingTarget)
data IncomingTarget =
Uninit
| ToProc ProcessId (Weak (CQueue Message))
| ToChan TypedChannel
| ToNode
data ConnectionState = ConnectionState {
_incoming :: !(Map NT.ConnectionId IncomingConnection)
, _incomingFrom :: !(Map NT.EndPointAddress (Set NT.ConnectionId))
}
initConnectionState :: ConnectionState
initConnectionState = ConnectionState {
_incoming = Map.empty
, _incomingFrom = Map.empty
}
incoming :: Accessor ConnectionState (Map NT.ConnectionId IncomingConnection)
incoming = accessor _incoming (\conns st -> st { _incoming = conns })
incomingAt :: NT.ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt cid = incoming >>> DAC.mapMaybe cid
incomingFrom :: NT.EndPointAddress -> Accessor ConnectionState (Set NT.ConnectionId)
incomingFrom addr = aux >>> DAC.mapDefault Set.empty addr
where
aux = accessor _incomingFrom (\fr st -> st { _incomingFrom = fr })
handleIncomingMessages :: LocalNode -> IO ()
handleIncomingMessages node = go initConnectionState
`Exception.catch` \(NodeClosedException _) -> return ()
where
go :: ConnectionState -> IO ()
go !st = do
event <- NT.receive endpoint
case event of
NT.ConnectionOpened cid rel theirAddr ->
if rel == NT.ReliableOrdered
then
trace node (MxConnected cid theirAddr)
>> go (
(incomingAt cid ^= Just (theirAddr, Uninit))
. (incomingFrom theirAddr ^: Set.insert cid)
$ st
)
else invalidRequest cid st
NT.Received cid payload ->
case st ^. incomingAt cid of
Just (_, ToProc pid weakQueue) -> do
mQueue <- deRefWeak weakQueue
forM_ mQueue $ \queue -> do
let msg = payloadToMessage payload
enqueue queue msg
trace node (MxReceived pid msg)
go st
Just (_, ToChan (TypedChannel chan')) -> do
mChan <- deRefWeak chan'
forM_ mChan $ \chan -> atomically $
writeTQueue chan $! decode (BSL.fromChunks payload)
go st
Just (_, ToNode) -> do
let ctrlMsg = decode . BSL.fromChunks $ payload
writeChan ctrlChan $! ctrlMsg
go st
Just (src, Uninit) ->
case decode (BSL.fromChunks payload) of
ProcessIdentifier pid -> do
let lpid = processLocalId pid
mProc <- withValidLocalState node $ return . (^. localProcessWithId lpid)
case mProc of
Just proc ->
go (incomingAt cid ^= Just (src, ToProc pid (processWeakQ proc)) $ st)
Nothing ->
invalidRequest cid st
SendPortIdentifier chId -> do
let lcid = sendPortLocalId chId
lpid = processLocalId (sendPortProcessId chId)
mProc <- withValidLocalState node $ return . (^. localProcessWithId lpid)
case mProc of
Just proc -> do
mChannel <- withMVar (processState proc) $ return . (^. typedChannelWithId lcid)
case mChannel of
Just channel ->
go (incomingAt cid ^= Just (src, ToChan channel) $ st)
Nothing ->
invalidRequest cid st
Nothing ->
invalidRequest cid st
NodeIdentifier nid ->
if nid == localNodeId node
then go (incomingAt cid ^= Just (src, ToNode) $ st)
else invalidRequest cid st
Nothing ->
invalidRequest cid st
NT.ConnectionClosed cid ->
case st ^. incomingAt cid of
Nothing ->
invalidRequest cid st
Just (src, _) -> do
trace node (MxDisconnected cid src)
go ( (incomingAt cid ^= Nothing)
. (incomingFrom src ^: Set.delete cid)
$ st
)
NT.ErrorEvent (NT.TransportError (NT.EventConnectionLost theirAddr) _) -> do
let nid = NodeIdentifier $ NodeId theirAddr
writeChan ctrlChan NCMsg
{ ctrlMsgSender = nid
, ctrlMsgSignal = Died nid DiedDisconnect
}
let notLost k = not (k `Set.member` (st ^. incomingFrom theirAddr))
closeImplicitReconnections node nid
go ( (incomingFrom theirAddr ^= Set.empty)
. (incoming ^: Map.filterWithKey (const . notLost))
$ st
)
NT.ErrorEvent (NT.TransportError NT.EventEndPointFailed str) ->
fail $ "Cloud Haskell fatal error: end point failed: " ++ str
NT.ErrorEvent (NT.TransportError NT.EventTransportFailed str) ->
fail $ "Cloud Haskell fatal error: transport failed: " ++ str
NT.EndPointClosed ->
return ()
NT.ReceivedMulticast _ _ ->
fail "Cloud Haskell fatal error: received unexpected multicast"
invalidRequest :: NT.ConnectionId -> ConnectionState -> IO ()
invalidRequest cid st = do
traceEventFmtIO node "" [(TraceStr " [network] invalid request: "),
(Trace cid)]
go ( incomingAt cid ^= Nothing
$ st
)
endpoint = localEndPoint node
ctrlChan = localCtrlChan node
runNodeController :: LocalNode -> IO ()
runNodeController node =
runReaderT (evalStateT (unNC nodeController) initNCState) node
`Exception.catch` \(NodeClosedException _) -> return ()
data NCState = NCState
{
_links :: !(Map Identifier (Set ProcessId))
, _monitors :: !(Map Identifier (Set (ProcessId, MonitorRef)))
, _registeredHere :: !(Map String ProcessId)
, _registeredOnNodes :: !(Map ProcessId [(NodeId,Int)])
}
newtype NC a = NC { unNC :: StateT NCState (ReaderT LocalNode IO) a }
deriving ( Applicative
, Functor
, Monad
, MonadIO
, MonadState NCState
, MonadReader LocalNode
)
initNCState :: NCState
initNCState = NCState { _links = Map.empty
, _monitors = Map.empty
, _registeredHere = Map.empty
, _registeredOnNodes = Map.empty
}
data ProcessKillException =
ProcessKillException !ProcessId !String
deriving (Typeable)
instance Exception ProcessKillException
instance Show ProcessKillException where
show (ProcessKillException pid reason) =
"killed-by=" ++ show pid ++ ",reason=" ++ reason
ncSendToProcess :: ProcessId -> Message -> NC ()
ncSendToProcess = ncSendToProcessAndTrace True
ncSendToProcessAndTrace :: Bool -> ProcessId -> Message -> NC ()
ncSendToProcessAndTrace shouldTrace pid msg = do
node <- ask
if processNodeId pid == localNodeId node
then ncEffectLocalSendAndTrace shouldTrace node pid msg
else liftIO $ sendBinary node
(NodeIdentifier $ localNodeId node)
(NodeIdentifier $ processNodeId pid)
WithImplicitReconnect
NCMsg { ctrlMsgSender = NodeIdentifier $ localNodeId node
, ctrlMsgSignal = UnreliableSend (processLocalId pid) msg
}
ncSendToNode :: NodeId -> NCMsg -> NC ()
ncSendToNode to msg = do
node <- ask
liftIO $ if to == localNodeId node
then writeChan (localCtrlChan node) $! msg
else sendBinary node
(NodeIdentifier $ localNodeId node)
(NodeIdentifier to)
WithImplicitReconnect
msg
traceNotifyDied :: LocalNode -> Identifier -> DiedReason -> NC ()
traceNotifyDied node ident reason =
liftIO $ withLocalTracer node $ \t ->
case ident of
(NodeIdentifier nid) -> traceEvent t (MxNodeDied nid reason)
(ProcessIdentifier pid) -> traceEvent t (MxProcessDied pid reason)
_ -> return ()
traceEventFmtIO :: LocalNode
-> String
-> [TraceArg]
-> IO ()
traceEventFmtIO node fmt args =
withLocalTracer node $ \t -> traceLogFmt t fmt args
trace :: LocalNode -> MxEvent -> IO ()
trace node ev = withLocalTracer node $ \t -> traceEvent t ev
withLocalTracer :: LocalNode -> (MxEventBus -> IO ()) -> IO ()
withLocalTracer node act = act (localEventBus node)
nodeController :: NC ()
nodeController = do
node <- ask
forever' $ do
msg <- liftIO $ readChan (localCtrlChan node)
case destNid (ctrlMsgSignal msg) of
Just nid' | nid' /= localNodeId node ->
ncSendToNode nid' msg
_ ->
return ()
case msg of
NCMsg (ProcessIdentifier from) (Link them) ->
ncEffectMonitor from them Nothing
NCMsg (ProcessIdentifier from) (Monitor ref) ->
ncEffectMonitor from (monitorRefIdent ref) (Just ref)
NCMsg (ProcessIdentifier from) (Unlink them) ->
ncEffectUnlink from them
NCMsg (ProcessIdentifier from) (Unmonitor ref) ->
ncEffectUnmonitor from ref
NCMsg _from (Died ident reason) ->
ncEffectDied ident reason
NCMsg (ProcessIdentifier from) (Spawn proc ref) ->
ncEffectSpawn from proc ref
NCMsg (ProcessIdentifier from) (Register label atnode pid force) ->
ncEffectRegister from label atnode pid force
NCMsg (ProcessIdentifier from) (WhereIs label) ->
ncEffectWhereIs from label
NCMsg _ (NamedSend label msg') ->
ncEffectNamedSend label msg'
NCMsg _ (UnreliableSend lpid msg') ->
ncEffectLocalSend node (ProcessId (localNodeId node) lpid) msg'
NCMsg _ (LocalSend to msg') ->
ncEffectLocalSend node to msg'
NCMsg _ (LocalPortSend to msg') ->
ncEffectLocalPortSend to msg'
NCMsg (ProcessIdentifier from) (Kill to reason) ->
ncEffectKill from to reason
NCMsg (ProcessIdentifier from) (Exit to reason) ->
ncEffectExit from to reason
NCMsg (ProcessIdentifier from) (GetInfo pid) ->
ncEffectGetInfo from pid
NCMsg _ SigShutdown ->
liftIO $ do
NT.closeEndPoint (localEndPoint node)
`Exception.finally` throwIO (NodeClosedException $ localNodeId node)
NCMsg (ProcessIdentifier from) (GetNodeStats nid) ->
ncEffectGetNodeStats from nid
unexpected ->
error $ "nodeController: unexpected message " ++ show unexpected
ncEffectMonitor :: ProcessId
-> Identifier
-> Maybe MonitorRef
-> NC ()
ncEffectMonitor from them mRef = do
node <- ask
shouldLink <-
if not (isLocal node them)
then return True
else isValidLocalIdentifier them
case (shouldLink, isLocal node (ProcessIdentifier from)) of
(True, _) ->
case mRef of
Just ref -> modify' $ monitorsFor them ^: Set.insert (from, ref)
Nothing -> modify' $ linksFor them ^: Set.insert from
(False, True) ->
notifyDied from them DiedUnknownId mRef
(False, False) ->
ncSendToNode (processNodeId from) $ NCMsg
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
, ctrlMsgSignal = Died them DiedUnknownId
}
ncEffectUnlink :: ProcessId -> Identifier -> NC ()
ncEffectUnlink from them = do
node <- ask
when (isLocal node (ProcessIdentifier from)) $
case them of
ProcessIdentifier pid ->
postAsMessage from $ DidUnlinkProcess pid
NodeIdentifier nid ->
postAsMessage from $ DidUnlinkNode nid
SendPortIdentifier cid ->
postAsMessage from $ DidUnlinkPort cid
modify' $ linksFor them ^: Set.delete from
ncEffectUnmonitor :: ProcessId -> MonitorRef -> NC ()
ncEffectUnmonitor from ref = do
node <- ask
when (isLocal node (ProcessIdentifier from)) $
postAsMessage from $ DidUnmonitor ref
modify' $ monitorsFor (monitorRefIdent ref) ^: Set.delete (from, ref)
ncEffectDied :: Identifier -> DiedReason -> NC ()
ncEffectDied ident reason = do
node <- ask
traceNotifyDied node ident reason
(affectedLinks, unaffectedLinks) <- gets (splitNotif ident . (^. links))
(affectedMons, unaffectedMons) <- gets (splitNotif ident . (^. monitors))
let localOnly = case ident of NodeIdentifier _ -> True ; _ -> False
forM_ (Map.toList affectedLinks) $ \(them, uss) ->
forM_ uss $ \us ->
when (localOnly <= isLocal node (ProcessIdentifier us)) $
notifyDied us them reason Nothing
forM_ (Map.toList affectedMons) $ \(them, refs) ->
forM_ refs $ \(us, ref) ->
when (localOnly <= isLocal node (ProcessIdentifier us)) $
notifyDied us them reason (Just ref)
modify' $ (links ^= unaffectedLinks) . (monitors ^= unaffectedMons)
modify' $ registeredHere ^: Map.filter (\pid -> not $ ident `impliesDeathOf` ProcessIdentifier pid)
remaining <- fmap Map.toList (gets (^. registeredOnNodes)) >>=
mapM (\(pid,nidlist) ->
case ident `impliesDeathOf` ProcessIdentifier pid of
True ->
do forM_ nidlist $ \(nid,_) ->
when (not $ isLocal node (NodeIdentifier nid))
(forwardNameDeath node nid)
return Nothing
False -> return $ Just (pid,nidlist) )
modify' $ registeredOnNodes ^= (Map.fromList (catMaybes remaining))
where
forwardNameDeath node nid = ncSendToNode nid
NCMsg { ctrlMsgSender = NodeIdentifier (localNodeId node)
, ctrlMsgSignal = Died ident reason
}
ncEffectSpawn :: ProcessId -> Closure (Process ()) -> SpawnRef -> NC ()
ncEffectSpawn pid cProc ref = do
mProc <- unClosure cProc
let proc = case mProc of
Left err -> fail $ "Error: Could not resolve closure: " ++ err
Right p -> p
node <- ask
pid' <- liftIO $ forkProcess node proc
ncSendToProcess pid $ unsafeCreateUnencodedMessage $ DidSpawn ref pid'
ncEffectRegister :: ProcessId -> String -> NodeId -> Maybe ProcessId -> Bool -> NC ()
ncEffectRegister from label atnode mPid reregistration = do
node <- ask
currentVal <- gets (^. registeredHereFor label)
isOk <-
case mPid of
Nothing ->
return $ isJust currentVal
Just thepid ->
do isvalidlocal <- isValidLocalIdentifier (ProcessIdentifier thepid)
return $ (isNothing currentVal /= reregistration) &&
(not (isLocal node (ProcessIdentifier thepid) ) || isvalidlocal )
if isLocal node (NodeIdentifier atnode)
then do when isOk $
do modify' $ registeredHereFor label ^= mPid
updateRemote node currentVal mPid
case mPid of
(Just p) -> liftIO $ trace node (MxRegistered p label)
Nothing -> liftIO $ trace node (MxUnRegistered (fromJust currentVal) label)
newVal <- gets (^. registeredHereFor label)
ncSendToProcess from $ unsafeCreateUnencodedMessage $
RegisterReply label isOk newVal
else let operation =
case reregistration of
True -> flip decList
False -> flip incList
in case mPid of
Nothing -> return ()
Just pid -> modify' $ registeredOnNodesFor pid ^: (maybeify $ operation atnode)
where updateRemote node (Just oldval) (Just newval) | processNodeId oldval /= processNodeId newval =
do forward node (processNodeId oldval) (Register label atnode (Just oldval) True)
forward node (processNodeId newval) (Register label atnode (Just newval) False)
updateRemote node Nothing (Just newval) =
forward node (processNodeId newval) (Register label atnode (Just newval) False)
updateRemote node (Just oldval) Nothing =
forward node (processNodeId oldval) (Register label atnode (Just oldval) True)
updateRemote _ _ _ = return ()
maybeify f Nothing = unmaybeify $ f []
maybeify f (Just x) = unmaybeify $ f x
unmaybeify [] = Nothing
unmaybeify x = Just x
incList [] tag = [(tag,1)]
incList ((atag,acount):xs) tag | tag==atag = (atag,acount+1) : xs
incList (x:xs) tag = x : incList xs tag
decList [] _ = []
decList ((atag,1):xs) tag | atag == tag = xs
decList ((atag,n):xs) tag | atag == tag = (atag,n1):xs
decList (x:xs) tag = x:decList xs tag
forward node to reg =
when (not $ isLocal node (NodeIdentifier to)) $
ncSendToNode to $ NCMsg { ctrlMsgSender = ProcessIdentifier from
, ctrlMsgSignal = reg
}
ncEffectWhereIs :: ProcessId -> String -> NC ()
ncEffectWhereIs from label = do
mPid <- gets (^. registeredHereFor label)
ncSendToProcess from $ unsafeCreateUnencodedMessage $ WhereIsReply label mPid
ncEffectNamedSend :: String -> Message -> NC ()
ncEffectNamedSend label msg = do
mPid <- gets (^. registeredHereFor label)
forM_ mPid $ \to ->
ncSendToProcessAndTrace (label /= "trace.logger") to msg
ncEffectLocalSend :: LocalNode -> ProcessId -> Message -> NC ()
ncEffectLocalSend = ncEffectLocalSendAndTrace True
ncEffectLocalSendAndTrace :: Bool -> LocalNode -> ProcessId -> Message -> NC ()
ncEffectLocalSendAndTrace shouldTrace node to msg =
liftIO $ withLocalProc node to $ \p -> do
enqueue (processQueue p) msg
when shouldTrace $ trace node (MxReceived to msg)
ncEffectLocalPortSend :: SendPortId -> Message -> NC ()
ncEffectLocalPortSend from msg = do
node <- ask
let pid = sendPortProcessId from
cid = sendPortLocalId from
liftIO $ withLocalProc node pid $ \proc -> do
mChan <- withMVar (processState proc) $ return . (^. typedChannelWithId cid)
case mChan of
Nothing -> return ()
Just (TypedChannel chan') -> do
ch <- deRefWeak chan'
forM_ ch $ \chan -> deliverChan msg chan
where deliverChan :: forall a . Message -> TQueue a -> IO ()
deliverChan (UnencodedMessage _ raw) chan' =
atomically $ writeTQueue chan' ((unsafeCoerce raw) :: a)
deliverChan (EncodedMessage _ _) _ =
error "invalid local channel delivery"
ncEffectKill :: ProcessId -> ProcessId -> String -> NC ()
ncEffectKill from to reason = do
node <- ask
when (isLocal node (ProcessIdentifier to)) $
throwException to $ ProcessKillException from reason
ncEffectExit :: ProcessId -> ProcessId -> Message -> NC ()
ncEffectExit from to reason = do
node <- ask
when (isLocal node (ProcessIdentifier to)) $
throwException to $ ProcessExitException from reason
ncEffectGetInfo :: ProcessId -> ProcessId -> NC ()
ncEffectGetInfo from pid =
let lpid = processLocalId pid
them = (ProcessIdentifier pid)
in do
node <- ask
mProc <- liftIO $ withValidLocalState node
$ return . (^. localProcessWithId lpid)
case mProc of
Nothing -> dispatch (isLocal node (ProcessIdentifier from))
from (ProcessInfoNone DiedUnknownId)
Just proc -> do
itsLinks <- gets (^. linksFor them)
itsMons <- gets (^. monitorsFor them)
registered <- gets (^. registeredHere)
size <- liftIO $ queueSize $ processQueue $ proc
let reg = registeredNames registered
dispatch (isLocal node (ProcessIdentifier from))
from
ProcessInfo {
infoNode = (processNodeId pid)
, infoRegisteredNames = reg
, infoMessageQueueLength = size
, infoMonitors = Set.toList itsMons
, infoLinks = Set.toList itsLinks
}
where dispatch :: (Serializable a, Show a)
=> Bool
-> ProcessId
-> a
-> NC ()
dispatch True dest pInfo = postAsMessage dest $ pInfo
dispatch False dest pInfo =
ncSendToProcess dest $ unsafeCreateUnencodedMessage pInfo
registeredNames = Map.foldlWithKey (\ks k v -> if v == pid
then (k:ks)
else ks) []
ncEffectGetNodeStats :: ProcessId -> NodeId -> NC ()
ncEffectGetNodeStats from _nid = do
node <- ask
ncState <- StateT.get
nodeState <- liftIO $ withValidLocalState node return
let stats =
NodeStats {
nodeStatsNode = localNodeId node
, nodeStatsRegisteredNames = Map.size $ ncState ^. registeredHere
, nodeStatsMonitors = Map.size $ ncState ^. monitors
, nodeStatsLinks = Map.size $ ncState ^. links
, nodeStatsProcesses = Map.size (nodeState ^. localProcesses)
}
postAsMessage from stats
notifyDied :: ProcessId
-> Identifier
-> DiedReason
-> Maybe MonitorRef
-> NC ()
notifyDied dest src reason mRef = do
node <- ask
case (isLocal node (ProcessIdentifier dest), mRef, src) of
(True, Just ref, ProcessIdentifier pid) ->
postAsMessage dest $ ProcessMonitorNotification ref pid reason
(True, Just ref, NodeIdentifier nid) ->
postAsMessage dest $ NodeMonitorNotification ref nid reason
(True, Just ref, SendPortIdentifier cid) ->
postAsMessage dest $ PortMonitorNotification ref cid reason
(True, Nothing, ProcessIdentifier pid) ->
throwException dest $ ProcessLinkException pid reason
(True, Nothing, NodeIdentifier pid) ->
throwException dest $ NodeLinkException pid reason
(True, Nothing, SendPortIdentifier pid) ->
throwException dest $ PortLinkException pid reason
(False, _, _) ->
ncSendToNode (processNodeId dest) $ NCMsg
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
, ctrlMsgSignal = Died src reason
}
destNid :: ProcessSignal -> Maybe NodeId
destNid (Link ident) = Just $ nodeOf ident
destNid (Unlink ident) = Just $ nodeOf ident
destNid (Monitor ref) = Just $ nodeOf (monitorRefIdent ref)
destNid (Unmonitor ref) = Just $ nodeOf (monitorRefIdent ref)
destNid (Spawn _ _) = Nothing
destNid (Register _ _ _ _) = Nothing
destNid (WhereIs _) = Nothing
destNid (NamedSend _ _) = Nothing
destNid (UnreliableSend _ _) = Nothing
destNid (Died _ _) = Nothing
destNid (Kill pid _) = Just $ processNodeId pid
destNid (Exit pid _) = Just $ processNodeId pid
destNid (GetInfo pid) = Just $ processNodeId pid
destNid (GetNodeStats nid) = Just nid
destNid (LocalSend pid _) = Just $ processNodeId pid
destNid (LocalPortSend cid _) = Just $ processNodeId (sendPortProcessId cid)
destNid (SigShutdown) = Nothing
isLocal :: LocalNode -> Identifier -> Bool
isLocal nid ident = nodeOf ident == localNodeId nid
unClosure :: Typeable a => Closure a -> NC (Either String a)
unClosure closure = do
rtable <- remoteTable <$> ask
return (Static.unclosure rtable closure)
isValidLocalIdentifier :: Identifier -> NC Bool
isValidLocalIdentifier ident = do
node <- ask
liftIO . withValidLocalState node $ \nSt ->
case ident of
NodeIdentifier nid ->
return $ nid == localNodeId node
ProcessIdentifier pid -> do
let mProc = nSt ^. localProcessWithId (processLocalId pid)
return $ isJust mProc
SendPortIdentifier cid -> do
let pid = sendPortProcessId cid
mProc = nSt ^. localProcessWithId (processLocalId pid)
case mProc of
Nothing -> return False
Just proc -> withMVar (processState proc) $ \pSt -> do
let mCh = pSt ^. typedChannelWithId (sendPortLocalId cid)
return $ isJust mCh
postAsMessage :: Serializable a => ProcessId -> a -> NC ()
postAsMessage pid = postMessage pid . createUnencodedMessage
postMessage :: ProcessId -> Message -> NC ()
postMessage pid msg = do
node <- ask
liftIO $ withLocalProc node pid $ \p -> enqueue (processQueue p) msg
throwException :: Exception e => ProcessId -> e -> NC ()
throwException pid e = do
node <- ask
liftIO $ withLocalProc node pid $ \p -> void $ forkIO $ throwTo (processThread p) e
withLocalProc :: LocalNode -> ProcessId -> (LocalProcess -> IO ()) -> IO ()
withLocalProc node pid p =
let lpid = processLocalId pid in do
withValidLocalState node $ \vst ->
forM_ (vst ^. localProcessWithId lpid) p
links :: Accessor NCState (Map Identifier (Set ProcessId))
links = accessor _links (\ls st -> st { _links = ls })
monitors :: Accessor NCState (Map Identifier (Set (ProcessId, MonitorRef)))
monitors = accessor _monitors (\ms st -> st { _monitors = ms })
registeredHere :: Accessor NCState (Map String ProcessId)
registeredHere = accessor _registeredHere (\ry st -> st { _registeredHere = ry })
registeredOnNodes :: Accessor NCState (Map ProcessId [(NodeId, Int)])
registeredOnNodes = accessor _registeredOnNodes (\ry st -> st { _registeredOnNodes = ry })
linksFor :: Identifier -> Accessor NCState (Set ProcessId)
linksFor ident = links >>> DAC.mapDefault Set.empty ident
monitorsFor :: Identifier -> Accessor NCState (Set (ProcessId, MonitorRef))
monitorsFor ident = monitors >>> DAC.mapDefault Set.empty ident
registeredHereFor :: String -> Accessor NCState (Maybe ProcessId)
registeredHereFor ident = registeredHere >>> DAC.mapMaybe ident
registeredOnNodesFor :: ProcessId -> Accessor NCState (Maybe [(NodeId,Int)])
registeredOnNodesFor ident = registeredOnNodes >>> DAC.mapMaybe ident
splitNotif :: Identifier
-> Map Identifier a
-> (Map Identifier a, Map Identifier a)
splitNotif ident = Map.partitionWithKey (\k !_v -> ident `impliesDeathOf` k)
modify' :: MonadState s m => (s -> s) -> m ()
modify' f = StateT.get >>= \s -> StateT.put $! f s