module Control.Distributed.Process.Tests.Mx (tests) where
import Control.Distributed.Process.Tests.Internal.Utils
import Network.Transport.Test (TestTransport(..))
import Control.Concurrent (threadDelay)
import Control.Distributed.Process
import Control.Distributed.Process.Node
import Control.Distributed.Process.Management
( MxEvent(..)
, MxAgentId(..)
, mxAgent
, mxSink
, mxReady
, mxReceive
, mxDeactivate
, liftMX
, mxGetLocal
, mxSetLocal
, mxUpdateLocal
, mxNotify
, mxBroadcast
, mxGetId
, mxGet
, mxSet
, mxClear
, mxPurgeTable
, mxDropTable
)
import Control.Monad (void)
import Data.Binary
import Data.List (find, sort)
import Data.Maybe (isJust)
import Data.Typeable
import GHC.Generics
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch, log)
#endif
import Test.Framework
( Test
, testGroup
)
import Test.Framework.Providers.HUnit (testCase)
-- | Unit-like message used by the tests in this module, both as the payload
-- one agent broadcasts to another and as the value stored under an agent's
-- global property key.
data Publish = Publish
  deriving (Eq, Generic, Typeable)

-- No methods are given here, so the instance relies entirely on the class
-- defaults (available because 'Publish' derives 'Generic').
instance Binary Publish
-- | Checks that a value broadcast by one agent is seen by another agent.
--
-- The publisher agent answers a @()@ notification by broadcasting
-- 'Publish'. The consumer agent answers 'Publish' by writing @()@ to a
-- channel; whatever arrives on that channel is stashed in @result@.
-- Both agents are killed once the result has been recorded.
testAgentBroadcast :: TestResult () -> Process ()
testAgentBroadcast result = do
  (ackSend, ackRecv) <- newChan :: Process (SendPort (), ReceivePort ())

  publisher <- mxAgent (MxAgentId "publisher-agent") ()
    [ mxSink (\() -> do
        mxBroadcast Publish
        mxReady)
    ]

  consumer <- mxAgent (MxAgentId "consumer-agent") ()
    [ mxSink (\Publish -> do
        liftMX (sendChan ackSend ())
        mxReady)
    ]

  -- Trigger the publisher, then block until the consumer acknowledges.
  mxNotify ()
  receiveChan ackRecv >>= stash result

  -- Stop the agents, publisher first.
  mapM_ (\agent -> kill agent "finished") [publisher, consumer]
-- | Checks that one agent accumulates input arriving by two different routes.
--
-- The agent keeps a running 'Int' total as its local state. Integers are fed
-- to it alternately with 'mxNotify' and with 'nsend' to the name
-- @"sum-agent"@ (1 + 3 + 2 + 4 + 5 = 15). When the total reaches 15 the agent
-- reports it on a channel and deactivates itself.
--
-- The stashed result is whatever arrives on that channel within the timeout
-- passed to 'receiveChanTimeout', or 'Nothing' if nothing arrives.
testAgentDualInput :: TestResult (Maybe Int) -> Process ()
testAgentDualInput result = do
  (totalSend, totalRecv) <- newChan

  void $ mxAgent (MxAgentId "sum-agent") (0 :: Int)
    [ mxSink (\(n :: Int) -> do
        mxUpdateLocal (+ n)
        total <- mxGetLocal
        if total == 15
          then do
            liftMX (sendChan totalSend total)
            mxDeactivate "finished"
          else mxReady)
    ]

  -- Interleave the two delivery routes.
  mxNotify (1 :: Int)
  nsend "sum-agent" (3 :: Int)
  mxNotify (2 :: Int)
  nsend "sum-agent" (4 :: Int)
  mxNotify (5 :: Int)

  receiveChanTimeout 10000000 totalRecv >>= stash result
-- | Checks that an agent which answers every input with 'mxReceive' still
-- collects everything sent to it by either route.
--
-- The agent starts with the state @["first"]@ and prepends each 'String' it
-- is handed. Once the state holds five entries it sends the whole list back
-- on a channel and deactivates; until then it continues with 'mxReceive'.
--
-- Two strings are sent with 'nsend' to the agent's name and two with
-- 'mxNotify'. The received list is sorted before being stashed, so the
-- assertion does not depend on arrival order.
testAgentPrioritisation :: TestResult [String] -> Process ()
testAgentPrioritisation result = do
  let name = "prioritising-agent"
  (collectedSend, collectedRecv) <- newChan

  void $ mxAgent (MxAgentId name) ["first"]
    [ mxSink (\(str :: String) -> do
        mxUpdateLocal (str :)
        collected <- mxGetLocal
        if length collected == 5
          then do
            liftMX (sendChan collectedSend collected)
            mxDeactivate "finished"
          else mxReceive)
    ]

  nsend name "second"
  mxNotify "third"
  mxNotify "fourth"
  nsend name "fifth"

  receiveChan collectedRecv >>= stash result . sort
-- | Checks that an agent handles a message sent with 'nsend' to its name.
--
-- The agent replies on a channel whenever it is handed @()@. The test sends
-- @()@ with 'nsend' to @"listener-agent"@ and stashes whatever arrives on the
-- channel within the timeout ('Nothing' if nothing arrives), then kills the
-- agent.
testAgentMailboxHandling :: TestResult (Maybe ()) -> Process ()
testAgentMailboxHandling result = do
  (ackSend, ackRecv) <- newChan

  agent <- mxAgent (MxAgentId "listener-agent") ()
    [ mxSink (\() -> do
        liftMX (sendChan ackSend ())
        mxReady)
    ]

  nsend "listener-agent" ()
  receiveChanTimeout 1000000 ackRecv >>= stash result

  kill agent "finished"
-- | Checks that an agent observes the spawn and death events of a process.
--
-- The agent keeps a list of 'MxEvent's as its local state and has two sinks:
--
-- * the first records every 'MxSpawned' and 'MxProcessDied' event it is
--   handed and ignores all other events;
--
-- * the second takes an event paired with a reply port, and answers whether
--   a matching event (same process, and same reason for 'MxProcessDied')
--   is already in the recorded list.
--
-- The test spawns a short-lived process, waits for that process's monitor
-- notification, and then queries the agent for both its 'MxSpawned' and its
-- 'MxProcessDied' ('DiedNormal') event. The stashed result is 'True' only if
-- both queries are answered with 'True'.
testAgentEventHandling :: TestResult Bool -> Process ()
testAgentEventHandling result = do
  let initState = [] :: [MxEvent]
  agentPid <- mxAgent (MxAgentId "lifecycle-listener-agent") initState [
      -- Recorder: remember lifecycle events, drop everything else.
      (mxSink $ \ev -> do
         st <- mxGetLocal
         let act =
               case ev of
                 (MxSpawned _)       -> mxSetLocal (ev:st)
                 (MxProcessDied _ _) -> mxSetLocal (ev:st)
                 _                   -> return ()
         act >> mxReady),
      -- Query: report whether the given event has been recorded.
      (mxSink $ \(ev, sp :: SendPort Bool) -> do
         st <- mxGetLocal
         let found =
               case ev of
                 MxSpawned p ->
                   isJust $ find (\ev' ->
                     case ev' of
                       (MxSpawned p') -> p' == p
                       _              -> False) st
                 MxProcessDied p r ->
                   isJust $ find (\ev' ->
                     case ev' of
                       (MxProcessDied p' r') -> p' == p && r == r'
                       _                     -> False) st
                 _ -> False
         liftMX $ sendChan sp found
         mxReady)
    ]

  -- NOTE(review): notifications from this monitor are never consumed in this
  -- function; confirm whether it is still needed.
  _ <- monitor agentPid

  (sp, rp) <- newChan
  pid <- spawnLocal $ sendChan sp ()
  () <- receiveChan rp

  -- Wait for the notification belonging to *this* monitor only. The agent is
  -- monitored as well, so accepting any 'ProcessMonitorNotification' could
  -- let a notification about the agent end the wait.
  pidRef <- monitor pid
  receiveWait [
      matchIf (\(ProcessMonitorNotification ref _ _) -> ref == pidRef)
              (\_ -> return ())
    ]

  (replyTo, reply) <- newChan :: Process (SendPort Bool, ReceivePort Bool)
  mxNotify (MxSpawned pid, replyTo)
  mxNotify (MxProcessDied pid DiedNormal, replyTo)

  seenAlive <- receiveChan reply
  seenDead  <- receiveChan reply

  stash result $ seenAlive && seenDead
-- | Checks that a value an agent stores with 'mxSet' can be read with 'mxGet'.
--
-- On a @()@ notification the agent stores 'Publish' under the key
-- @"publish"@ against its own id and then signals on a channel. After that
-- signal the test looks the key up under the same agent id and stashes the
-- lookup result. Finally it kills the agent and waits for the matching
-- monitor notification before returning.
testAgentPublication :: TestResult (Maybe Publish) -> Process ()
testAgentPublication result = do
  (syncChan, syncRecv) <- newChan
  let ourId = MxAgentId "publication-agent"

  agentPid <- mxAgent ourId ()
    [ mxSink (\() -> do
        selfId <- mxGetId
        liftMX $ do
          mxSet selfId "publish" Publish
          sendChan syncChan ()
        mxReady)
    ]

  mxNotify ()
  () <- receiveChan syncRecv
  mxGet ourId "publish" >>= stash result

  -- Shut the agent down and wait until its death has been reported.
  agentRef <- monitor agentPid
  kill agentPid "finished"
  receiveWait
    [ matchIf (\(ProcessMonitorNotification ref _ _) -> ref == agentRef)
              (const (return ()))
    ]
-- | Checks that 'mxClear' removes a single key from an agent's table.
--
-- Stores @val@ under @"key-1"@ for agent @"agent-1"@, reads it back, clears
-- the key and reads it again. The stashed pair holds the lookup before and
-- the lookup after the clear.
testAgentTableClear :: Int -> TestResult (Maybe Int, Maybe Int) -> Process ()
testAgentTableClear val result = do
  mxSet tId tKey val
  beforeClear <- mxGet tId tKey
  mxClear tId tKey
  afterClear <- mxGet tId tKey
  stash result (beforeClear, afterClear)
  where
    tId  = MxAgentId "agent-1"
    tKey = "key-1"
-- | Checks that a key is no longer found after 'mxPurgeTable'.
--
-- Stores 12345 under @"key-2"@ for agent @"agent-2"@, purges that agent's
-- table and stashes the result of looking the key up afterwards.
testAgentTablePurge :: TestResult (Maybe Int) -> Process ()
testAgentTablePurge result = do
  mxSet tId tKey (12345 :: Int)
  mxPurgeTable tId
  mxGet tId tKey >>= stash result
  where
    tId  = MxAgentId "agent-2"
    tKey = "key-2"
-- | Checks that a table can be dropped and then written to again.
--
-- Stores @val@ under @"key-3"@ for agent @"agent-3"@ and reads it back,
-- drops the agent's table with 'mxDropTable' and reads again, then stores
-- @val@ once more and reads a third time. The stashed triple holds the three
-- lookups in that order.
testAgentTableDelete :: Int
                     -> TestResult (Maybe Int, Maybe Int, Maybe Int)
                     -> Process ()
testAgentTableDelete val result = do
  mxSet tId tKey val
  initial <- mxGet tId tKey

  mxDropTable tId
  afterDrop <- mxGet tId tKey

  mxSet tId tKey val
  afterReset <- mxGet tId tKey

  stash result (initial, afterDrop, afterReset)
  where
    tId  = MxAgentId "agent-3"
    tKey = "key-3"
-- | Builds the test tree for this module.
--
-- A single local node is created on the supplied transport and shared by
-- every case. Each case runs one of the @testAgent*@ processes through
-- 'delayedAssertion' together with its expected value and failure message.
tests :: TestTransport -> IO [Test]
tests TestTransport{..} = do
  node1 <- newLocalNode testTransport initRemoteTable
  return
    [ testGroup "Mx Agents" (agentCases node1)
    , testGroup "Mx Global Properties" (propertyCases node1)
    ]
  where
    -- Cases exercising agent event and mailbox handling.
    agentCases node =
      [ testCase "Event Handling" $
          delayedAssertion
            "expected True, but events where not as expected"
            node True testAgentEventHandling
      , testCase "Inter-Agent Broadcast" $
          delayedAssertion
            "expected (), but no broadcast was received"
            node () testAgentBroadcast
      , testCase "Agent Mailbox Handling" $
          delayedAssertion
            "expected (Just ()), but no regular (mailbox) input was handled"
            node (Just ()) testAgentMailboxHandling
      , testCase "Agent Dual Input Handling" $
          delayedAssertion
            "expected sum = 15, but the result was Nothing"
            node (Just 15 :: Maybe Int) testAgentDualInput
      , testCase "Agent Input Prioritisation" $
          delayedAssertion
            "expected [first, second, third, fourth, fifth], but result diverged"
            node
            (sort ["first", "second", "third", "fourth", "fifth"])
            testAgentPrioritisation
      ]

    -- Cases exercising the global property tables.
    propertyCases node =
      [ testCase "Global Property Publication" $
          delayedAssertion
            "expected (Just Publish), but no table entry was found"
            node (Just Publish) testAgentPublication
      , testCase "Clearing Global Properties" $
          delayedAssertion
            "expected (Just 1024, Nothing): invalid table entry found!"
            node (Just 1024, Nothing) (testAgentTableClear 1024)
      , testCase "Purging Global Tables" $
          delayedAssertion
            "expected Nothing, but a table entry was found"
            node Nothing testAgentTablePurge
      , testCase "Deleting and (Re)Creating Global Tables" $
          delayedAssertion
            "expected (Just 15, Nothing, Just 15): invalid table entry found!"
            node (Just 15, Nothing, Just 15) (testAgentTableDelete 15)
        -- Final case only sleeps; it makes no assertion.
      , testCase "Wait" $
          threadDelay 100000
      ]