module Control.Distributed.Process.Management.Internal.Agent where
import Control.Applicative ((<$>))
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan
( TChan
, newBroadcastTChanIO
, readTChan
, writeTChan
, dupTChan
)
import Control.Distributed.Process.Internal.Primitives
( receiveWait
, matchAny
, die
, catches
, Handler(..)
)
import Control.Distributed.Process.Internal.CQueue
( enqueueSTM
, CQueue
)
import Control.Distributed.Process.Management.Internal.Types
( Fork
)
import Control.Distributed.Process.Management.Internal.Trace.Tracer
( traceController
)
import Control.Distributed.Process.Internal.Types
( Process
, Message
, Tracer(..)
, LocalProcess(..)
, ProcessId
, forever'
)
import Control.Exception (AsyncException(ThreadKilled), SomeException)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
import GHC.Weak (Weak, deRefWeak)
-- | Configuration handed back by the agent controller once it has started.
--
-- The three components are, in order:
--
--   1. the 'Tracer' that the controller started,
--   2. a weak reference to a 'CQueue' of 'Message's (the controller's own
--      'processWeakQ'), and
--   3. a function that, given an agent body expecting a pair of 'TChan's,
--      starts that agent and returns its 'ProcessId'.
type AgentConfig =
  ( Tracer
  , Weak (CQueue Message)
  , ((TChan Message, TChan Message) -> Process ()) -> IO ProcessId
  )
-- | Body of the management agent controller process.
--
-- Starts the tracer and the dead letter queue reader, publishes the resulting
-- 'AgentConfig' through @mv@, and then loops forever, re-broadcasting every
-- message this process receives.
--
-- [@forkProcess@] used to spawn the trace controller and, via 'mxStartAgent',
-- each agent.
--
-- [@mv@] filled once, after start-up, with the tracer, this process'
-- 'processWeakQ' and a function for starting new agents.
mxAgentController :: Fork
                  -> MVar AgentConfig
                  -> Process ()
mxAgentController forkProcess mv = do
    trc <- liftIO $ startTracing forkProcess
    sigbus <- liftIO newBroadcastTChanIO
    liftIO $ startDeadLetterQueue sigbus
    weakQueue <- processWeakQ <$> ask
    liftIO $ putMVar mv (trc, weakQueue, mxStartAgent forkProcess sigbus)
    go sigbus trc
  where
    -- Receive any message and fan it out. The asynchronous-exception handler
    -- is listed before the catch-all so that it gets the first chance to run.
    go bus tracer = forever' $
      void $ receiveWait [ matchAny (liftIO . broadcast bus tracer) ]
               `catches` [ Handler onAsync, Handler ignoreOther ]

    -- Total handler for asynchronous exceptions. A lambda matching only
    -- 'ThreadKilled' would fail its pattern match on any other constructor
    -- and terminate the controller with an unrelated 'PatternMatchFail';
    -- here the controller still terminates, but reports the real cause.
    onAsync :: AsyncException -> Process ()
    onAsync ThreadKilled = die "Killed"
    onAsync other        = die (show other)

    -- Deliberate best-effort: any other failure while handling a single
    -- message is dropped so that the controller keeps running.
    ignoreOther :: SomeException -> Process ()
    ignoreOther _ = return ()

    -- Deliver one message to the signal bus and, when the tracer's queue is
    -- still alive, to the tracer as well.
    broadcast :: TChan Message -> Tracer -> Message -> IO ()
    broadcast ch tr msg = tracerQueue tr >>= \tmQueue ->
      atomicBroadcast ch tmQueue msg

    -- Dereference the tracer's weak queue; 'Nothing' once it has been
    -- finalised.
    tracerQueue :: Tracer -> IO (Maybe (CQueue Message))
    tracerQueue (Tracer _ wQ) = deRefWeak wQ

    -- Write to the tracer queue (if any) and to the bus in one STM
    -- transaction.
    atomicBroadcast :: TChan Message
                    -> Maybe (CQueue Message)
                    -> Message -> IO ()
    atomicBroadcast ch Nothing  msg = atomically $ writeTChan ch msg
    atomicBroadcast ch (Just q) msg =
      atomically $ enqueueSTM q msg >> writeTChan ch msg
-- | Start an agent process running @handler@.
--
-- The handler receives a pair made of @chan@ itself and a fresh duplicate of
-- it obtained with 'dupTChan'. The result is the process id that @fork@
-- returns for the new process.
mxStartAgent :: Fork
             -> TChan Message
             -> ((TChan Message, TChan Message) -> Process ())
             -> IO ProcessId
mxStartAgent fork chan handler =
  atomically (dupTChan chan) >>= \dup -> fork (handler (chan, dup))
-- | Fork the trace controller and build a 'Tracer' handle for it.
--
-- A fresh empty 'MVar' is passed to 'traceController'; this call then blocks
-- on that 'MVar' until a value is available (presumably put there by the
-- trace controller once it is running) and pairs it with the controller's
-- process id.
startTracing :: Fork -> IO Tracer
startTracing forkProcess = do
  ready     <- newEmptyMVar
  tracerPid <- forkProcess (traceController ready)
  Tracer tracerPid <$> takeMVar ready
-- | Fork a thread that endlessly reads, and throws away, every message
-- arriving on a duplicate of @sigbus@.
--
-- NOTE(review): presumably this guarantees the bus always has at least one
-- reader draining it; confirm against the design notes.
startDeadLetterQueue :: TChan Message
                     -> IO ()
startDeadLetterQueue sigbus = do
    drain <- atomically (dupTChan sigbus)
    void (forkIO (forever' (discardNext drain)))
  where
    -- Block for the next message on the given channel and drop it.
    discardNext ch = void (atomically (readTChan ch))