-- | Concurrency-related built-in functions of the interpreter: launching,
-- killing and awaiting threads, channels, mutable references and generators.
module Interpreter.Lib.Concurrency where

import Control.Concurrent
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TMVar
import Control.Concurrent.STM.TSem
import Control.Exception (SomeException, catch, mask, onException, toException)
import Control.Monad.IO.Class
import Control.Monad.IO.Unlift
import Control.Monad.State.Strict
import Data.Coerce

import Interpreter.Common
import Interpreter.Interpreter

-- | Forks a thread that evaluates the callback (with the optional argument
-- as its only parameter, or with no parameters when it is absent) inside
-- 'withStateClone', and returns a 'ThreadRef' carrying the thread id and the
-- slot in which the outcome is published.
--
-- The slot receives @Right value@ when the callback returns a value, and
-- @Left exception@ when the callback throws, the thread is killed, or the
-- callback returns nothing ('MissingProcedureReturn').
builtInLaunchThread :: BuiltInFnWithDoc '[ '("callback_thread", Callback), '("callback_arg", Maybe Value) ]
builtInLaunchThread ((coerce -> (processCb :: Callback)) :> (coerce -> mthreadArg) :> EmptyArgs) =
  withRunInIO $ \runInIO -> do
    resultRef <- newEmptyTMVarIO
    -- 'forkFinally' installs the finaliser before the thread can receive an
    -- asynchronous exception, so exactly one outcome is always published
    -- and awaiting a killed thread cannot hang on an empty slot.
    threadId <-
      forkFinally
        (runInIO $ withStateClone $ evaluateCallback processCb $ maybe [] pure mthreadArg)
        (atomically . putTMVar resultRef . toOutcome)
    pure $ Just $ ThreadRef $ ThreadInfo threadId resultRef
  where
    -- Collapse "threw" and "returned nothing" into the failure side.
    toOutcome :: Either SomeException (Maybe Value) -> Either SomeException Value
    toOutcome (Left e) = Left e
    toOutcome (Right (Just v)) = Right v
    toOutcome (Right Nothing) = Left (toException MissingProcedureReturn)

-- | Sends 'killThread' to the thread referenced by the argument.
-- Returns no value.
builtInKillThread :: BuiltInFnWithDoc '[ '("thread_result", ThreadInfo) ]
builtInKillThread ((coerce -> (ThreadInfo threadId _)) :> EmptyArgs) = do
  liftIO $ killThread threadId
  pure Nothing

-- | Blocks until the thread has published an outcome. The outcome itself is
-- discarded, so a failed thread is not reported here. Returns no value.
builtInAwait :: BuiltInFnWithDoc '[ '("thread_result", ThreadInfo) ]
builtInAwait ((coerce -> (ThreadInfo _ pmvar)) :> EmptyArgs) = do
  void $ liftIO $ atomically $ readTMVar pmvar
  pure Nothing

-- | Blocks until the thread has published an outcome, then returns its value
-- or rethrows the stored exception through 'throwErr'.
builtInAwaitResult :: BuiltInFnWithDoc '[ '("thread_result", ThreadInfo) ]
builtInAwaitResult ((coerce -> (ThreadInfo _ pmvar)) :> EmptyArgs) =
  liftIO (atomically $ readTMVar pmvar) >>= \case
    Right x -> pure $ Just x
    Left e -> throwErr e

-- | Creates a fresh, empty channel and returns it as a 'Channel' value.
builtInNewChannel :: BuiltInFnWithDoc '[]
builtInNewChannel _ =
  Just . Channel . ChannelRef <$> liftIO newTChanIO

-- | Appends a value to the channel. Returns no value.
builtInWriteChannel :: BuiltInFnWithDoc '[ '("channel_ref", ChannelRef), '("value", Value)]
builtInWriteChannel ((coerce -> (ChannelRef chan)) :> (coerce -> val) :> _) = do
  liftIO $ atomically $ writeTChan chan val
  pure Nothing

-- | Takes the next value from the channel, blocking while it is empty.
builtInReadChannel :: BuiltInFnWithDoc '[ '("channel_ref", ChannelRef)]
builtInReadChannel ((coerce -> (ChannelRef chan)) :> _) =
  Just <$> liftIO (atomically $ readTChan chan)

-- | Returns a 'BoolValue' telling whether the channel currently holds no
-- values.
builtInIsChannelEmpty :: BuiltInFnWithDoc '[ '("channel_ref", ChannelRef)]
builtInIsChannelEmpty ((coerce -> (ChannelRef chan)) :> _) =
  Just . BoolValue <$> liftIO (atomically $ isEmptyTChan chan)

-- | Creates a mutable reference holding the initial value, paired with a
-- one-permit semaphore that 'builtInModifyRef' uses to serialise
-- modifications.
builtInNewRef :: BuiltInFnWithDoc '[ '("init_value", Value) ]
builtInNewRef ((coerce -> (v :: Value)) :> EmptyArgs) = liftIO $ do
  ref <- newTMVarIO v
  sem <- atomically (newTSem 1)
  pure $ Just $ Ref $ MutableRef ref sem

-- | Replaces the value stored in the reference. Does not take the
-- reference's semaphore. Returns no value.
builtInWriteRef :: BuiltInFnWithDoc '[ '("ref", MutableRef), '("new_value", Value) ]
builtInWriteRef ((coerce -> (MutableRef ref _)) :> (coerce -> v) :> EmptyArgs) = do
  void $ liftIO $ atomically $ swapTMVar ref v
  pure Nothing

-- | Applies the callback to the current value of the reference and stores
-- the callback's result, holding the reference's semaphore for the whole
-- read-evaluate-write sequence. Returns no value.
--
-- Raises 'MissingProcedureReturn' when the callback returns nothing. The
-- semaphore is released on every exit path (success, callback exception,
-- missing return value), so a failing callback cannot block later
-- modifications of the same reference.
builtInModifyRef :: BuiltInFnWithDoc '[ '("ref", MutableRef), '("callback", Callback) ]
builtInModifyRef ((coerce -> (MutableRef ref sem)) :> (coerce -> (callback :: Callback)) :> EmptyArgs) =
  withRunInIO $ \runInIO -> mask $ \restore -> do
    -- Masked so that no asynchronous exception can arrive between taking the
    -- permit and installing the release handler. Waiting for the permit is
    -- still interruptible, and an interrupted wait has not taken it.
    current <- atomically $ waitTSem sem >> readTMVar ref
    let release = atomically $ signalTSem sem
    outcome <- restore (runInIO $ evaluateCallback callback [current]) `onException` release
    case outcome of
      Nothing -> do
        -- Release before raising: the handler above no longer applies here.
        release
        runInIO $ throwErr MissingProcedureReturn
      Just updated -> do
        atomically $ swapTMVar ref updated >> signalTSem sem
        pure Nothing

-- | Returns the value currently stored in the reference.
builtInReadRef :: BuiltInFnWithDoc '[ '("ref", MutableRef) ]
builtInReadRef ((coerce -> (MutableRef ref _)) :> EmptyArgs) =
  Just <$> liftIO (atomically $ readTMVar ref)

-- | Requests the next value from the generator via 'generatorNext'.
-- Raises 'GeneratorExhausted' when the generator yields nothing.
builtInGeneratorNext :: BuiltInFnWithDoc '[ '("generator", GeneratorChannels) ]
builtInGeneratorNext ((coerce -> genChans) :> EmptyArgs) =
  generatorNext genChans >>= maybe (throwErr GeneratorExhausted) (pure . Just)