-- | Asynchronous tasks for 'Process' code.
--
-- A task is started with 'async' or 'asyncLinked', both of which return an
-- 'Async' handle. The handle can then be polled ('poll', 'check'), waited on
-- ('wait', 'waitTimeout', 'waitAny', ...) or cancelled ('cancel',
-- 'cancelWait', 'cancelWith', 'cancelKill').
module Control.Distributed.Process.Async
(
AsyncRef
, AsyncTask(..)
, Async
, AsyncResult(..)
, async
, asyncLinked
, task
, remoteTask
, monitorAsync
, asyncWorker
, cancel
, cancelWait
, cancelWith
, cancelKill
, poll
, check
, wait
, waitAny
, waitAnyTimeout
, waitTimeout
, waitCancelTimeout
, waitCheckTimeout
, pollSTM
, waitSTM
, waitAnySTM
, waitAnyCancel
, waitEither
, waitEither_
, waitBoth
) where
import Control.Applicative
import Control.Concurrent.STM hiding (check)
import Control.Distributed.Process hiding (catch, finally)
import Control.Distributed.Process.Serializable
import Control.Distributed.Process.Async.Internal.Types
import Control.Monad
import Control.Monad.Catch (finally)
import Data.Maybe
( fromMaybe
)
import System.Timeout (timeout)
import Prelude
-- | Wrap a local 'Process' action as an 'AsyncTask', ready to be handed to
-- 'async' or 'asyncLinked'.
task :: Process a -> AsyncTask a
task action = AsyncTask action
-- | Describe a task given as a 'Closure' together with the 'NodeId' it
-- targets and the 'SerializableDict' for its result type. The three
-- arguments are stored, unchanged, in an 'AsyncRemoteTask'.
remoteTask :: Static (SerializableDict a)
           -> NodeId
           -> Closure (Process a)
           -> AsyncTask a
remoteTask dict node closure = AsyncRemoteTask dict node closure
-- | Start monitoring the worker process behind an 'Async' handle and return
-- the resulting 'MonitorRef'.
monitorAsync :: Async a -> Process MonitorRef
monitorAsync hAsync = monitor (_asyncWorker hAsync)
-- | Start an 'AsyncTask' and return its 'Async' handle. The task is /not/
-- tied to the lifetime of the calling process (see 'asyncLinked' for that).
async :: (Serializable a) => AsyncTask a -> Process (Async a)
async asyncTask = asyncDo False asyncTask
-- | The 'ProcessId' of the worker process recorded in an 'Async' handle.
asyncWorker :: Async a -> ProcessId
asyncWorker hAsync = _asyncWorker hAsync
-- | Like 'async', but the task is additionally tied to the calling process:
-- the calling process is monitored, and if it exits first the worker is
-- killed and the result becomes 'AsyncLinkFailed'.
asyncLinked :: (Serializable a) => AsyncTask a -> Process (Async a)
asyncLinked asyncTask = asyncDo True asyncTask
-- | Shared implementation of 'async' and 'asyncLinked'.
--
-- The 'Bool' selects linked mode: when 'True', the calling process is
-- monitored as well and its exit kills the worker.
--
-- Two processes are spawned:
--
-- * an /insulator/, which monitors the worker, handles 'CancelWait'
--   requests and records abnormal outcomes in the shared result 'TMVar';
--
-- * a /worker/ (spawned by the insulator), which runs the task and stores
--   @'AsyncDone' r@ on success.
--
-- A remote task is reduced to a local one that runs the closure via 'call'.
asyncDo :: (Serializable a) => Bool -> AsyncTask a -> Process (Async a)
asyncDo shouldLink (AsyncRemoteTask d n c) =
  asyncDo shouldLink $ AsyncTask $ call d n c
asyncDo shouldLink (AsyncTask proc) = do
    root     <- getSelfPid
    result   <- liftIO newEmptyTMVarIO
    sigStart <- liftIO newEmptyTMVarIO
    (sp, rp) <- newChan

    insulator <- spawnLocal $ do
        worker <- spawnLocal $ do
            -- Do not start the task until the caller has signalled that it
            -- has received our pid (see the 'putTMVar' on 'sigStart' below).
            liftIO $ atomically $ takeTMVar sigStart
            r <- proc
            void $ liftIO $ atomically $ putTMVar result (AsyncDone r)

        -- Hand the worker's pid back to the calling process.
        sendChan sp worker

        wref <- monitor worker
        rref <- if shouldLink then fmap Just (monitor root) else return Nothing
        -- Always drop our monitors on the way out. The root monitor only
        -- exists in linked mode; it must actually be /run/ here, not merely
        -- returned as a value.
        finally (pollUntilExit worker result)
                (unmonitor wref >> maybe (return ()) unmonitor rref)

    workerPid <- receiveChan rp
    liftIO $ atomically $ putTMVar sigStart ()

    return Async { _asyncWorker  = workerPid
                 , _asyncMonitor = insulator
                 , _asyncWait    = readTMVar result
                 }
  where
    -- Block until the worker's fate is known and record it, unless the
    -- worker already stored a result itself.
    pollUntilExit :: (Serializable a)
                  => ProcessId
                  -> TMVar (AsyncResult a)
                  -> Process ()
    pollUntilExit wpid result' = do
      r <- receiveWait
             [ match (\c@CancelWait -> kill wpid "cancel" >> return (Left c))
             , match (\(ProcessMonitorNotification _ pid' r) ->
                        return (Right (pid', r)))
             ]
      case r of
        Left CancelWait
          -- Non-blocking put: the worker may already have stored its result
          -- (e.g. the cancel raced with normal completion). A blocking put
          -- would then leave this process stuck forever.
          -> liftIO $ atomically $ void $
               tryPutTMVar result' AsyncCancelled
        Right (fpid, d)
          | fpid == wpid -> case d of
              -- Normal exit: the worker stored 'AsyncDone' itself.
              DiedNormal -> return ()
              _          -> liftIO $ atomically $ void $
                              tryPutTMVar result' (AsyncFailed d)
          | otherwise -> do
              -- A monitored process other than the worker died (the only
              -- other monitor set up above is on the calling process, in
              -- linked mode): stop the worker and wait for it to go down
              -- before publishing the failure.
              kill wpid "linkFailed"
              receiveWait
                [ matchIf (\(ProcessMonitorNotification _ pid' _) ->
                             pid' == wpid
                          ) $ \_ -> return ()
                ]
              liftIO $ atomically $ void $
                tryPutTMVar result' (AsyncLinkFailed d)
-- | Inspect an 'Async' without blocking. Yields 'AsyncPending' when no
-- result is available yet, otherwise the stored result.
poll :: (Serializable a) => Async a -> Process (AsyncResult a)
poll hAsync =
    fmap (fromMaybe AsyncPending) (liftIO (atomically (pollSTM hAsync)))
-- | Like 'poll', but reports the pending state as 'Nothing' and any other
-- result as 'Just'.
check :: (Serializable a) => Async a -> Process (Maybe (AsyncResult a))
check hAsync = do
    outcome <- poll hAsync
    return $ case outcome of
      AsyncPending -> Nothing
      finished     -> Just finished
-- | Wait for a result for at most the given time (passed on to
-- 'waitTimeout'). If the time runs out, 'AsyncPending' is returned instead
-- of a result.
waitCheckTimeout :: (Serializable a) =>
                    Int -> Async a -> Process (AsyncResult a)
waitCheckTimeout t hAsync = do
    outcome <- waitTimeout t hAsync
    return (fromMaybe AsyncPending outcome)
{-# INLINE wait #-}
-- | Block until the 'Async' has a result, then return it.
wait :: Async a -> Process (AsyncResult a)
-- Written as a lambda so the definition keeps zero syntactic arguments and
-- the INLINE pragma applies exactly as it does for a point-free definition.
wait = \hAsync -> liftIO (atomically (waitSTM hAsync))
-- | Wait for a result, giving up after the given delay. The delay is handed
-- to 'System.Timeout.timeout', i.e. it is in microseconds. Returns
-- 'Nothing' if the delay elapses first.
waitTimeout :: (Serializable a) =>
               Int -> Async a -> Process (Maybe (AsyncResult a))
waitTimeout t hAsync = liftIO (timeout t awaitResult)
  where
    awaitResult = atomically (waitSTM hAsync)
-- | Wait for a result as 'waitTimeout' does; if the delay elapses, cancel
-- the task with 'cancelWait' and return whatever result that produces.
waitCancelTimeout :: (Serializable a)
                  => Int
                  -> Async a
                  -> Process (AsyncResult a)
waitCancelTimeout t hAsync =
    waitTimeout t hAsync >>= maybe (cancelWait hAsync) return
-- | Block until any one of the given asyncs has a result; return that async
-- paired with its result. See 'waitAnySTM' for the selection rule.
waitAny :: (Serializable a)
        => [Async a]
        -> Process (Async a, AsyncResult a)
waitAny = liftIO . waitAnySTM
-- | Like 'waitAny', but afterwards sends a cancellation request to every
-- async in the list. The cancellation runs via 'finally', so it also happens
-- when the wait is aborted by an exception.
waitAnyCancel :: (Serializable a)
              => [Async a] -> Process (Async a, AsyncResult a)
waitAnyCancel asyncs = finally (waitAny asyncs) cancelAll
  where
    cancelAll = mapM_ cancel asyncs
-- | Block until either async has a result. The first argument is tried
-- first, so if both are ready its result is returned as 'Left'.
waitEither :: Async a
           -> Async b
           -> Process (Either (AsyncResult a) (AsyncResult b))
waitEither left right = liftIO (atomically eitherReady)
  where
    eitherReady =
      orElse (fmap Left (waitSTM left)) (fmap Right (waitSTM right))
-- | Like 'waitEither', but discards the result: it merely blocks until at
-- least one of the two asyncs has finished.
waitEither_ :: Async a -> Async b -> Process ()
waitEither_ left right = liftIO (atomically eitherReady)
  where
    eitherReady =
      orElse (waitSTM left >> return ()) (waitSTM right >> return ())
-- | Block until both asyncs have results and return them as a pair.
waitBoth :: Async a
         -> Async b
         -> Process (AsyncResult a, AsyncResult b)
waitBoth left right = liftIO (atomically bothReady)
  where
    bothReady = do
      -- The alternative never succeeds (it always ends in 'retry'); it reads
      -- the right-hand result before retrying when the left is not ready.
      a <- orElse (waitSTM left) (waitSTM right >> retry)
      b <- waitSTM right
      return (a, b)
-- | Like 'waitAny', but gives up after the given delay (handed to
-- 'System.Timeout.timeout', i.e. microseconds) and returns only the result,
-- not the async that produced it. 'Nothing' means the delay elapsed.
waitAnyTimeout :: (Serializable a)
               => Int
               -> [Async a]
               -> Process (Maybe (AsyncResult a))
waitAnyTimeout delay asyncs =
    liftIO (timeout delay (fmap snd (waitAnySTM asyncs)))
-- | Request cancellation by sending 'CancelWait' to the process stored in
-- the second field of the 'Async' handle. This only sends the message; it
-- does not wait for the task to stop (see 'cancelWait').
cancel :: Async a -> Process ()
cancel hAsync = case hAsync of
    Async _ recipient _ -> send recipient CancelWait
-- | Request cancellation with 'cancel', then block with 'wait' until a
-- result is available and return it.
cancelWait :: (Serializable a) => Async a -> Process (AsyncResult a)
cancelWait hAsync = do
    cancel hAsync
    wait hAsync
-- | Send an exit signal carrying the given reason directly to the worker
-- process of the async.
cancelWith :: (Serializable b) => b -> Async a -> Process ()
cancelWith reason hAsync = exit workerPid reason
  where
    workerPid = _asyncWorker hAsync
-- | Send a kill signal with the given reason directly to the worker process
-- of the async.
cancelKill :: String -> Async a -> Process ()
cancelKill reason hAsync = kill workerPid reason
  where
    workerPid = _asyncWorker hAsync
-- | Block until any of the given asyncs has a result and return that async
-- together with its result. Despite the name this runs in 'IO': the
-- transaction is executed with 'atomically' here.
--
-- Asyncs are tried in list order, so when several are ready the earliest one
-- in the list wins. With an empty list the transaction is just 'retry' and
-- can never complete.
waitAnySTM :: [Async a] -> IO (Async a, AsyncResult a)
waitAnySTM asyncs = atomically (foldr firstOf retry asyncs)
  where
    firstOf a rest = fmap ((,) a) (waitSTM a) `orElse` rest
-- | The 'STM' action stored in the handle that yields the async's result.
-- For handles built by this module it is a 'readTMVar', so it retries until
-- a result has been stored.
waitSTM :: Async a -> STM (AsyncResult a)
waitSTM = _asyncWait
{-# INLINE pollSTM #-}
-- | Non-blocking variant of 'waitSTM': 'Just' the result if the handle's
-- 'STM' action succeeds, 'Nothing' if it would retry.
pollSTM :: Async a -> STM (Maybe (AsyncResult a))
pollSTM (Async _ _ readResult) =
    orElse (fmap Just readResult) (return Nothing)