distributed-process-0.7.0: Cloud Haskell: Erlang-style concurrency in Haskell

Copyright: (c) Well-Typed / Tim Watson
License: BSD3 (see the file LICENSE)
Maintainer: Tim Watson <watson.timothy@gmail.com>
Stability: experimental
Portability: non-portable (requires concurrency)
Safe Haskell: None
Language: Haskell98

Control.Distributed.Process.Management

Description

Management Extensions API

This module presents an API for creating Management Agents: special processes that are capable of receiving and responding to a node's internal system events. These system events are delivered by the management event bus: an internal subsystem maintained for each running node, to which all agents are automatically subscribed.

Agents are defined in terms of event sinks, taking a particular Serializable type and evaluating to an action in the MxAgent monad in response. Each MxSink evaluates to an MxAction that specifies whether the agent should continue processing its inputs or stop. If the type of a message cannot be matched to any of the agent's sinks, it will be discarded. A sink can also deliberately skip processing a message, deferring to the remaining handlers. This is the only way that more than one event sink can handle the same data type, since otherwise the first type match will win every time a message arrives. See mxSkip for details.

Various events are published to the management event bus automatically, the full list of which can be found in the definition of the MxEvent data type. Additionally, clients of the Management API can publish arbitrary Serializable data to the event bus using mxNotify. All running agents receive all events (from the primary event bus to which they're subscribed).

Agent processes are automatically registered on the local node, and can receive messages via their mailbox just like ordinary processes. Unlike ordinary Process code however, it is unnecessary (though possible) for agents to use the base expect and receiveX primitives to do this, since the management infrastructure will continuously read from both the primary event bus and the process' own mailbox. Messages are transparently passed to the agent's event sinks from both sources, so an agent need only concern itself with how to respond to its inputs.

Some agents may wish to prioritise messages from their mailbox over traffic on the management event bus, or vice versa. The mxReceive and mxReceiveChan API calls do this for the mailbox and event bus, respectively. The prioritisation these APIs offer is simply that the chosen data stream will be checked first. No blocking will occur if the chosen (prioritised) source is devoid of input messages; instead, the agent handling code will revert to switching between the alternatives in round-robin fashion as usual. If messages exist in one or more channels, they will be consumed as soon as they're available; priority is effectively a hint about which channel to consume from, should messages be available in both.

Prioritisation, then, is a hint about the preference of data source from which the next input should be chosen. No guarantee can be made that the chosen source will in fact be selected at runtime.
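
The "check the preferred source first, but never block on it" behaviour described above can be modelled in a few lines of pure Haskell. This is only a toy model over two lists, not the library's implementation; the names Source, other, next and drain are hypothetical and exist purely for illustration.

```haskell
-- A toy, pure model of input selection; not the library's code.
data Source = Bus | Mailbox
  deriving (Eq, Show)

-- | The alternative to a given source.
other :: Source -> Source
other Bus     = Mailbox
other Mailbox = Bus

-- | Pick the next input from (bus, mailbox), trying the preferred
-- source first and falling back to the other one if it is empty.
next :: Source -> ([a], [a]) -> Maybe (Source, a, ([a], [a]))
next pref (bus, mbox) =
  case (pref, bus, mbox) of
    (Bus,     b:bs, _   ) -> Just (Bus,     b, (bs,  mbox))
    (Mailbox, _,    m:ms) -> Just (Mailbox, m, (bus, ms))
    (_,       b:bs, []  ) -> Just (Bus,     b, (bs,  []))
    (_,       [],   m:ms) -> Just (Mailbox, m, ([],  ms))
    _                     -> Nothing

-- | Consume everything, switching the preference after each input
-- (the round-robin default).
drain :: Source -> ([a], [a]) -> [(Source, a)]
drain pref qs =
  case next pref qs of
    Nothing            -> []
    Just (src, x, qs') -> (src, x) : drain (other src) qs'
```

In this model, drain Bus (["b1", "b2"], ["m1"]) evaluates to [(Bus,"b1"),(Mailbox,"m1"),(Bus,"b2")], while drain Mailbox (["b1", "b2"], []) evaluates to [(Bus,"b1"),(Bus,"b2")]: a preference for the mailbox is simply ignored when the mailbox is empty.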

Management API Semantics

The management API makes no guarantees whatsoever about any of the following:

  • The ordering of messages delivered to the event bus.
  • The order in which agents will be executed.
  • Whether messages will be taken from the mailbox first, or the event bus.

Management Data API

Both management agents and clients of the API have access to a variety of data storage capabilities, to facilitate publishing and consuming useful system information. Agents maintain their own internal state privately (via a state transformer - see mxGetLocal et al); however, it is possible for agents to share additional data with each other (and the outside world) using whatever mechanism the user wishes, e.g., acid-state or shared memory primitives.

Defining Agents

New agents are defined with mxAgent and require a unique MxAgentId, an initial state - MxAgent runs in a state transformer - and a list of the agent's event sinks. Each MxSink is defined in terms of a specific Serializable type, via the mxSink function, binding the event handler expression to inputs of only that type.

Apart from modifying its own local state, an agent can execute arbitrary Process a code via lifting (see liftMX) and even publish its own messages back to the primary event bus (see mxBroadcast).

Since messages are delivered to agents from both the management event bus and the agent process's mailbox, agents (i.e., event sinks) will generally have no idea as to their origin. An agent can, however, choose to prioritise the choice of input (source) each time one of its event sinks runs. The standard way for an event sink to indicate that the agent is ready for its next input is to evaluate mxReady. When this happens, the management infrastructure will obtain data from the event bus and the process's mailbox in a round-robin fashion, i.e., one after the other, changing each time.

Example Code

What follows is a grossly over-simplified example of a management agent that provides a basic name monitoring facility. Whenever a process name is registered or unregistered, clients are informed of the fact.

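{-# LANGUAGE DeriveDataTypeable, DeriveGeneric, ScopedTypeVariables #-}
import Control.Distributed.Process
import Control.Distributed.Process.Management
import Data.Binary (Binary)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
import qualified Data.Foldable as Foldable
import qualified Data.Set as Set

-- simple notification data type (must be Serializable to be sent)
data Registration = Reg { added  :: Bool
                        , procId :: ProcessId
                        , name   :: String
                        } deriving (Typeable, Generic)

instance Binary Registration

-- start a /name monitoring agent/
nameMonitorAgent :: Process ProcessId
nameMonitorAgent = do
  mxAgent (MxAgentId "name-monitor") Set.empty [
        -- clients subscribe by sending us their pid
        (mxSink $ \(pid :: ProcessId) -> do
           mxUpdateLocal $ Set.insert pid
           mxReady)
        -- forward (un)registration events to every subscriber
      , (mxSink $ \(ev :: MxEvent) -> do
            let act =
                  case ev of
                    (MxRegistered   p n) -> notify True  n p
                    (MxUnRegistered p n) -> notify False n p
                    _                    -> return ()
            act >> mxReady)
    ]
  where
    notify a n p = do
      Foldable.mapM_ (liftMX . deliver (Reg a p n)) =<< mxGetLocal
    -- local helper: send with its arguments flipped
    deliver msg pid = send pid msg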

The client interface (for sending their pid) can take one of two forms:

monitorNames = getSelfPid >>= nsend "name-monitor"
monitorNames2 = getSelfPid >>= mxNotify

For some real-world examples, see the distributed-process-platform package.

Performance, Stability and Scalability

Management Agents offer numerous advantages over regular processes: broadcast communication with them can have a lower latency, they offer simplified message (i.e., input type) handling, and they have access to internal system information that would otherwise be unobtainable.

Do not be tempted to implement everything (e.g., the kitchen sink) using the management API though. There are overheads associated with management agents, which is why they're presented as tools for consuming low level system information, instead of as application level development tools.

Agents that rely heavily on a busy mailbox can cause the management event bus to backlog un-GC'ed data, leading to increased heap space. Producers that do not take care to avoid passing unevaluated thunks to the API can crash all the agents in the system. Agents are not monitored or managed in any way, and those that crash will not be restarted.

The management event bus can receive a great deal of traffic. Every time a message is sent and/or received, an event is passed to the agent controller and broadcast to all agents (plus the trace controller, if tracing is enabled for the node). This is already a significant overhead - though profiling and benchmarks have demonstrated that it does not adversely affect performance if few agents are installed. Agents will typically use more cycles than plain processes, since they perform additional work: selecting input data from both the event bus and their own mailboxes, plus searching through the set of event sinks (for each agent) to determine the right handler for the event.

Architecture Overview

The architecture of the management event bus is internal and subject to change without prior notice. The description that follows is provided for informational purposes only.

When a node initially starts, two special, internal system processes are started to support the management infrastructure. The first, known as the trace controller, is responsible for consuming MxEvents and forwarding them to the configured tracer - see Control.Distributed.Process.Debug for further details. The second is the management agent controller, and is the primary worker process underpinning the management infrastructure. All published management events are routed to this process, which places them onto a system wide event bus and additionally passes them directly to the trace controller.

There are several reasons for segregating the tracing and management control planes in this fashion. Tracing can be enabled or disabled by clients, whilst the management event bus cannot, since in addition to providing runtime instrumentation, its intended use-cases include node monitoring, peer discovery (via topology providing backends) and other essential system services that require knowledge of otherwise hidden system internals. Tracing is also subject to trace flags that limit the specific MxEvents delivered to trace clients - an overhead/complexity not shared by management agents. Finally, tracing and management agents are implemented using completely different signalling techniques - more on this later - which would introduce considerable complexity if they shared the same event loop.

The management control plane is driven by a shared broadcast channel, which is written to by the agent controller and subscribed to by all agent processes. Agents are spawned as regular processes, whose primary implementation (i.e., server loop) is responsible for consuming messages from both the broadcast channel and their own mailbox. Once consumed, messages are applied to the agent's event sinks until one matches the input, at which point it is applied and the loop continues. The implementation chooses from the event bus and the mailbox in a round-robin fashion, until a message is received. This polling activity would lead to management agents consuming considerable system resources if left unchecked, therefore the implementation will poll for a limited number of retries, after which it will perform a blocking read on the event bus.
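
The "poll a limited number of times, then block on the event bus" strategy can be sketched with plain STM primitives. This is an illustrative stand-in under stated assumptions, not the library's internal code: the bus is modelled as a TChan, the mailbox as a TQueue, the names Input and nextInput are hypothetical, and for brevity the sketch always checks the bus first rather than alternating between the two sources.

```haskell
import Control.Concurrent.STM

-- | Where an input came from (hypothetical type for this sketch).
data Input a = FromBus a | FromMailbox a
  deriving (Eq, Show)

-- | Poll both sources without blocking up to 'retries' times, then
-- fall back to a blocking read on the bus alone.
nextInput :: Int -> TChan a -> TQueue a -> IO (Input a)
nextInput retries bus mailbox = go retries
  where
    go n
      | n <= 0    = FromBus <$> atomically (readTChan bus)
      | otherwise = do
          -- 'orElse' tries the next alternative when a read would block;
          -- the final alternative makes the whole poll non-blocking.
          r <- atomically $
                 (Just . FromBus <$> readTChan bus)
                   `orElse` (Just . FromMailbox <$> readTQueue mailbox)
                   `orElse` return Nothing
          maybe (go (n - 1)) return r
```

Bounding the number of non-blocking polls keeps an idle agent from spinning, at the cost that a message arriving in the mailbox while the agent is blocked on the bus is only noticed after the next bus event.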

Documentation

data MxEvent

This is the default management event, fired for various internal events around the network-transport (NT) connection and Process lifecycle. All published events that conform to this type are eligible for tracing, i.e., they will be delivered to the trace controller.

Constructors

MxSpawned ProcessId

fired whenever a local process is spawned

MxRegistered ProcessId String

fired whenever a process/name is registered (locally)

MxUnRegistered ProcessId String

fired whenever a process/name is unregistered (locally)

MxProcessDied ProcessId DiedReason

fired whenever a process dies

MxNodeDied NodeId DiedReason

fired whenever a node dies (i.e., the connection is broken/disconnected)

MxSent ProcessId ProcessId Message

fired whenever a message is sent from a local process

MxReceived ProcessId Message

fired whenever a message is received by a local process

MxConnected ConnectionId EndPointAddress

fired when a network-transport connection is first established

MxDisconnected ConnectionId EndPointAddress

fired when a network-transport connection is broken/disconnected

MxUser Message

a user defined trace event

MxLog String

a logging event - used for debugging purposes only

MxTraceTakeover ProcessId

notifies a trace listener that all subsequent traces will be sent to pid

MxTraceDisable

notifies a trace listener that it has been disabled/removed

Instances

Show MxEvent
Generic MxEvent

Associated Types

type Rep MxEvent :: * -> *

Methods

from :: MxEvent -> Rep MxEvent x

to :: Rep MxEvent x -> MxEvent

Binary MxEvent

Methods

put :: MxEvent -> Put

get :: Get MxEvent

putList :: [MxEvent] -> Put

Addressable MxEvent
type Rep MxEvent
type Rep MxEvent = D1 (MetaData "MxEvent" "Control.Distributed.Process.Management.Internal.Types" "distributed-process-0.7.0-C9KhxM4qYtR75UwfW9bShc" False) ((:+:) ((:+:) ((:+:) (C1 (MetaCons "MxSpawned" PrefixI False) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 ProcessId))) ((:+:) (C1 (MetaCons "MxRegistered" PrefixI False) ((:*:) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 ProcessId)) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 String)))) (C1 (MetaCons "MxUnRegistered" PrefixI False) ((:*:) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 ProcessId)) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 String)))))) ((:+:) (C1 (MetaCons "MxProcessDied" PrefixI False) ((:*:) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 ProcessId)) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 DiedReason)))) ((:+:) (C1 (MetaCons "MxNodeDied" PrefixI False) ((:*:) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 NodeId)) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 DiedReason)))) (C1 (MetaCons "MxSent" PrefixI False) ((:*:) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 ProcessId)) ((:*:) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 ProcessId)) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Message)))))))) ((:+:) ((:+:) (C1 (MetaCons "MxReceived" PrefixI False) ((:*:) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 ProcessId)) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Message)))) ((:+:) (C1 
(MetaCons "MxConnected" PrefixI False) ((:*:) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 ConnectionId)) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 EndPointAddress)))) (C1 (MetaCons "MxDisconnected" PrefixI False) ((:*:) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 ConnectionId)) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 EndPointAddress)))))) ((:+:) ((:+:) (C1 (MetaCons "MxUser" PrefixI False) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Message))) (C1 (MetaCons "MxLog" PrefixI False) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 String)))) ((:+:) (C1 (MetaCons "MxTraceTakeover" PrefixI False) (S1 (MetaSel (Nothing Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 ProcessId))) (C1 (MetaCons "MxTraceDisable" PrefixI False) U1)))))

Firing Arbitrary Mx Events

mxNotify :: Serializable a => a -> Process ()

Publishes an arbitrary Serializable message to the management event bus. No attempt is made to force the argument, so take care not to pass unevaluated thunks via this API: once the agent controller broadcasts the data, every registered agent gains access to it, and a thunk that throws when evaluated could crash any agent that receives it.
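
One way to reduce the risk of publishing a lazy value is to force it before calling mxNotify. The helper below is a minimal sketch; notifyStrict is a hypothetical name, not part of this module. It only evaluates to weak head normal form, so nested structures may still contain thunks; fully evaluating those would need something like force from the deepseq package.

```haskell
import Control.Distributed.Process (Process)
import Control.Distributed.Process.Management (mxNotify)
import Control.Distributed.Process.Serializable (Serializable)

-- | Hypothetical helper: evaluate the argument to weak head normal
-- form in the caller, then publish it to the management event bus.
notifyStrict :: Serializable a => a -> Process ()
notifyStrict x = x `seq` mxNotify x
```

With this, an exception hidden in the outermost thunk surfaces in the publishing process rather than in the agents that receive the value.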

Constructing Mx Agents

data MxAction

Represents the actions a management agent can take when evaluating an event sink.

newtype MxAgentId

A newtype wrapper for an agent id (which is a string).

Constructors

MxAgentId 

Fields

data MxAgent s a

Monad for management agents.

Instances

Monad (MxAgent s)

Methods

(>>=) :: MxAgent s a -> (a -> MxAgent s b) -> MxAgent s b

(>>) :: MxAgent s a -> MxAgent s b -> MxAgent s b

return :: a -> MxAgent s a

fail :: String -> MxAgent s a

Functor (MxAgent s)

Methods

fmap :: (a -> b) -> MxAgent s a -> MxAgent s b

(<$) :: a -> MxAgent s b -> MxAgent s a

Applicative (MxAgent s)

Methods

pure :: a -> MxAgent s a

(<*>) :: MxAgent s (a -> b) -> MxAgent s a -> MxAgent s b

(*>) :: MxAgent s a -> MxAgent s b -> MxAgent s b

(<*) :: MxAgent s a -> MxAgent s b -> MxAgent s a

MonadIO (MxAgent s)

Methods

liftIO :: IO a -> MxAgent s a

MonadState (MxAgentState s) (MxAgent s)

Methods

get :: MxAgent s (MxAgentState s)

put :: MxAgentState s -> MxAgent s ()

state :: (MxAgentState s -> (a, MxAgentState s)) -> MxAgent s a

mxAgent :: MxAgentId -> s -> [MxSink s] -> Process ProcessId

Activates a new agent.

mxAgentWithFinalize :: MxAgentId -> s -> [MxSink s] -> MxAgent s () -> Process ProcessId

Activates a new agent. This variant takes a finalizer expression that runs once the agent shuts down (even in case of failure/exceptions). The finalizer expression runs in the mx monad - MxAgent s () - such that the agent's internal state remains accessible to the shutdown/cleanup code.
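
As a minimal sketch of a finalizer that reads the agent's state, the agent below counts every MxEvent it sees and logs the total when it shuts down. The agent id "finalize-demo" and the name countingAgent are made up for illustration; say is the ordinary logging primitive from Control.Distributed.Process.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Distributed.Process
import Control.Distributed.Process.Management

-- | Hypothetical agent: counts management events, reports on shutdown.
countingAgent :: Process ProcessId
countingAgent =
  mxAgentWithFinalize (MxAgentId "finalize-demo") (0 :: Int)
    [ mxSink $ \(_ :: MxEvent) -> mxUpdateLocal (+ 1) >> mxReady ]
    -- finalizer: the local state is still readable here
    (mxGetLocal >>= \n -> liftMX (say ("events seen: " ++ show n)))
```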

type MxSink s = Message -> MxAgent s (Maybe MxAction)

Type of a management agent's event sink.

mxSink :: forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s

Create an MxSink from an expression taking a Serializable type m, that yields an MxAction in the MxAgent monad.

mxGetId :: MxAgent s MxAgentId

Return the MxAgentId for the currently executing agent.

mxDeactivate :: forall s. String -> MxAgent s MxAction

Gracefully terminate an agent.
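
A sink can decide per message whether to carry on or shut the agent down. In this sketch (the agent id and the "stop" convention are assumptions made for the example), any String other than "stop" is ignored and the agent keeps running.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Distributed.Process
import Control.Distributed.Process.Management

-- | Hypothetical agent that terminates when it receives "stop".
stoppable :: Process ProcessId
stoppable = mxAgent (MxAgentId "stoppable") ()
  [ mxSink $ \(msg :: String) ->
      if msg == "stop"
        then mxDeactivate "stop requested"  -- the String is the reason
        else mxReady
  ]
```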

mxReady :: forall s. MxAgent s MxAction

Continue executing (i.e., receiving and processing messages).

mxSkip :: forall s. MxAgent s MxAction

Causes the currently executing event sink to be skipped. The remaining declared event sinks will be evaluated to find a matching handler. Can be used to allow multiple event sinks to process data of the same type.
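
The following sketch shows two sinks declared for the same type. The first handles only MxRegistered and skips everything else, so the second sink gets a chance to run. The agent id and the shape of the state (a pair of counters) are assumptions chosen for the example.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Distributed.Process
import Control.Distributed.Process.Management

-- | State: (registrations seen, all other events seen).
skipDemo :: Process ProcessId
skipDemo = mxAgent (MxAgentId "skip-demo") ((0, 0) :: (Int, Int))
  [ mxSink $ \(ev :: MxEvent) -> case ev of
      MxRegistered _ _ -> mxUpdateLocal (\(r, o) -> (r + 1, o)) >> mxReady
      _                -> mxSkip  -- defer to the next matching sink
  , mxSink $ \(_ :: MxEvent) ->
      mxUpdateLocal (\(r, o) -> (r, o + 1)) >> mxReady
  ]
```

Without the mxSkip branch, the first sink would win for every MxEvent and the second would never run.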

mxReceive :: forall s. MxAgent s MxAction

Continue executing, prioritising inputs from the process' own mailbox ahead of data from the management event bus.

mxReceiveChan :: forall s. MxAgent s MxAction

Continue executing, prioritising inputs from the management event bus over the process' own mailbox.
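
Both actions are used in place of mxReady as the final expression of a sink. A minimal sketch, with a made-up agent id, in which String commands hint that the mailbox should be checked first next time and MxEvent inputs hint that the event bus should be:

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Distributed.Process
import Control.Distributed.Process.Management

-- | Hypothetical agent that expresses a source preference per input.
prioDemo :: Process ProcessId
prioDemo = mxAgent (MxAgentId "prio-demo") ()
  [ mxSink $ \(_ :: String)  -> mxReceive      -- check the mailbox first next
  , mxSink $ \(_ :: MxEvent) -> mxReceiveChan  -- check the event bus first next
  ]
```

Either way the preference is only a hint: if the preferred source is empty, the next input is taken from the other one.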

mxBroadcast :: Serializable m => m -> MxAgent s ()

The MxAgent version of mxNotify.
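
An agent can use mxBroadcast to republish a derived event for other agents to consume. In the sketch below, the agent id and the tuple-of-strings payload are assumptions made for the example. The agent deliberately has no sink for the type it broadcasts, which avoids reacting to its own output.

```haskell
import Control.Distributed.Process
import Control.Distributed.Process.Management

-- | Hypothetical agent: turns process deaths into a simple tagged tuple.
deathAnnouncer :: Process ProcessId
deathAnnouncer = mxAgent (MxAgentId "death-announcer") ()
  [ mxSink $ \ev -> case ev of
      MxProcessDied pid reason ->
        mxBroadcast ("process-died", show pid, show reason) >> mxReady
      _ -> mxReady
  ]
```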

mxSetLocal :: s -> MxAgent s ()

Set the agent's local state.

mxGetLocal :: MxAgent s s

Fetch the agent's local state.

mxUpdateLocal :: (s -> s) -> MxAgent s ()

Update the agent's local state.
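
The three state primitives can be combined as in this sketch, which keeps a count of spawned processes and accepts String commands via the agent's mailbox. The agent id and the "report"/"reset" commands are hypothetical; a client might send them with nsend "spawn-counter" "report".

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Distributed.Process
import Control.Distributed.Process.Management

-- | Hypothetical agent holding an Int counter as its local state.
spawnCounter :: Process ProcessId
spawnCounter = mxAgent (MxAgentId "spawn-counter") (0 :: Int)
  [ mxSink $ \(ev :: MxEvent) -> case ev of
      MxSpawned _ -> mxUpdateLocal (+ 1) >> mxReady
      _           -> mxReady
  , mxSink $ \(cmd :: String) -> do
      case cmd of
        "report" -> mxGetLocal >>= \n -> liftMX (say ("spawned: " ++ show n))
        "reset"  -> mxSetLocal 0
        _        -> return ()
      mxReady
  ]
```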

liftMX :: Process a -> MxAgent s a

Lift a Process action.