{-# LANGUAGE PackageImports, CPP, GeneralizedNewtypeDeriving #-}
module Control.Monad.Par.Scheds.DirectInternal where
import Control.Applicative
import "mtl" Control.Monad.Cont as C
import qualified "mtl" Control.Monad.Reader as RD
import Control.Monad.IO.Class (liftIO)
import qualified System.Random.MWC as Random
import Control.Concurrent hiding (yield)
import GHC.Conc
import Data.IORef
import qualified Data.Set as S
import Data.Word (Word64)
import Data.Concurrent.Deque.Class (WSDeque)
import Control.Monad.Fix (MonadFix (mfix))
#if MIN_VERSION_base(4,4,0)
import GHC.IO.Unsafe (unsafeDupableInterleaveIO)
#else
import GHC.IO.Unsafe (unsafeInterleaveIO)
#endif
#ifdef USE_CHASELEV
#warning "Note: using Chase-Lev lockfree workstealing deques..."
import Data.Concurrent.Deque.ChaseLev.DequeInstance
import Data.Concurrent.Deque.ChaseLev as R
#endif
import Control.Exception (Exception, throwIO, BlockedIndefinitelyOnMVar (..),
catch)
newtype Par a = Par { unPar :: C.ContT () ROnly a }
deriving (Functor, Applicative, Monad, MonadCont, RD.MonadReader Sched)
type ROnly = RD.ReaderT Sched IO
instance MonadFix Par where
mfix = fixPar
fixPar :: (a -> Par a) -> Par a
fixPar f = Par $ ContT $ \ar -> RD.ReaderT $ \sched -> do
mv <- newEmptyMVar
ans <- unsafeDupableInterleaveIO (readMVar mv `catch`
\ ~BlockedIndefinitelyOnMVar -> throwIO FixParException)
flip RD.runReaderT sched $
runContT (unPar (f ans)) $ \a -> liftIO (putMVar mv a) >> ar a
#if !MIN_VERSION_base(4,4,0)
unsafeDupableInterleaveIO :: IO a -> IO a
unsafeDupableInterleaveIO = unsafeInterleaveIO
#endif
data FixParException = FixParException deriving Show
instance Exception FixParException
type SessionID = Word64
data Session = Session SessionID (HotVar Bool)
data Sched = Sched
{
no :: {-# UNPACK #-} !Int,
workpool :: WSDeque (Par ()),
rng :: HotVar Random.GenIO,
isMain :: Bool,
sessions :: HotVar [Session],
idle :: HotVar [MVar Bool],
scheds :: [Sched],
activeSessions :: HotVar (S.Set SessionID),
sessionCounter :: HotVar SessionID
}
#ifndef HOTVAR
#define HOTVAR 1
#endif
newHotVar :: a -> IO (HotVar a)
modifyHotVar :: HotVar a -> (a -> (a,b)) -> IO b
modifyHotVar_ :: HotVar a -> (a -> a) -> IO ()
writeHotVar :: HotVar a -> a -> IO ()
readHotVar :: HotVar a -> IO a
{-# INLINE newHotVar #-}
{-# INLINE modifyHotVar #-}
{-# INLINE modifyHotVar_ #-}
{-# INLINE readHotVar #-}
{-# INLINE writeHotVar #-}
#if HOTVAR == 1
type HotVar a = IORef a
newHotVar = newIORef
modifyHotVar = atomicModifyIORef
modifyHotVar_ v fn = atomicModifyIORef v (\a -> (fn a, ()))
readHotVar = readIORef
writeHotVar = writeIORef
instance Show (IORef a) where
show _ref = "<ioref>"
writeHotVarRaw :: HotVar a -> a -> IO ()
hotVarTransaction = error "Transactions not currently possible for IO refs"
readHotVarRaw :: HotVar a -> IO a
readHotVarRaw = readHotVar
writeHotVarRaw = writeHotVar
#elif HOTVAR == 2
#warning "Using MVars for hot atomic variables."
type HotVar a = MVar a
newHotVar x = do v <- newMVar; putMVar v x; return v
modifyHotVar v fn = modifyMVar v (return . fn)
modifyHotVar_ v fn = modifyMVar_ v (return . fn)
readHotVar = readMVar
writeHotVar v x = do swapMVar v x; return ()
instance Show (MVar a) where
show _ref = "<mvar>"
hotVarTransaction = error "Transactions not currently possible for MVars"
readHotVarRaw = readHotVar
writeHotVarRaw = writeHotVar
#elif HOTVAR == 3
#warning "Using TVars for hot atomic variables."
type HotVar a = TVar a
newHotVar = newTVarIO
modifyHotVar tv fn = atomically (do x <- readTVar tv
let (x2,b) = fn x
writeTVar tv x2
return b)
modifyHotVar_ tv fn = atomically (do x <- readTVar tv; writeTVar tv (fn x))
readHotVar x = atomically $ readTVar x
writeHotVar v x = atomically $ writeTVar v x
instance Show (TVar a) where
show ref = "<tvar>"
hotVarTransaction = atomically
readHotVarRaw = readTVar
writeHotVarRaw = writeTVar
#endif