-- | Synchronous channel computations.
--
-- A 'CHS' computation is built from 'readChan', 'writeChan', failure
-- ('mzero') and choice ('mplus'); 'synchronize' runs one from 'IO'.
-- 'initChs' starts the background thread that pairs up waiting
-- computations.  The @test*@ and 'step' exports are the module's own
-- test and debugging helpers.
module Control.Concurrent.CHS (
CHS,
Chan,
newChan,
readChan,
writeChan,
synchronize,
initChs,
testCHS, test1, test2, test3, step
) where
import Control.Applicative
import Control.Concurrent (MVar, newMVar, putMVar, takeMVar, forkIO)
import Control.Concurrent.MVar (modifyMVar_)
import Control.Monad
import Data.Unique
import System.IO.Unsafe (unsafePerformIO)
import Unsafe.Coerce(unsafeCoerce)

import Control.Concurrent.STM hiding (orElse)
import Control.Monad.Prompt
import qualified Data.List.Zipper as Z
-- | Start the background scheduler thread ('chsThread') unless an
-- earlier call has already started it.
initChs :: IO ()
-- | A synchronous-channel computation: a 'Prompt' program whose
-- requests are 'CHSPrompt' values.
newtype CHS a = CHS
  { runCHS :: Prompt CHSPrompt a
  }
  deriving (Functor, Applicative)
-- | Run a 'CHS' computation from 'IO': the computation is queued on
-- the shared blocked list and the caller waits until a result has been
-- written for it.
synchronize :: CHS a -> IO a
-- | A channel, identified by a 'Unique'.  The element type @a@ is a
-- phantom parameter: it does not occur in the constructor's fields.
-- The derived 'Eq' compares the 'Unique' identities.
data Chan a = Chan Unique deriving Eq
-- | Create a channel with a fresh 'Unique' identity.
newChan :: IO (Chan a)
-- | Issue a 'Read' request on the channel; the computation continues
-- with whatever value the request is answered with.
readChan :: Chan a -> CHS a
readChan chan = CHS . prompt $ Read chan

-- | Issue a 'Write' request offering a value on the channel.
writeChan :: Chan a -> a -> CHS ()
writeChan chan value = CHS . prompt $ Write value chan
-- Queue the computation for the scheduler, then block until its result
-- slot is filled.  The computation is first extended so that its final
-- value is packaged with the slot as a 'CHSRes'.  Nothing here starts
-- the scheduler thread; that is 'initChs''s job.
synchronize computation = do
  resultVar <- newTVarIO Nothing
  let pending = viewPrompt (fmap (CHSRes resultVar) computation)
  -- Enqueueing and waiting are two separate transactions.
  atomically $ modifyTVar chsBlocked (pending :)
  -- Retry until the slot holds a 'Just'.
  atomically $ readTVar resultVar >>= maybe retry return
-- Monad structure is borrowed from the underlying 'Prompt' monad;
-- only 'fail' is special.
instance Monad CHS where
  return x = CHS (return x)

  -- A failed pattern match becomes a 'Fail' request; the message is
  -- discarded.
  fail _ = CHS $ prompt Fail

  CHS m >>= f = CHS (m >>= \x -> runCHS (f x))
-- Failure and choice are represented as 'Fail' and 'Choice' requests.
instance MonadPlus CHS where
  mzero = CHS $ prompt Fail
  mplus left right = CHS . prompt $ Choice left right
-- 'Alternative' simply reuses the 'MonadPlus' operations.
instance Alternative CHS where
  empty = mzero
  left <|> right = mplus left right
-- Channels are shown by the hash of their 'Unique' identity.
instance Show (Chan a) where
  show (Chan ident) = "Chan " ++ (show . hashUnique) ident
-- | The state of a computation as produced by 'viewPrompt': either
-- finished, failed, waiting on a channel, or a choice of two states.
data CHSState a where
  -- | Finished with a result.
  Complete     :: a -> CHSState a
  -- | Waiting to read from a channel; the function continues with the
  -- value read.
  BlockedRead  :: Chan a -> (a -> CHSState b) -> CHSState b
  -- | Waiting to write the given value to a channel; the last field is
  -- the state to continue with afterwards.
  BlockedWrite :: a -> Chan a -> CHSState b -> CHSState b
  -- | Two alternative states.
  OrElse       :: CHSState a -> CHSState a -> CHSState a
  -- | The computation has failed.
  Failed       :: CHSState a
-- | The requests a 'CHS' computation can issue; the type index is the
-- type of the answer.
data CHSPrompt a where
  -- | Abort this computation.
  Fail   :: CHSPrompt a
  -- | Read a value from a channel.
  Read   :: Chan a -> CHSPrompt a
  -- | Write a value to a channel.
  Write  :: a -> Chan a -> CHSPrompt ()
  -- | Choose between two computations.
  Choice :: CHS a -> CHS a -> CHSPrompt a
-- | Interpret a 'CHS' program as a 'CHSState': a pure result becomes
-- 'Complete', and each request is translated to the matching state
-- constructor together with the rest of the program.
viewPrompt :: CHS a -> CHSState a
viewPrompt (CHS program) = runPromptC Complete interpret program
  where
    interpret :: CHSPrompt v -> (v -> CHSState b) -> CHSState b
    interpret request k = case request of
      Fail -> Failed
      Read chan -> BlockedRead chan k
      Write value chan -> BlockedWrite value chan (k ())
      -- Both branches are continued with the same continuation.
      Choice left right ->
        OrElse (viewPrompt left `bindCHS` k) (viewPrompt right `bindCHS` k)
-- Debug rendering: continuations and written values are not shown,
-- only the constructor and (where present) the channel.
instance Show a => Show (CHSState a) where
  show state = case state of
    Failed -> "Failed"
    Complete result -> "Complete " ++ show result
    BlockedRead chan _ -> "BlockedRead " ++ show chan
    BlockedWrite _ chan _ -> "BlockedWrite " ++ show chan
    OrElse left right -> show left ++ " `OrElse` " ++ show right
-- | Sequence a state with a continuation: the continuation is applied
-- to the result of a 'Complete' state and pushed inside every other
-- constructor; 'Failed' stays 'Failed'.
bindCHS :: CHSState a -> (a -> CHSState b) -> CHSState b
bindCHS state next = case state of
  Failed -> Failed
  Complete result -> next result
  BlockedRead chan k -> BlockedRead chan (\value -> k value `bindCHS` next)
  BlockedWrite value chan rest -> BlockedWrite value chan (rest `bindCHS` next)
  OrElse left right -> OrElse (left `bindCHS` next) (right `bindCHS` next)
-- | Every way of picking one element out of a list: each element is
-- paired with a zipper over the list with that element deleted, the
-- cursor left at the position the element was taken from.
select :: [a] -> [(a, Z.Zipper a)]
select xs =
  [ (Z.cursor z, Z.delete z)
  | z <- takeWhile (not . Z.endp) (iterate Z.right (Z.fromList xs))
  ]
-- | Evidence that two types are equal: matching on 'Refl' lets the
-- type checker treat @a@ and @b@ as the same type.
data TypeEq a b where Refl :: TypeEq a a
-- | All gangs reachable from the given gang in one step.
--
-- The first state that is not 'Complete' decides what can happen:
--
-- * 'Failed' (or running out of states) yields no successors;
-- * 'OrElse' yields one successor per branch;
-- * a blocked read/write is either paired with a matching write/read on
--   the same channel later in the list, or left in place while the
--   rest of the list takes a step.
stepSynchronize :: [CHSState a] -> [[CHSState a]]
stepSynchronize [] = []
stepSynchronize (Failed : _) = []
stepSynchronize (Complete result : rest) =
  map (Complete result :) (stepSynchronize rest)
stepSynchronize (OrElse left right : rest) = [left : rest, right : rest]
stepSynchronize (BlockedRead chan k : rest) =
  -- Pair this reader with any later writer on the same channel.  The
  -- writer's continuation takes the writer's old position in the list.
  ( do
      (BlockedWrite value chan2 k2, z) <- select rest
      Refl <- chanEq chan chan2
      return (k value : Z.toList (Z.insert k2 z))
  )
    -- ... or keep the reader blocked and step the remainder.
    ++ map (BlockedRead chan k :) (stepSynchronize rest)
stepSynchronize (BlockedWrite value chan k : rest) =
  -- Pair this writer with any later reader on the same channel.
  ( do
      (BlockedRead chan2 k2, z) <- select rest
      Refl <- chanEq chan chan2
      return (k : Z.toList (Z.insert (k2 value) z))
  )
    -- ... or keep the writer blocked and step the remainder.
    ++ map (BlockedWrite value chan k :) (stepSynchronize rest)
-- | Every way of splitting a list into two sublists that each keep the
-- original order.  For a non-empty list, the splits that put the head
-- into the second component come before those that put it into the
-- first.
splitSets :: [a] -> [([a], [a])]
splitSets items = case items of
  [] -> [([], [])]
  x : rest ->
    map (intoSecond x) (splitSets rest) ++ map (intoFirst x) (splitSets rest)
  where
    intoSecond x (l, r) = (l, x : r)
    intoFirst x (l, r) = (x : l, r)
-- | Every way of choosing a non-empty sub-gang that can run to
-- completion together: the results of that sub-gang, paired with the
-- states that were left out.
trySynchronize :: [CHSState a] -> [([a], [CHSState a])]
trySynchronize gang =
  [ (results, leftOut)
  | (chosen, leftOut) <- splitSets gang
  , not (null chosen)
  , results <- runSynch chosen
  ]
-- | Step a gang repeatedly until every member is 'Complete', returning
-- the results of each successful run.  A gang that gets stuck
-- contributes nothing.
runSynch :: [CHSState a] -> [[a]]
runSynch gang
  | all finished gang = [[result | Complete result <- gang]]
  | otherwise = concatMap runSynch (stepSynchronize gang)
  where
    finished (Complete _) = True
    finished _ = False
-- | A finished result together with the variable it must be delivered
-- to.  The result type is hidden, so results of different types can
-- share one list.
data CHSRes where
  CHSRes
    :: TVar (Maybe a)
    -> a
    -> CHSRes
-- | Deliver a result by storing it, wrapped in 'Just', in its variable.
writeResult :: CHSRes -> STM ()
writeResult (CHSRes slot value) = writeTVar slot $ Just value
-- | Scheduler loop.  Each iteration is one transaction that takes the
-- first sub-gang of the blocked list that can complete, removes it
-- from the list and delivers its results; if there is none, the
-- transaction retries.
chsThread :: IO ()
chsThread = forever (atomically attempt)
  where
    attempt = do
      waiting <- readTVar chsBlocked
      case trySynchronize waiting of
        [] -> retry
        (results, remaining) : _ -> do
          writeTVar chsBlocked remaining
          mapM_ writeResult results
-- Start the scheduler thread on the first call; later calls find the
-- flag set and do nothing.
--
-- 'modifyMVar_' replaces a manual takeMVar/putMVar pair: if an
-- exception interrupts the body, the previous flag value is put back,
-- so 'chsInited' is never left empty and later callers cannot block
-- forever on it.
initChs = modifyMVar_ chsInited $ \started -> do
  unless started $ forkIO chsThread >> return ()
  return True
-- | Run an action yielding a 'Maybe': a 'Just' value is returned,
-- 'Nothing' becomes 'mzero'.
fromJustM :: MonadPlus m => m (Maybe a) -> m a
fromJustM action = action >>= maybe mzero return
-- | Replace the contents of a 'TVar' with the result of applying a
-- function to them.  The application is not forced here.
modifyTVar :: TVar a -> (a -> a) -> STM ()
modifyTVar var f = readTVar var >>= writeTVar var . f
-- A channel is just a freshly allocated 'Unique' wrapped in 'Chan'.
newChan = fmap Chan newUnique
-- | Compare two channels by identity.  On a match, return evidence that
-- their element types agree; otherwise 'mzero'.
--
-- The 'unsafeCoerce' asserts that two channels with the same 'Unique'
-- have the same element type.  In this module every 'Chan' is built by
-- 'newChan' from a fresh 'Unique', and the export list exposes the
-- 'Chan' type without its constructor.
chanEq :: MonadPlus m => Chan a -> Chan b -> m (TypeEq a b)
chanEq (Chan left) (Chan right) =
  if left == right
    then return (unsafeCoerce Refl)
    else mzero
-- | Process-wide list of computations waiting to be synchronised.
-- 'synchronize' adds to it and 'chsThread' removes completed gangs.
--
-- The NOINLINE pragma is essential: without it GHC may inline the
-- 'unsafePerformIO' call at each use site, giving different parts of
-- the program different TVars.
chsBlocked :: TVar [CHSState CHSRes]
chsBlocked = unsafePerformIO $ newTVarIO []
{-# NOINLINE chsBlocked #-}
-- | Process-wide flag recording whether the scheduler thread has been
-- started; initially 'False'.
--
-- The NOINLINE pragma is essential: without it GHC may inline the
-- 'unsafePerformIO' call at each use site, so different uses could see
-- different MVars.
chsInited :: MVar Bool
chsInited = unsafePerformIO $ newMVar False
{-# NOINLINE chsInited #-}
-- | A single shared channel used by 'testGang'.
--
-- The NOINLINE pragma keeps GHC from inlining the 'unsafePerformIO'
-- call, which could otherwise create a distinct channel per use.
testCh :: Chan Int
testCh = unsafePerformIO newChan
{-# NOINLINE testCh #-}
-- | Apply 'stepSynchronize' to every gang in a list of alternatives and
-- concatenate the successors.
step :: [[CHSState a]] -> [[CHSState a]]
step gangs = gangs >>= stepSynchronize
-- | Read a value; fail if it is 0, otherwise write @100 `div` value@
-- back on the same channel and return the value read.
test1 :: Chan Int -> CHS Int
test1 c = readChan c >>= respond
  where
    respond 0 = mzero
    respond x = writeChan c (100 `div` x) >> return x

-- | Offer either 0 or 5 on the channel, then read a value from it.
test2 :: Chan Int -> CHS Int
test2 c = (writeChan c 0 <|> writeChan c 5) >> readChan c

-- | Ignore the channel and return 100 immediately.
test3 :: Chan Int -> CHS Int
test3 = const (return 100)
-- | A fixed gang of the test programs, all sharing 'testCh', already
-- converted to 'CHSState' form.
testGang :: [CHSState Int]
testGang =
  [ viewPrompt (program testCh)
  | program <- [test3, test1, test2, test1, test1, test2]
  ]
-- | End-to-end smoke test: start the scheduler, then run 'test1' in a
-- forked thread and 'test2' in the calling thread on a fresh channel,
-- printing each result.
testCHS :: IO ()
testCHS = do
  initChs
  c <- newChan
  -- The forked thread is not waited for.
  _ <- forkIO (synchronize (test1 c) >>= print)
  synchronize (test2 c) >>= print