module Control.Distributed.Session.Closure (
SpawnSession (..),
SessionWrap (..),
sessionRemoteTable,
remoteSessionStatic,
remoteSessionClosure,
remoteSessionStatic',
remoteSessionClosure',
spawnChannelStatic,
spawnChannelClosure,
evalLocalSession,
remoteSpawnSessionStatic,
remoteSpawnSessionClosure,
remoteSpawnSessionStatic',
remoteSpawnSessionClosure',
rrSpawnSessionSendStatic,
rrSpawnSessionSendClosure,
rrSpawnSessionExpectStatic,
rrSpawnSessionExpectClosure
) where
import Control.SessionTypes.Types
import Control.Distributed.Session.Eval
import Control.Distributed.Session.STChannel as ST
import Control.Distributed.Session.Session
import Control.Distributed.Process hiding (spawnChannel)
import Control.Distributed.Process.Closure
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Static
import Data.ByteString.Lazy (ByteString)
import Data.Binary (encode, decode)
import Data.Rank1Dynamic (toDynamic)
import Data.Rank1Typeable (ANY, Typeable)
-- | A pair of sessions bundled together: the first is typed with a session
-- type @s@ and the second with @DualST s@.
--
-- Both sessions start from an empty context and share the same final index
-- @r@. The constructor hides @s@ and @r@; only the two result types @a@ and
-- @b@ stay visible. 'HasConstraintST' 'Serializable' is required for both @s@
-- and @DualST s@, and both result types must be 'Typeable'.
data SpawnSession a b where
  SpawnSession
    :: ( HasConstraintST Serializable s
       , HasConstraintST Serializable (DualST s)
       , Typeable a
       , Typeable b
       )
    => Session ('Cap '[] s) r a
    -> Session ('Cap '[] (DualST s)) r b
    -> SpawnSession a b
-- | Wraps a session whose two index parameters are the same type @s@.
--
-- The index @s@ is hidden by the constructor, leaving only the result type
-- @a@ in the type of the wrapper.
data SessionWrap a where
  SessionWrap
    :: Session s s a
    -> SessionWrap a
-- | Static reference to the function registered under the label
-- @$remoteSession@ by 'sessionRemoteTable'.
remoteSessionStatic
  :: Static (SerializableDict a -> Closure (SessionWrap a) -> Process a)
remoteSessionStatic = staticLabel "$remoteSession"
-- | Resolve a closure to a 'SessionWrap' and evaluate the wrapped session
-- with 'evalSessionEq'.
--
-- Matching on 'SerializableDict' brings the @Serializable a@ instance into
-- scope for the body.
evalRemoteSession :: SerializableDict a -> Closure (SessionWrap a) -> Process a
evalRemoteSession SerializableDict wrapped =
  unClosure wrapped >>= \(SessionWrap session) -> evalSessionEq session
-- | Build a @Closure (Process a)@ from a closure of a wrapped session.
--
-- The session closure is serialised with 'encode'. The resulting closure's
-- decoder is 'decodeSessionWrap' followed by 'remoteSessionStatic' applied to
-- the supplied dictionary.
remoteSessionClosure
  :: Static (SerializableDict a)
  -> Closure (SessionWrap a)
  -> Closure (Process a)
remoteSessionClosure sdict proc =
  let evaluator = staticApply remoteSessionStatic sdict
      payload = encode proc
   in closure (staticCompose evaluator decodeSessionWrap) payload
-- | Static reference to the function registered under the label
-- @$remoteSession'@ by 'sessionRemoteTable'. This is the unit-result variant
-- that takes no 'SerializableDict'.
remoteSessionStatic'
  :: Static (Closure (SessionWrap ()) -> Process ())
remoteSessionStatic' = staticLabel "$remoteSession'"
-- | Resolve a closure to a @SessionWrap ()@ and evaluate the wrapped session
-- with 'evalSessionEq'.
evalRemoteSession' :: Closure (SessionWrap ()) -> Process ()
evalRemoteSession' wrapped =
  unClosure wrapped >>= \(SessionWrap session) -> evalSessionEq session
-- | Build a @Closure (Process ())@ from a closure of a unit-result wrapped
-- session.
--
-- The argument is serialised with 'encode'. The resulting closure's decoder
-- is 'decodeSessionWrap' followed by 'remoteSessionStatic''.
remoteSessionClosure' :: Closure (SessionWrap ()) -> Closure (Process ())
remoteSessionClosure' tpl =
  let payload = encode tpl
   in closure (staticCompose remoteSessionStatic' decodeSessionWrap) payload
-- | Static reference to the function registered under the label
-- @$spawnChannel@ by 'sessionRemoteTable'.
spawnChannelStatic
  :: Static
       ( SerializableDict a
         -> Closure (ReceivePort a -> SessionWrap ())
         -> ReceivePort a
         -> Process ()
       )
spawnChannelStatic = staticLabel "$spawnChannel"
-- | Resolve a closure to a function from a receive port to a wrapped session,
-- apply it to the given port, and evaluate the resulting session with
-- 'evalSessionEq'.
--
-- Matching on 'SerializableDict' brings the @Serializable a@ instance into
-- scope for the body.
spawnChannel
  :: SerializableDict a
  -> Closure (ReceivePort a -> SessionWrap ())
  -> ReceivePort a
  -> Process ()
spawnChannel SerializableDict proc rp = do
  mkSession <- unClosure proc
  case mkSession rp of
    SessionWrap session -> evalSessionEq session
-- | Build a closure of a function from a receive port to a process.
--
-- The session-producing closure is serialised with 'encode'. The resulting
-- closure's decoder is 'decodeSpawnChannel' followed by 'spawnChannelStatic'
-- applied to the supplied dictionary.
spawnChannelClosure
  :: Static (SerializableDict a)
  -> Closure (ReceivePort a -> SessionWrap ())
  -> Closure (ReceivePort a -> Process ())
spawnChannelClosure sdict proc =
  let runner = staticApply spawnChannelStatic sdict
      payload = encode proc
   in closure (staticCompose runner decodeSpawnChannel) payload
-- | Run the first session of a 'SpawnSession' against the peer process
-- @pidOth@.
--
-- Steps, in order: create a channel pair with 'ST.newUTChan', send its first
-- component to the peer, wait for a @SendPort ST.Message@ from the peer,
-- resolve the closure, and evaluate the first session with a 'SessionInfo'
-- built from the peer's pid, its node, and the pair (peer send port, own
-- receive end).
evalLocalSession
  :: Typeable a
  => (ProcessId, NodeId, Closure (SpawnSession a ()))
  -> Process a
evalLocalSession (pidOth, nodeOth, proc) = do
  (sendSelf, recvSelf) <- ST.newUTChan
  send pidOth sendSelf
  sendOther <- expect :: Process (SendPort ST.Message)
  SpawnSession local _ <- unClosure proc
  let info = SessionInfo pidOth nodeOth (sendOther, recvSelf)
  evalSession local info
-- | Static reference to the function registered under the label
-- @$remoteSpawnSession@ by 'sessionRemoteTable'.
remoteSpawnSessionStatic
  :: Static
       ( SerializableDict a
         -> (ProcessId, NodeId, Closure (SpawnSession a ()))
         -> Process ()
       )
remoteSpawnSessionStatic = staticLabel "$remoteSpawnSession"
-- | Run the second session of a 'SpawnSession' against the peer process
-- @pidOth@.
--
-- Performs the same channel exchange as 'evalLocalSession' (create a channel
-- pair, send its first component to the peer, expect a @SendPort ST.Message@
-- back), then evaluates the second session of the resolved pair. Matching on
-- 'SerializableDict' brings the @Serializable a@ instance into scope.
evalRemoteSpawnSession
  :: SerializableDict a
  -> (ProcessId, NodeId, Closure (SpawnSession a ()))
  -> Process ()
evalRemoteSpawnSession SerializableDict (pidOth, nodeOth, proc) = do
  (sendSelf, recvSelf) <- ST.newUTChan
  send pidOth sendSelf
  sendOther <- expect :: Process (SendPort ST.Message)
  SpawnSession _ remote <- unClosure proc
  let info = SessionInfo pidOth nodeOth (sendOther, recvSelf)
  evalSession remote info
-- | Build a @Closure (Process ())@ from a (pid, node, session-pair closure)
-- triple.
--
-- The triple is serialised with 'encode'. The resulting closure's decoder is
-- 'decodeSpawnSession' followed by 'remoteSpawnSessionStatic' applied to the
-- supplied dictionary.
remoteSpawnSessionClosure
  :: Static (SerializableDict a)
  -> (ProcessId, NodeId, Closure (SpawnSession a ()))
  -> Closure (Process ())
remoteSpawnSessionClosure tdictS tpl = closure decoder payload
  where
    payload = encode tpl

    decoder :: Static (ByteString -> Process ())
    decoder =
      staticCompose (staticApply remoteSpawnSessionStatic tdictS) decodeSpawnSession
-- | Static reference to the function registered under the label
-- @$remoteSpawnSession'@ by 'sessionRemoteTable'. This is the unit-result
-- variant that takes no 'SerializableDict'.
remoteSpawnSessionStatic'
  :: Static
       ( (ProcessId, NodeId, Closure (SpawnSession () ()))
         -> Process ()
       )
remoteSpawnSessionStatic' = staticLabel "$remoteSpawnSession'"
-- | Run the second session of a unit-result 'SpawnSession' against the peer
-- process @pidOth@.
--
-- Creates a channel pair with 'ST.newUTChan', sends its first component to
-- the peer, expects a @SendPort ST.Message@ back, resolves the closure, and
-- evaluates the second session of the pair.
evalRemoteSpawnSession'
  :: (ProcessId, NodeId, Closure (SpawnSession () ()))
  -> Process ()
evalRemoteSpawnSession' (pidOth, nodeOth, proc) = do
  (sendSelf, recvSelf) <- ST.newUTChan
  send pidOth sendSelf
  sendOther <- expect :: Process (SendPort ST.Message)
  SpawnSession _ remote <- unClosure proc
  let info = SessionInfo pidOth nodeOth (sendOther, recvSelf)
  evalSession remote info
-- | Build a @Closure (Process ())@ from a (pid, node, session-pair closure)
-- triple with unit results.
--
-- The triple is serialised with 'encode'. The resulting closure's decoder is
-- 'decodeSpawnSession' followed by 'remoteSpawnSessionStatic''.
remoteSpawnSessionClosure'
  :: (ProcessId, NodeId, Closure (SpawnSession () ()))
  -> Closure (Process ())
remoteSpawnSessionClosure' tpl = closure decoder payload
  where
    payload = encode tpl

    decoder :: Static (ByteString -> Process ())
    decoder = staticCompose remoteSpawnSessionStatic' decodeSpawnSession
-- | Static reference to the function registered under the label
-- @$rrSpawnSessionSend@ by 'sessionRemoteTable'.
rrSpawnSessionSendStatic
  :: Static
       ( (ProcessId, NodeId, Closure (SpawnSession () ()))
         -> Process ()
       )
rrSpawnSessionSendStatic = staticLabel "$rrSpawnSessionSend"
-- | Run the second session of a 'SpawnSession' against the process @pid@,
-- announcing this process to the peer first.
--
-- Steps, in order: obtain the own pid, create a channel pair with
-- 'ST.newUTChan', send the own pid and then the pair's first component to
-- @pid@, expect a @SendPort ST.Message@ back, resolve the closure, and
-- evaluate the second session of the pair.
rrSpawnSessionSend
  :: (ProcessId, NodeId, Closure (SpawnSession () ()))
  -> Process ()
rrSpawnSessionSend (pid, node, proc) = do
  self <- getSelfPid
  (sendSelf, recvSelf) <- ST.newUTChan
  send pid self
  send pid sendSelf
  sendOther <- expect :: Process (SendPort ST.Message)
  SpawnSession _ dualSide <- unClosure proc
  let info = SessionInfo pid node (sendOther, recvSelf)
  evalSession dualSide info
-- | Build a @Closure (Process ())@ from a (pid, node, session-pair closure)
-- triple.
--
-- The triple is serialised with 'encode'. The resulting closure's decoder is
-- 'decodeSpawnSession' followed by 'rrSpawnSessionSendStatic'.
rrSpawnSessionSendClosure
  :: (ProcessId, NodeId, Closure (SpawnSession () ()))
  -> Closure (Process ())
rrSpawnSessionSendClosure tpl = closure decoder payload
  where
    payload = encode tpl

    decoder :: Static (ByteString -> Process ())
    decoder = staticCompose rrSpawnSessionSendStatic decodeSpawnSession
-- | Static reference to the function registered under the label
-- @$rrSpawnSessionExpect@ by 'sessionRemoteTable'.
rrSpawnSessionExpectStatic
  :: Static
       ( (NodeId, Closure (SpawnSession () ()))
         -> Process ()
       )
rrSpawnSessionExpectStatic = staticLabel "$rrSpawnSessionExpect"
-- | Run the first session of a 'SpawnSession' against a peer whose pid is
-- received as a message rather than passed in.
--
-- Steps, in order: create a channel pair with 'ST.newUTChan', expect the
-- peer's pid, send the pair's first component to that pid, expect a
-- @SendPort ST.Message@ back, resolve the closure, and evaluate the first
-- session of the pair.
rrSpawnSessionExpect :: (NodeId, Closure (SpawnSession () ())) -> Process ()
rrSpawnSessionExpect (node, proc) = do
  (sendSelf, recvSelf) <- ST.newUTChan
  peer <- expect
  send peer sendSelf
  sendOther <- expect :: Process (SendPort ST.Message)
  SpawnSession primary _ <- unClosure proc
  let info = SessionInfo peer node (sendOther, recvSelf)
  evalSession primary info
-- | Build a @Closure (Process ())@ from a (node, session-pair closure) pair.
--
-- The pair is serialised with 'encode'. The resulting closure's decoder is
-- 'decodeSpawnSessionNoPid' followed by 'rrSpawnSessionExpectStatic'.
rrSpawnSessionExpectClosure
  :: (NodeId, Closure (SpawnSession () ()))
  -> Closure (Process ())
rrSpawnSessionExpectClosure tpl = closure decoder payload
  where
    payload = encode tpl

    decoder :: Static (ByteString -> Process ())
    decoder = staticCompose rrSpawnSessionExpectStatic decodeSpawnSessionNoPid
-- | Static reference to the decoder registered under the label
-- @$decodeSpawnSession@ by 'sessionRemoteTable', where it is 'decode' at the
-- (pid, node, session-pair closure) triple type.
decodeSpawnSession
  :: Static
       ( ByteString
         -> (ProcessId, NodeId, Closure (SpawnSession a ()))
       )
decodeSpawnSession = staticLabel "$decodeSpawnSession"
-- | Static reference to the decoder registered under the label
-- @$decodeSpawnSessionNoPid@ by 'sessionRemoteTable', where it is 'decode' at
-- the (node, session-pair closure) pair type.
decodeSpawnSessionNoPid
  :: Static
       ( ByteString
         -> (NodeId, Closure (SpawnSession a ()))
       )
decodeSpawnSessionNoPid = staticLabel "$decodeSpawnSessionNoPid"
-- | Static reference to the decoder registered under the label
-- @$decodeSessionWrap@ by 'sessionRemoteTable', where it is 'decode' at the
-- type of a closure of a 'SessionWrap'.
decodeSessionWrap
  :: Static (ByteString -> Closure (SessionWrap a))
decodeSessionWrap = staticLabel "$decodeSessionWrap"
-- | Static reference to the decoder registered under the label
-- @$decodeSpawnChannel@ by 'sessionRemoteTable', where it is 'decode' at the
-- type of a closure of a function from a receive port to a 'SessionWrap'.
decodeSpawnChannel
  :: Static
       ( ByteString
         -> Closure (ReceivePort a -> SessionWrap ())
       )
decodeSpawnChannel = staticLabel "$decodeSpawnChannel"
-- | Extend a 'RemoteTable' with every static label used by this module.
--
-- Polymorphic entries are registered with their type variable instantiated
-- to 'ANY'. The registrations are composed in the same order in which the
-- labels are listed, so the last entry is applied to the incoming table
-- first.
sessionRemoteTable :: RemoteTable -> RemoteTable
sessionRemoteTable =
  registerStatic
    "$remoteSession"
    ( toDynamic
        ( evalRemoteSession
            :: SerializableDict ANY -> Closure (SessionWrap ANY) -> Process ANY
        )
    )
    . registerStatic "$remoteSession'" (toDynamic evalRemoteSession')
    . registerStatic
      "$spawnChannel"
      ( toDynamic
          ( spawnChannel
              :: SerializableDict ANY
              -> Closure (ReceivePort ANY -> SessionWrap ())
              -> ReceivePort ANY
              -> Process ()
          )
      )
    . registerStatic
      "$remoteSpawnSession"
      ( toDynamic
          ( evalRemoteSpawnSession
              :: SerializableDict ANY
              -> (ProcessId, NodeId, Closure (SpawnSession ANY ()))
              -> Process ()
          )
      )
    . registerStatic "$remoteSpawnSession'" (toDynamic evalRemoteSpawnSession')
    . registerStatic "$rrSpawnSessionSend" (toDynamic rrSpawnSessionSend)
    . registerStatic "$rrSpawnSessionExpect" (toDynamic rrSpawnSessionExpect)
    . registerStatic
      "$decodeSpawnSession"
      ( toDynamic
          ( decode
              :: ByteString
              -> (ProcessId, NodeId, Closure (SpawnSession ANY ()))
          )
      )
    . registerStatic
      "$decodeSpawnSessionNoPid"
      ( toDynamic
          ( decode
              :: ByteString
              -> (NodeId, Closure (SpawnSession ANY ()))
          )
      )
    . registerStatic
      "$decodeSessionWrap"
      (toDynamic (decode :: ByteString -> Closure (SessionWrap ANY)))
    . registerStatic
      "$decodeSpawnChannel"
      ( toDynamic
          ( decode
              :: ByteString
              -> Closure (ReceivePort ANY -> SessionWrap ())
          )
      )