{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
module Control.Distributed.Process.Extras.Internal.Primitives
(
Addressable
, Routable(..)
, Resolvable(..)
, Linkable(..)
, Killable(..)
, Monitored(..)
, spawnSignalled
, spawnLinkLocal
, spawnMonitorLocal
, linkOnFailure
, whereisRemote
, whereisOrStart
, whereisOrStartRemote
, matchCond
, awaitResponse
, times
, monitor
, awaitExit
, isProcessAlive
, forever'
, deliver
, __remoteTable
) where
import Control.Concurrent (myThreadId, throwTo)
import Control.Distributed.Process hiding (monitor, finally, catch)
import qualified Control.Distributed.Process as P (monitor, unmonitor)
import Control.Distributed.Process.Closure (seqCP, remotable, mkClosure)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Extras.Internal.Types
( Addressable
, Linkable(..)
, Killable(..)
, Resolvable(..)
, Routable(..)
, Monitored(..)
, RegisterSelf(..)
, ExitReason(ExitOther)
, whereisRemote
)
import Control.Monad (void, (>=>), replicateM_)
import Control.Monad.Catch (finally, catchIf)
import Data.Maybe (isJust, fromJust)
import Data.Foldable (traverse_)
-- | Monitor any 'Resolvable' address.
--
-- The address is resolved first. When resolution yields a process id, a
-- monitor is set up on it and the resulting reference is returned in a
-- @Just@; when resolution yields nothing, the result is @Nothing@ and no
-- monitor is created.
monitor :: Resolvable a => a -> Process (Maybe MonitorRef)
monitor addr = do
  resolved <- resolve addr
  case resolved of
    Nothing  -> return Nothing
    Just pid -> Just <$> P.monitor pid
-- | Block until the process behind a 'Resolvable' address has exited.
--
-- If the address does not resolve to a process id, this returns
-- immediately. Otherwise the process is monitored and the caller waits for
-- the 'ProcessMonitorNotification' carrying that monitor's reference. The
-- monitor is released via 'bracket' whether the wait completes or is
-- interrupted.
awaitExit :: Resolvable a => a -> Process ()
awaitExit addr = do
  resolved <- resolve addr
  case resolved of
    Nothing  -> return ()
    Just pid -> bracket (P.monitor pid) P.unmonitor waitForDeath
  where
    -- Only the notification belonging to our own monitor ends the wait.
    waitForDeath ours = receiveWait
      [ matchIf (\(ProcessMonitorNotification theirs _ _) -> theirs == ours)
                (\_ -> return ())
      ]
-- | Send a message to an 'Addressable' recipient.
--
-- This is 'sendTo' with the argument order reversed: the message comes
-- first and the recipient second.
deliver :: (Addressable a, Serializable m) => m -> a -> Process ()
deliver msg recipient = sendTo recipient msg
-- | Report whether 'getProcessInfo' has any information about the given
-- process id: @True@ when it returns a @Just@, @False@ otherwise.
isProcessAlive :: ProcessId -> Process Bool
isProcessAlive = fmap isJust . getProcessInfo
-- | Run an action the given number of times, discarding its results.
--
-- This is a specialised alias for 'replicateM_' and is deprecated in its
-- favour.
times :: Int -> Process () -> Process ()
times n action = replicateM_ n action
{-# DEPRECATED times "use replicateM_ instead" #-}
-- | Repeat a monadic action indefinitely.
--
-- The loop is a single self-referential value (@loop = action >> loop@),
-- so it only ends when the monad itself short-circuits, e.g. on a @Left@
-- or @Nothing@.
forever' :: Monad m => m a -> m b
forever' action = loop
  where
    loop = action >> loop
{-# INLINE forever' #-}
-- | Spawn a local process in two phases and wait for the first phase to
-- finish.
--
-- The child runs @before@, signals the caller over a typed channel, and
-- then continues with @after@ applied to the result of @before@. The caller
-- blocks until either
--
-- * the start signal arrives, in which case the child's pid is returned, or
--
-- * the monitor on the child fires, in which case the caller exits via
--   'die' with an 'ExitOther' wrapping the shown death reason.
--
-- The monitor is removed on every exit path.
spawnSignalled :: Process a -> (a -> Process ()) -> Process ProcessId
spawnSignalled before after = do
  (startTx, startRx) <- newChan
  (child, childRef) <- spawnMonitorLocal $
    before >>= \initResult -> sendChan startTx () >> after initResult
  receiveWait
    [ matchIf (\(ProcessMonitorNotification ref _ _) -> ref == childRef)
              (\(ProcessMonitorNotification _ _ reason) ->
                  die $ ExitOther (show reason))
    , matchChan startRx (\() -> return child)
    ] `finally` unmonitor childRef
-- | Spawn a local process and link the caller to it.
--
-- The link is established after the spawn returns, as a separate step; the
-- new process id is then handed back to the caller.
spawnLinkLocal :: Process () -> Process ProcessId
spawnLinkLocal p =
  spawnLocal p >>= \child ->
    link child >> return child
-- | Spawn a local process and start monitoring it.
--
-- Returns the new process id together with the reference of the monitor
-- that was set up on it. The monitor is created after the spawn returns,
-- as a separate step.
spawnMonitorLocal :: Process () -> Process (ProcessId, MonitorRef)
spawnMonitorLocal p =
  spawnLocal p >>= \child ->
    (,) child <$> P.monitor child
-- | Link the caller to @them@, but only react to abnormal termination.
--
-- A helper process is spawned which monitors both the caller and @them@:
--
-- * if the caller goes away first there is nothing left to do, and the
--   helper exits quietly;
--
-- * if @them@ exits with 'DiedNormal' the helper also exits quietly;
--
-- * if @them@ exits for any other reason, a 'ProcessLinkException' naming
--   @them@ and carrying that reason is thrown asynchronously to the
--   caller's thread.
linkOnFailure :: ProcessId -> Process ()
linkOnFailure them = do
  us  <- getSelfPid
  -- Captured here, in the caller, so the helper can target this thread.
  tid <- liftIO myThreadId
  void $ spawnLocal $ do
    callerRef <- P.monitor us
    calleeRef <- P.monitor them
    reason <- receiveWait
      [ matchIf (\(ProcessMonitorNotification mRef _ _) -> mRef == callerRef)
                -- The caller is gone: report a normal exit so nothing is thrown.
                (\_ -> return DiedNormal)
      , matchIf (\(ProcessMonitorNotification mRef _ _) -> mRef == calleeRef)
                (\(ProcessMonitorNotification _ _ r) -> return r)
      ]
    case reason of
      DiedNormal -> return ()
      -- The exception must identify the process that died (@them@), not
      -- the process receiving the exception.
      _          -> liftIO $ throwTo tid (ProcessLinkException them reason)
-- | Return the pid registered under @name@, starting @proc@ under that name
-- when nobody owns it yet.
--
-- A helper process is spawned which tries to 'register' itself as @name@:
--
-- * on success it reports its own pid to the caller and then runs @proc@;
--
-- * if 'register' throws a 'ProcessRegistrationException' that carries the
--   pid of the current owner, that pid is reported instead and the helper
--   exits without running @proc@;
--
-- * any other failure is not caught, so the helper dies.
--
-- The caller returns the reported pid. If the helper terminates without
-- having reported one, the caller exits via 'die' with an 'ExitOther'
-- wrapping the shown death reason. The monitor on the helper is removed on
-- every exit path.
whereisOrStart :: String -> Process () -> Process ProcessId
whereisOrStart name proc = do
  (sigStart, recvStart) <- newChan
  (_, mRef) <- spawnMonitorLocal $ do
    us <- getSelfPid
    -- Nothing: we now own the name. Just pid: somebody else already does.
    owner <- catchIf (\(ProcessRegistrationException _ r) -> isJust r)
                     (register name us >> return Nothing)
                     (\(ProcessRegistrationException _ rPid) -> return rPid)
    case owner of
      -- The name is taken: hand back its owner and do NOT run proc, which
      -- would otherwise leave an unregistered duplicate running.
      Just existing -> sendChan sigStart existing
      Nothing       -> sendChan sigStart us >> proc
  receiveWait
    [ -- Listed first so that a helper which reports a pid and exits straight
      -- away is still a success.
      -- NOTE(review): assumes receiveWait prefers earlier entries when
      -- several are already satisfiable; confirm against distributed-process.
      matchChan recvStart return
    , matchIf (\(ProcessMonitorNotification ref _ _) -> ref == mRef)
              (\(ProcessMonitorNotification _ _ dr) -> die $ ExitOther (show dr))
    ] `finally` unmonitor mRef
-- | Registration handshake run at the start of a remotely spawned process.
--
-- The running process registers itself under @name@, tells @target@ about
-- it by sending @(RegisterSelf, self)@, and then blocks until it receives a
-- unit message.
registerSelf :: (String, ProcessId) -> Process ()
registerSelf (name, target) = do
  self <- getSelfPid
  register name self
  send target (RegisterSelf, self)
  -- Wait for the unit acknowledgement before returning.
  expect :: Process ()

-- Makes 'registerSelf' usable with 'mkClosure'.
$(remotable ['registerSelf])
-- | Look up @name@ on a remote node, starting @proc@ there under that name
-- if it is not registered.
--
-- The node is monitored for the duration of the operation.
--
-- 1. An asynchronous whereis query is sent. If the node monitor fires
--    before the reply arrives, the result is @Nothing@. If the reply names
--    a pid, that pid is returned.
--
-- 2. Otherwise a closure is spawned on the node which runs 'registerSelf'
--    followed by @proc@. The caller waits for the spawn confirmation,
--    monitors the new process, and waits for its @(RegisterSelf, pid)@
--    message; it then sends the unit acknowledgement and returns the pid.
--
-- 3. If the node monitor or the process monitor fires during step 2, the
--    node monitor is removed and the whole operation is retried from the
--    start.
whereisOrStartRemote :: NodeId -> String -> Closure (Process ()) -> Process (Maybe ProcessId)
whereisOrStartRemote nid name proc = do
  nodeRef <- monitorNode nid
  whereisRemoteAsync nid name
  lookupResult <- receiveWait
    [ matchIf (\(WhereIsReply label _) -> label == name)
              (\(WhereIsReply _ found) -> return (Just found))
    , matchIf (\(NodeMonitorNotification ref _ _) -> ref == nodeRef)
              (\(NodeMonitorNotification _ _ _) -> return Nothing)
    ]
  case lookupResult of
    -- The node monitor fired before any reply arrived.
    Nothing         -> return Nothing
    Just (Just pid) -> unmonitor nodeRef >> return (Just pid)
    Just Nothing    -> do
      started <- startRemote nodeRef
      unmonitor nodeRef
      -- Any failure while starting leads to a fresh attempt.
      maybe (whereisOrStartRemote nid name proc) (return . Just) started
  where
    -- Spawn the registering closure on the node and wait for it to come up.
    startRemote nodeRef = do
      self <- getSelfPid
      spawnRef <- spawnAsync nid
                    ($(mkClosure 'registerSelf) (name, self) `seqCP` proc)
      receiveWait
        [ matchIf (\(NodeMonitorNotification ref _ _) -> ref == nodeRef)
                  (\(NodeMonitorNotification _ _ _) -> return Nothing)
        , matchIf (\(DidSpawn ref _) -> ref == spawnRef)
                  (\(DidSpawn _ pid) -> awaitRegistration nodeRef pid)
        ]

    -- Wait for the spawned process to announce itself, then acknowledge.
    awaitRegistration nodeRef pid = do
      procRef <- P.monitor pid
      receiveWait
        [ matchIf (\(RegisterSelf, candidate) -> candidate == pid)
                  (\(RegisterSelf, _) -> do
                      unmonitor procRef
                      send pid ()
                      return (Just pid))
        , matchIf (\(NodeMonitorNotification ref _ _) -> ref == nodeRef)
                  (\(NodeMonitorNotification _ _ _) -> return Nothing)
        , matchIf (\(ProcessMonitorNotification ref _ _) -> ref == procRef)
                  (\(ProcessMonitorNotification _ _ _) -> return Nothing)
        ]
-- | Build a 'Match' from a function that both tests a message and supplies
-- its handler.
--
-- A message is accepted when @cond@ yields a @Just@; the action inside that
-- @Just@ is the handler. @cond@ is applied once as the predicate and once
-- more to obtain the handler.
matchCond :: (Serializable a) => (a -> Maybe (Process b)) -> Match b
matchCond cond = matchIf (isJust . cond) (fromJust . cond)
  -- NOTE(review): 'fromJust' relies on 'matchIf' only invoking the handler
  -- for messages the predicate accepted.
-- | Wait for a reply from an 'Addressable' peer while watching for its
-- death.
--
-- * If the address cannot be resolved, the result is
--   @Left (ExitOther "UnresolvedAddress")@.
--
-- * Otherwise the peer is monitored and the caller waits on the supplied
--   matches, with an extra match placed in front of them that turns the
--   peer's monitor notification into @Left (ExitOther reason)@, where
--   @reason@ is the shown death reason.
--
-- The monitor is released via 'bracket' once the wait is over.
awaitResponse :: Addressable a
              => a
              -> [Match (Either ExitReason b)]
              -> Process (Either ExitReason b)
awaitResponse addr matches = resolve addr >>= maybe unresolved watched
  where
    unresolved = return $ Left $ ExitOther "UnresolvedAddress"

    watched pid =
      bracket (P.monitor pid) P.unmonitor $ \ref ->
        receiveWait (peerDied ref : matches)

    -- Fires only for the notification belonging to our own monitor.
    peerDied ours =
      matchIf (\(ProcessMonitorNotification theirs _ _) -> ours == theirs)
              (\(ProcessMonitorNotification _ _ reason) ->
                  return (Left (ExitOther (show reason))))