{-# LANGUAGE CPP #-}
{- | [Cloud Haskell]
This is an implementation of Cloud Haskell, as described in
/Towards Haskell in the Cloud/ by Jeff Epstein, Andrew Black, and Simon
Peyton Jones (see
<http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/remote.pdf>),
although some of the details are different. The precise message passing
semantics are based on /A unified semantics for future Erlang/ by Hans
Svensson, Lars-Åke Fredlund and Clara Benac Earle.
For a detailed description of the package and other reference materials,
please see the distributed-process wiki page on github:
<https://github.com/haskell-distributed/distributed-process/wiki>.
-}
module Control.Distributed.Process
( -- * Basic types
ProcessId
, NodeId(..)
, Process
, SendPortId
, processNodeId
, sendPortProcessId
, liftIO -- Reexported for convenience
-- * Basic messaging
, send
, expect
, expectTimeout
-- * Channels
, ReceivePort
, SendPort
, sendPortId
, newChan
, sendChan
, receiveChan
, receiveChanTimeout
, mergePortsBiased
, mergePortsRR
-- * Unsafe messaging variants
, unsafeSend
, unsafeSendChan
, unsafeNSend
, unsafeWrapMessage
-- * Advanced messaging
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, matchAny
, matchAnyIf
, matchChan
, matchSTM
, Message
, matchMessage
, matchMessageIf
, isEncoded
, wrapMessage
, unwrapMessage
, handleMessage
, handleMessageIf
, handleMessage_
, handleMessageIf_
, forward
, delegate
, relay
, proxy
-- * Process management
, spawn
, call
, terminate
, die
, kill
, exit
, catchExit
, catchesExit
, ProcessTerminationException(..)
, ProcessRegistrationException(..)
, SpawnRef
, getSelfPid
, getSelfNode
, ProcessInfo(..)
, getProcessInfo
, NodeStats(..)
, getNodeStats
, getLocalNodeStats
-- * Monitoring and linking
, link
, linkNode
, linkPort
, unlink
, unlinkNode
, unlinkPort
, monitor
, monitorNode
, monitorPort
, unmonitor
, withMonitor
, MonitorRef -- opaque
, ProcessLinkException(..)
, NodeLinkException(..)
, PortLinkException(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, PortMonitorNotification(..)
, DiedReason(..)
-- * Closures
, Closure
, closure
, Static
, unStatic
, unClosure
, RemoteTable
-- * Logging
, say
-- * Registry
, register
, reregister
, unregister
, whereis
, nsend
, registerRemoteAsync
, reregisterRemoteAsync
, unregisterRemoteAsync
, whereisRemoteAsync
, nsendRemote
, WhereIsReply(..)
, RegisterReply(..)
-- * Exception handling
, catch
, Handler(..)
, catches
, try
, mask
, onException
, bracket
, bracket_
, finally
-- * Auxiliary API
, spawnAsync
, spawnSupervised
, spawnLink
, spawnMonitor
, spawnChannel
, DidSpawn(..)
-- * Local versions of 'spawn'
, spawnLocal
, spawnChannelLocal
-- * Reconnecting
, reconnect
, reconnectPort
) where
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif
import Control.Monad.IO.Class (liftIO)
import Control.Applicative ((<$>))
import Control.Monad.Reader (ask)
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, putMVar)
import Control.Distributed.Static
( Closure
, closure
, Static
, RemoteTable
)
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, Process(..)
, MonitorRef(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, PortMonitorNotification(..)
, ProcessLinkException(..)
, NodeLinkException(..)
, PortLinkException(..)
, ProcessRegistrationException(..)
, DiedReason(..)
, SpawnRef(..)
, DidSpawn(..)
, SendPort(..)
, ReceivePort(..)
, SendPortId(..)
, WhereIsReply(..)
, RegisterReply(..)
, LocalProcess(processNode)
, Message
)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Internal.Primitives
( -- Basic messaging
send
, expect
-- Channels
, newChan
, sendChan
, receiveChan
, mergePortsBiased
, mergePortsRR
, unsafeSend
, unsafeSendChan
, unsafeNSend
-- Advanced messaging
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, matchAny
, matchAnyIf
, matchChan
, matchSTM
, matchMessage
, matchMessageIf
, isEncoded
, wrapMessage
, unsafeWrapMessage
, unwrapMessage
, handleMessage
, handleMessageIf
, handleMessage_
, handleMessageIf_
, forward
, delegate
, relay
, proxy
-- Process management
, terminate
, ProcessTerminationException(..)
, die
, exit
, catchExit
, catchesExit
, kill
, getSelfPid
, getSelfNode
, ProcessInfo(..)
, getProcessInfo
, NodeStats(..)
, getNodeStats
, getLocalNodeStats
-- Monitoring and linking
, link
, linkNode
, linkPort
, unlink
, unlinkNode
, unlinkPort
, monitor
, monitorNode
, monitorPort
, unmonitor
, withMonitor
-- Logging
, say
-- Registry
, register
, reregister
, unregister
, whereis
, nsend
, registerRemoteAsync
, reregisterRemoteAsync
, unregisterRemoteAsync
, whereisRemoteAsync
, nsendRemote
-- Closures
, unStatic
, unClosure
-- Exception handling
, catch
, Handler(..)
, catches
, try
, mask
, onException
, bracket
, bracket_
, finally
-- Auxiliary API
, expectTimeout
, receiveChanTimeout
, spawnAsync
-- Reconnecting
, reconnect
, reconnectPort
)
import Control.Distributed.Process.Node (forkProcess)
import Control.Distributed.Process.Internal.Spawn
( -- Spawning Processes/Channels
spawn
, spawnLink
, spawnMonitor
, spawnChannel
, spawnSupervised
, call
)
-- INTERNAL NOTES
--
-- 1. 'send' never fails. If you want to know that the remote process received
-- your message, you will need an explicit acknowledgement from it. If you
-- want to know when the remote process failed, you will need to monitor
-- that remote process.
--
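-- As a concrete illustration, here is a minimal sketch of an
-- acknowledgement protocol (the function name, the payload type and the
-- "ack" reply are illustrative only, not part of this API): the sender
-- monitors the recipient, sends, and then waits for either an ack or a
-- monitor notification.
--
-- > sendWithAck :: ProcessId -> String -> Process Bool
-- > sendWithAck them payload = do
-- >   ref  <- monitor them
-- >   self <- getSelfPid
-- >   -- the recipient is assumed to send the string "ack" back to 'self'
-- >   send them (self, payload)
-- >   result <- receiveWait
-- >     [ matchIf (== "ack") (\_ -> return True)
-- >     , matchIf (\(ProcessMonitorNotification ref' _ _) -> ref' == ref)
-- >               (\_ -> return False)
-- >     ]
-- >   unmonitor ref
-- >   return result
--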
-- 2. 'send' may block (when the system TCP buffers are full, while we are
-- trying to establish a connection to the remote endpoint, etc.) but its
-- return does not imply that the remote process received the message (much
-- less processed it). When 'send' targets a local process, however, it is
-- dispatched via the node controller, in which case it will /not/ block.
-- This isn't part of the semantics, so it should be ok. Ordering is
-- maintained because the ctrlChannel still has FIFO semantics with regard
-- to interactions between two disparate forkIO threads.
--
-- 3. Message delivery is reliable and ordered. That means that if process A
-- sends messages m1, m2, m3 to process B, then B will receive either all
-- three messages in order (m1, m2, m3) or a prefix thereof; messages will
-- not go 'missing' (m1, m3) or be reordered (m1, m3, m2).
--
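-- For instance (a sketch; 'b' stands for B's ProcessId), if A runs
--
-- > mapM_ (send b) ["m1", "m2", "m3"]
--
-- then B, running
--
-- > replicateM 3 (expect :: Process String)
--
-- will see exactly ["m1", "m2", "m3"], or block after a prefix has been
-- delivered; it will never observe a gap or a swap.
--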
-- In order to guarantee (3), we stipulate that
--
-- 3a. We do not garbage collect connections because Network.Transport provides
-- ordering guarantees only *per connection*.
--
-- 3b. Once a connection breaks, we have no way of knowing which messages
-- arrived and which did not; hence, once a connection fails, we assume the
-- remote process to be forever unreachable. Otherwise we might send m1 and
-- m2, get notified of the broken connection, reconnect, send m3, but only
-- m1 and m3 arrive.
--
-- 3c. As a consequence of (3b) we should not reuse PIDs. If a process dies,
-- we consider it forever unreachable. Hence, new processes should get new
-- IDs or they too would be considered unreachable.
--
-- The main reference for Cloud Haskell is
--
-- [1] "Towards Haskell in the Cloud", Jeff Epstein, Andrew Black and Simon
-- Peyton Jones.
-- http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/remote.pdf
--
-- The precise message-passing semantics are based on
--
-- [2] "A Unified Semantics for Future Erlang", Hans Svensson, Lars-Åke Fredlund
-- and Clara Benac Earle (not freely available online, unfortunately)
--
-- Some pointers to related documentation about Erlang, for comparison and
-- inspiration:
--
-- [3] "Programming Distributed Erlang Applications: Pitfalls and Recipes",
-- Hans Svensson and Lars-Åke Fredlund
-- http://man.lupaworld.com/content/develop/p37-svensson.pdf
-- [4] The Erlang manual, sections "Message Sending" and "Send"
-- http://www.erlang.org/doc/reference_manual/processes.html#id82409
-- http://www.erlang.org/doc/reference_manual/expressions.html#send
-- [5] Questions "Is the order of message reception guaranteed?" and
-- "If I send a message, is it guaranteed to reach the receiver?" of
-- the Erlang FAQ
-- http://www.erlang.org/faq/academic.html
-- [6] "Delivery of Messages", post on erlang-questions
-- http://erlang.org/pipermail/erlang-questions/2012-February/064767.html
--------------------------------------------------------------------------------
-- Local versions of spawn --
--------------------------------------------------------------------------------
-- | Spawn a process on the local node
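--
-- A usage sketch (the child simply waits for a 'String' and logs it):
--
-- > do pid <- spawnLocal (expect >>= say)
-- >    send pid "hello"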
spawnLocal :: Process () -> Process ProcessId
spawnLocal proc = do
node <- processNode <$> ask
liftIO $ forkProcess node proc
-- | Create a new typed channel, spawn a process on the local node, passing it
-- the receive port, and return the send port
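--
-- A usage sketch (the spawned process echoes one message to the log):
--
-- > do sp <- spawnChannelLocal $ \rp -> receiveChan rp >>= say
-- >    sendChan sp "hello over a typed channel"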
spawnChannelLocal :: Serializable a
=> (ReceivePort a -> Process ())
-> Process (SendPort a)
spawnChannelLocal proc = do
node <- processNode <$> ask
liftIO $ do
mvar <- newEmptyMVar
_ <- forkProcess node $ do
-- It is important that we allocate the new channel in the new process,
-- because otherwise it will be associated with the wrong process ID
(sport, rport) <- newChan
liftIO $ putMVar mvar sport
proc rport
takeMVar mvar