module Control.Distributed.Process.Extras.Internal.Types
(
Tag
, TagPool
, newTagPool
, getTag
, Linkable(..)
, Killable(..)
, Resolvable(..)
, Routable(..)
, Addressable
, Recipient(..)
, RegisterSelf(..)
, whereisRemote
, resolveOrDie
, CancelWait(..)
, Channel
, Shutdown(..)
, ExitReason(..)
, ServerDisconnected(..)
, NFSerializable
) where
import Control.Concurrent.MVar
( MVar
, newMVar
, modifyMVar
)
import Control.DeepSeq (NFData(..), ($!!))
import Control.Distributed.Process hiding (send, catch)
import qualified Control.Distributed.Process as P
( send
, unsafeSend
, unsafeNSend
)
import Control.Distributed.Process.Serializable
import Control.Exception (SomeException)
import Control.Monad.Catch (catch)
import Data.Binary
import Data.Foldable (traverse_)
import Data.Typeable (Typeable)
import GHC.Generics
class (NFData a, Serializable a) => NFSerializable a
instance (NFData a, Serializable a) => NFSerializable a
instance (NFSerializable a) => NFSerializable (SendPort a)
type Tag = Int
type TagPool = MVar Tag
newTagPool :: Process TagPool
newTagPool = liftIO $ newMVar 0
getTag :: TagPool -> Process Tag
getTag tp = liftIO $ modifyMVar tp (\tag -> return (tag+1,tag))
whereisRemote :: NodeId -> String -> Process (Maybe ProcessId)
whereisRemote node name = do
mRef <- monitorNode node
whereisRemoteAsync node name
receiveWait [ matchIf (\(NodeMonitorNotification ref nid _) -> ref == mRef &&
nid == node)
(\NodeMonitorNotification{} -> return Nothing)
, matchIf (\(WhereIsReply n _) -> n == name)
(\(WhereIsReply _ mPid) -> return mPid)
]
data CancelWait = CancelWait
deriving (Eq, Show, Typeable, Generic)
instance Binary CancelWait where
instance NFData CancelWait where
type Channel a = (SendPort a, ReceivePort a)
data RegisterSelf = RegisterSelf
deriving (Typeable, Generic)
instance Binary RegisterSelf where
instance NFData RegisterSelf where
data Shutdown = Shutdown
deriving (Typeable, Generic, Show, Eq)
instance Binary Shutdown where
instance NFData Shutdown where
data ExitReason =
ExitNormal
| ExitShutdown
| ExitOther !String
deriving (Typeable, Generic, Eq, Show)
instance Binary ExitReason where
instance NFData ExitReason where
baseAddressableErrorMessage :: (Resolvable a) => a -> String
baseAddressableErrorMessage _ = "CannotResolveAddressable"
class Linkable a where
linkTo :: (Resolvable a) => a -> Process ()
linkTo r = resolve r >>= traverse_ link
class Killable p where
killProc :: Resolvable p => p -> String -> Process ()
killProc r s = resolve r >>= traverse_ (flip kill $ s)
exitProc :: (Resolvable p, Serializable m) => p -> m -> Process ()
exitProc r m = resolve r >>= traverse_ (flip exit $ m)
instance Resolvable p => Killable p
resolveOrDie :: (Resolvable a) => a -> String -> Process ProcessId
resolveOrDie resolvable failureMsg = do
result <- resolve resolvable
case result of
Nothing -> die $ failureMsg ++ " " ++ unresolvableMessage resolvable
Just pid -> return pid
class Resolvable a where
resolve :: a -> Process (Maybe ProcessId)
unresolvableMessage :: (Resolvable a) => a -> String
unresolvableMessage = baseAddressableErrorMessage
instance Resolvable ProcessId where
resolve p = return (Just p)
unresolvableMessage p = "CannotResolvePid[" ++ (show p) ++ "]"
instance Resolvable String where
resolve = whereis
unresolvableMessage s = "CannotResolveRegisteredName[" ++ s ++ "]"
instance Resolvable (NodeId, String) where
resolve (nid, pname) =
whereisRemote nid pname `catch` (\(_ :: SomeException) -> return Nothing)
unresolvableMessage (n, s) =
"CannotResolveRemoteRegisteredName[name: " ++ s ++ ", node: " ++ (show n) ++ "]"
class Routable a where
sendTo :: (Serializable m, Resolvable a) => a -> m -> Process ()
sendTo a m = do
mPid <- resolve a
maybe (die (unresolvableMessage a))
(\p -> P.send p m)
mPid
unsafeSendTo :: (NFSerializable m, Resolvable a) => a -> m -> Process ()
unsafeSendTo a m = do
mPid <- resolve a
maybe (die (unresolvableMessage a))
(\p -> P.unsafeSend p $!! m)
mPid
instance Routable ProcessId where
sendTo = P.send
unsafeSendTo pid msg = P.unsafeSend pid $!! msg
instance Routable String where
sendTo = nsend
unsafeSendTo name msg = P.unsafeNSend name $!! msg
instance Routable (NodeId, String) where
sendTo (nid, pname) = nsendRemote nid pname
unsafeSendTo = sendTo
instance Routable (Message -> Process ()) where
sendTo f = f . wrapMessage
unsafeSendTo f = f . unsafeWrapMessage
class (Resolvable a, Routable a) => Addressable a
instance Addressable ProcessId
data Recipient =
Pid !ProcessId
| Registered !String
| RemoteRegistered !String !NodeId
deriving (Typeable, Generic, Show, Eq)
instance Binary Recipient where
instance NFData Recipient where
rnf (Pid p) = rnf p `seq` ()
rnf (Registered s) = rnf s `seq` ()
rnf (RemoteRegistered s n) = rnf s `seq` rnf n `seq` ()
instance Resolvable Recipient where
resolve (Pid p) = return (Just p)
resolve (Registered n) = whereis n
resolve (RemoteRegistered s n) = whereisRemote n s
unresolvableMessage (Pid p) = unresolvableMessage p
unresolvableMessage (Registered n) = unresolvableMessage n
unresolvableMessage (RemoteRegistered s n) = unresolvableMessage (n, s)
instance Routable Recipient where
sendTo (Pid p) m = P.send p m
sendTo (Registered s) m = nsend s m
sendTo (RemoteRegistered s n) m = nsendRemote n s m
unsafeSendTo (Pid p) m = P.unsafeSend p $!! m
unsafeSendTo (Registered s) m = P.unsafeNSend s $!! m
unsafeSendTo (RemoteRegistered s n) m = nsendRemote n s m
newtype ServerDisconnected = ServerDisconnected DiedReason
deriving (Typeable, Generic)
instance Binary ServerDisconnected where
instance NFData ServerDisconnected where