{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE LiberalTypeSynonyms #-}
module Control.Distributed.Process.ManagedProcess.UnsafeClient
(
sendControlMessage
, shutdown
, call
, safeCall
, tryCall
, callTimeout
, flushPendingCalls
, callAsync
, cast
, callChan
, syncCallChan
, syncSafeCallChan
) where
import Control.Distributed.Process
( Process
, ProcessId
, ReceivePort
, newChan
, matchChan
, match
, die
, terminate
, receiveTimeout
, unsafeSendChan
, getSelfPid
, catchesExit
, handleMessageIf
)
import Control.Distributed.Process.Async
( Async
, async
, task
)
import Control.Distributed.Process.Extras
( awaitResponse
, Addressable
, Routable(..)
, NFSerializable
, ExitReason(..)
, Shutdown(..)
)
import Control.Distributed.Process.ManagedProcess.Internal.Types
( Message(CastMessage, ChanMessage)
, CallResponse(..)
, ControlPort(..)
, unsafeInitCall
, waitResponse
)
import Control.Distributed.Process.Extras.Time
( TimeInterval
, asTimeout
)
import Control.Distributed.Process.Serializable hiding (SerializableDict)
import Data.Maybe (fromJust)
sendControlMessage :: Serializable m => ControlPort m -> m -> Process ()
sendControlMessage cp m = unsafeSendChan (unPort cp) (CastMessage m)
shutdown :: ProcessId -> Process ()
shutdown pid = cast pid Shutdown
call :: forall s a b . (Addressable s, NFSerializable a, NFSerializable b)
=> s -> a -> Process b
call sid msg = unsafeInitCall sid msg >>= waitResponse Nothing >>= decodeResult
where decodeResult (Just (Right r)) = return r
decodeResult (Just (Left err)) = die err
decodeResult Nothing = terminate
safeCall :: forall s a b . (Addressable s, NFSerializable a, NFSerializable b)
=> s -> a -> Process (Either ExitReason b)
safeCall s m = do
us <- getSelfPid
(fmap fromJust (unsafeInitCall s m >>= waitResponse Nothing) :: Process (Either ExitReason b))
`catchesExit` [\pid msg -> handleMessageIf msg (weFailed pid us)
(return . Left)]
where
weFailed a b (ExitOther _) = a == b
weFailed _ _ _ = False
tryCall :: forall s a b . (Addressable s, NFSerializable a, NFSerializable b)
=> s -> a -> Process (Maybe b)
tryCall s m = unsafeInitCall s m >>= waitResponse Nothing >>= decodeResult
where decodeResult (Just (Right r)) = return $ Just r
decodeResult _ = return Nothing
callTimeout :: forall s a b . (Addressable s, NFSerializable a, NFSerializable b)
=> s -> a -> TimeInterval -> Process (Maybe b)
callTimeout s m d = unsafeInitCall s m >>= waitResponse (Just d) >>= decodeResult
where decodeResult :: (NFSerializable b)
=> Maybe (Either ExitReason b)
-> Process (Maybe b)
decodeResult Nothing = return Nothing
decodeResult (Just (Right result)) = return $ Just result
decodeResult (Just (Left reason)) = die reason
flushPendingCalls :: forall b . (NFSerializable b)
=> TimeInterval
-> (b -> Process b)
-> Process (Maybe b)
flushPendingCalls d proc =
receiveTimeout (asTimeout d) [
match (\(CallResponse (m :: b) _) -> proc m)
]
callAsync :: forall s a b . (Addressable s, NFSerializable a, NFSerializable b)
=> s -> a -> Process (Async b)
callAsync server msg = async $ task $ call server msg
cast :: forall a m . (Addressable a, NFSerializable m)
=> a -> m -> Process ()
cast server msg = unsafeSendTo server ((CastMessage msg) :: Message m ())
callChan :: forall s a b . (Addressable s, NFSerializable a, NFSerializable b)
=> s -> a -> Process (ReceivePort b)
callChan server msg = do
(sp, rp) <- newChan
unsafeSendTo server ((ChanMessage msg sp) :: Message a b)
return rp
syncCallChan :: forall s a b . (Addressable s, NFSerializable a, NFSerializable b)
=> s -> a -> Process b
syncCallChan server msg = do
r <- syncSafeCallChan server msg
case r of
Left e -> die e
Right r' -> return r'
syncSafeCallChan :: forall s a b . (Addressable s, NFSerializable a, NFSerializable b)
=> s -> a -> Process (Either ExitReason b)
syncSafeCallChan server msg = do
rp <- callChan server msg
awaitResponse server [ matchChan rp (return . Right) ]