Safe Haskell | None |
---|---|
Language | Haskell2010 |
In this module we lift all functions in Control.Distributed.Process that return a function of type Process a to Session s s a.
Since the functions in this module work identical to the ones in Control.Distributed.Process we will refer to that module for documentation.
There is however some explanation required for functions that take a Process
as an argument.
For the functions that also take a Process a as an argument we derive two functions. One that still takes a Process a and one that takes a Session s s a.
There are also functions that take a Closure (Process ()) as an argument. We cannot lift this to be Closure (Session s s ()) as is explained in Control.Distributed.Session.Closure.
To accomodate for this drawback we instead have these functions take a Closure (SessionWrap ()) as an argument.
Here is an example on how to call call
.
{-# LANGUAGE TemplateHaskell #-} import qualified SessionTypes.Indexed as I import Control.Distributed.Session (SessionWrap(..), sessionRemoteTable, call, evalSessionEq) import Control.Distributed.Process (liftIO, Process, RemoteTable, NodeId) import Control.Distributed.Process.Serializable (SerializableDict(..)) import Control.Distributed.Process.Closure (remotable, mkStaticClosure, mkStatic) import Control.Distributed.Process.Node import Network.Transport.TCP sessWrap :: SessionWrap Int sessWrap = SessionWrap $ I.return 5 sdictInt :: SerializableDict Int sdictInt = SerializableDict remotable ['sdictInt, 'sessWrap] p1 :: NodeId -> Process () p1 nid = do a <- evalSessionEq (call $(mkStatic 'sdictInt) nid $(mkStaticClosure 'sessWrap)) liftIO $ putStrLn $ show a myRemoteTable :: RemoteTable myRemoteTable = Main.__remoteTable $ sessionRemoteTable initRemoteTable main :: IO () main = do Right t <- createTransport "127.0.0.1" "100000" defaultTCPParameters node <- newLocalNode t myRemoteTable runProcess node $ p1 (localNodeId node)
>>>
main
> 5
In p1 we run a session that makes a call and then prints out the result of that call.
Note that this is the call function from SessionTyped.Distributed.Process.Lifted. It takes a Static (SerializableDict a) and a Closure (SessionWrap a).
To create a static serializable dictionary we first have to define a function that returns a monomorphic serializable dictionary.
sdictInt :: SerializableDict Int sdictInt = SerializableDict
We then pass 'sdictInt to remoteable, which is a top-level Template Haskell splice.
remoteable ['sdictInt]
Now we can create a static serializable dictionary with
$(mkStatic 'sdictInt)
To create a closure for a Session s s we have to wrap it in a SessionWrap
.
sessWrap :: SessionWrap Int sessWrap = SessionWrap $ I.return 5
Similarly to sdictInt this needs to be a top level definition such that we can use Template Haskell to derive a Closure
remotable ['sdictInt, 'sessWrap] $(mkStaticClosure 'sessWrap)
Since call
makes use of internally defined closures, you also have to include sessionRemoteTable
.
myRemoteTable = Main.__remoteTable $ sessionRemoteTable initRemoteTable
The remote tables contains a mapping from labels to evaluation functions that a node uses to evaluate closures.
node <- newLocalNode t myRemoteTable
- utsend :: Serializable a => ProcessId -> a -> Session s s ()
- usend :: Serializable a => ProcessId -> a -> Session s s ()
- expect :: Serializable a => Session s s a
- expectTimeout :: Serializable a => Int -> Session s s (Maybe a)
- newChan :: Serializable a => Session s s (SendPort a, ReceivePort a)
- sendChan :: Serializable a => SendPort a -> a -> Session s s ()
- receiveChan :: Serializable a => ReceivePort a -> Session s s a
- receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Session s s (Maybe a)
- mergePortsBiased :: Serializable a => [ReceivePort a] -> Session s s (ReceivePort a)
- mergePortsRR :: Serializable a => [ReceivePort a] -> Session s s (ReceivePort a)
- unsafeSend :: Serializable a => ProcessId -> a -> Session s s ()
- unsafeSendChan :: Serializable a => SendPort a -> a -> Session s s ()
- unsafeNSend :: Serializable a => String -> a -> Session s s ()
- unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Session s s ()
- receiveWait :: [Match b] -> Session s s b
- receiveTimeout :: Int -> [Match b] -> Session s s (Maybe b)
- unwrapMessage :: Serializable a => Message -> Session s s (Maybe a)
- handleMessage :: Serializable a => Message -> (a -> Session s s b) -> Session r r (Maybe b)
- handleMessage_ :: Serializable a => Message -> (a -> Session s s ()) -> Session r r ()
- handleMessageP :: Serializable a => Message -> (a -> Process b) -> Session s s (Maybe b)
- handleMessageP_ :: Serializable a => Message -> (a -> Process ()) -> Session s s ()
- handleMessageIf :: Serializable a => Message -> (a -> Bool) -> (a -> Session s s b) -> Session r r (Maybe b)
- handleMessageIf_ :: Serializable a => Message -> (a -> Bool) -> (a -> Session s s ()) -> Session r r ()
- handleMessageIfP :: Serializable a => Message -> (a -> Bool) -> (a -> Process b) -> Session s s (Maybe b)
- handleMessageIfP_ :: Serializable a => Message -> (a -> Bool) -> (a -> Process ()) -> Session s s ()
- forward :: Message -> ProcessId -> Session s s ()
- uforward :: Message -> ProcessId -> Session s s ()
- delegate :: ProcessId -> (Message -> Bool) -> Session s s ()
- relay :: ProcessId -> Session s s ()
- proxy :: Serializable a => ProcessId -> (a -> Session s s Bool) -> Session r r ()
- proxyP :: Serializable a => ProcessId -> (a -> Process Bool) -> Session s s ()
- spawn :: NodeId -> Closure (SessionWrap ()) -> Session s s ProcessId
- spawnP :: NodeId -> Closure (Process ()) -> Session s s ProcessId
- call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (SessionWrap a) -> Session r r a
- callP :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Session s s a
- terminate :: Session s s a
- die :: Serializable a => a -> Session s s b
- kill :: ProcessId -> String -> Session s s ()
- exit :: Serializable a => ProcessId -> a -> Session s s ()
- catchExit :: (Show a, Serializable a) => Session s s b -> (ProcessId -> a -> Session r r b) -> Session t t b
- catchExitP :: (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Session s s b
- catchesExit :: Session s s b -> [ProcessId -> Message -> Session r r (Maybe b)] -> Session t t b
- catchesExitP :: Process b -> [ProcessId -> Message -> Process (Maybe b)] -> Session s s b
- getSelfPid :: Session s s ProcessId
- getSelfNode :: Session s s NodeId
- getOthPid :: Session s s (Maybe ProcessId)
- getOthNode :: Session s s (Maybe NodeId)
- getProcessInfo :: ProcessId -> Session s s (Maybe ProcessInfo)
- getNodeStats :: NodeId -> Session s s (Either DiedReason NodeStats)
- link :: ProcessId -> Session s s ()
- linkNode :: NodeId -> Session s s ()
- unlink :: ProcessId -> Session s s ()
- unlinkNode :: NodeId -> Session s s ()
- monitor :: ProcessId -> Session s s MonitorRef
- monitorNode :: NodeId -> Session s s MonitorRef
- monitorPort :: Serializable a => SendPort a -> Session s s MonitorRef
- unmonitor :: MonitorRef -> Session s s ()
- withMonitor :: ProcessId -> (MonitorRef -> Session s s a) -> Session r r a
- withMonitor_ :: ProcessId -> Session s s a -> Session r r a
- withMonitorP :: ProcessId -> (MonitorRef -> Process a) -> Session s s a
- withMonitorP_ :: ProcessId -> Process a -> Session s s a
- unStatic :: Typeable a => Static a -> Session s s a
- unClosure :: Typeable a => Closure a -> Session s s a
- say :: String -> Session s s ()
- register :: String -> ProcessId -> Session s s ()
- unregister :: String -> Session s s ()
- whereis :: String -> Session s s (Maybe ProcessId)
- nsend :: Serializable a => String -> a -> Session s s ()
- registerRemoteAsync :: NodeId -> String -> ProcessId -> Session s s ()
- reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Session s s ()
- whereisRemoteAsync :: NodeId -> String -> Session s s ()
- nsendRemote :: Serializable a => NodeId -> String -> a -> Session s s ()
- spawnAsync :: NodeId -> Closure (SessionWrap ()) -> Session r r SpawnRef
- spawnAsyncP :: NodeId -> Closure (Process ()) -> Session s s SpawnRef
- spawnSupervised :: NodeId -> Closure (SessionWrap ()) -> Session s s (ProcessId, MonitorRef)
- spawnSupervisedP :: NodeId -> Closure (Process ()) -> Session s s (ProcessId, MonitorRef)
- spawnLink :: NodeId -> Closure (SessionWrap ()) -> Session s s ProcessId
- spawnLinkP :: NodeId -> Closure (Process ()) -> Session s s ProcessId
- spawnMonitor :: NodeId -> Closure (SessionWrap ()) -> Session s s (ProcessId, MonitorRef)
- spawnMonitorP :: NodeId -> Closure (Process ()) -> Session s s (ProcessId, MonitorRef)
- spawnChannel :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> SessionWrap ()) -> Session s s (SendPort a)
- spawnChannelP :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Session s s (SendPort a)
- spawnLocal :: Session s s () -> Session r r ProcessId
- spawnLocalP :: Process () -> Session s s ProcessId
- spawnChannelLocal :: Serializable a => (ReceivePort a -> Session s s ()) -> Session r r (SendPort a)
- spawnChannelLocalP :: Serializable a => (ReceivePort a -> Process ()) -> Session s s (SendPort a)
- callLocal :: Session s s a -> Session s s a
- callLocalP :: Process a -> Session s s a
- reconnect :: ProcessId -> Session s s ()
- reconnectPort :: SendPort a -> Session s s ()
Documentation
expect :: Serializable a => Session s s a Source #
expectTimeout :: Serializable a => Int -> Session s s (Maybe a) Source #
newChan :: Serializable a => Session s s (SendPort a, ReceivePort a) Source #
receiveChan :: Serializable a => ReceivePort a -> Session s s a Source #
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Session s s (Maybe a) Source #
mergePortsBiased :: Serializable a => [ReceivePort a] -> Session s s (ReceivePort a) Source #
mergePortsRR :: Serializable a => [ReceivePort a] -> Session s s (ReceivePort a) Source #
unsafeSend :: Serializable a => ProcessId -> a -> Session s s () Source #
unsafeSendChan :: Serializable a => SendPort a -> a -> Session s s () Source #
unsafeNSend :: Serializable a => String -> a -> Session s s () Source #
unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Session s s () Source #
receiveWait :: [Match b] -> Session s s b Source #
unwrapMessage :: Serializable a => Message -> Session s s (Maybe a) Source #
handleMessage :: Serializable a => Message -> (a -> Session s s b) -> Session r r (Maybe b) Source #
handleMessage_ :: Serializable a => Message -> (a -> Session s s ()) -> Session r r () Source #
handleMessageP :: Serializable a => Message -> (a -> Process b) -> Session s s (Maybe b) Source #
handleMessageP_ :: Serializable a => Message -> (a -> Process ()) -> Session s s () Source #
handleMessageIf :: Serializable a => Message -> (a -> Bool) -> (a -> Session s s b) -> Session r r (Maybe b) Source #
handleMessageIf_ :: Serializable a => Message -> (a -> Bool) -> (a -> Session s s ()) -> Session r r () Source #
handleMessageIfP :: Serializable a => Message -> (a -> Bool) -> (a -> Process b) -> Session s s (Maybe b) Source #
handleMessageIfP_ :: Serializable a => Message -> (a -> Bool) -> (a -> Process ()) -> Session s s () Source #
call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (SessionWrap a) -> Session r r a Source #
callP :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Session s s a Source #
die :: Serializable a => a -> Session s s b Source #
catchExit :: (Show a, Serializable a) => Session s s b -> (ProcessId -> a -> Session r r b) -> Session t t b Source #
catchExitP :: (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Session s s b Source #
catchesExit :: Session s s b -> [ProcessId -> Message -> Session r r (Maybe b)] -> Session t t b Source #
getSelfPid :: Session s s ProcessId Source #
getSelfNode :: Session s s NodeId Source #
getProcessInfo :: ProcessId -> Session s s (Maybe ProcessInfo) Source #
getNodeStats :: NodeId -> Session s s (Either DiedReason NodeStats) Source #
unlinkNode :: NodeId -> Session s s () Source #
monitorNode :: NodeId -> Session s s MonitorRef Source #
monitorPort :: Serializable a => SendPort a -> Session s s MonitorRef Source #
unmonitor :: MonitorRef -> Session s s () Source #
withMonitor :: ProcessId -> (MonitorRef -> Session s s a) -> Session r r a Source #
withMonitorP :: ProcessId -> (MonitorRef -> Process a) -> Session s s a Source #
unregister :: String -> Session s s () Source #
nsendRemote :: Serializable a => NodeId -> String -> a -> Session s s () Source #
spawnAsync :: NodeId -> Closure (SessionWrap ()) -> Session r r SpawnRef Source #
spawnSupervised :: NodeId -> Closure (SessionWrap ()) -> Session s s (ProcessId, MonitorRef) Source #
spawnSupervisedP :: NodeId -> Closure (Process ()) -> Session s s (ProcessId, MonitorRef) Source #
spawnMonitor :: NodeId -> Closure (SessionWrap ()) -> Session s s (ProcessId, MonitorRef) Source #
spawnMonitorP :: NodeId -> Closure (Process ()) -> Session s s (ProcessId, MonitorRef) Source #
spawnChannel :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> SessionWrap ()) -> Session s s (SendPort a) Source #
spawnChannelP :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Session s s (SendPort a) Source #
spawnChannelLocal :: Serializable a => (ReceivePort a -> Session s s ()) -> Session r r (SendPort a) Source #
spawnChannelLocalP :: Serializable a => (ReceivePort a -> Process ()) -> Session s s (SendPort a) Source #
callLocalP :: Process a -> Session s s a Source #
reconnectPort :: SendPort a -> Session s s () Source #