module Engine.ReactiveBanana
  ( -- * Network setup
    allocateActuated
  , allocatePaused

    -- * Event utilities
  , eventHandler
  , debounce
  , reactimateDebugShow
  , timer

    -- * "Engine.Worker" interface

    -- ** From workers to networks
  , observe

    -- ** From networks to workers
  , pushWorkerInput
  , pushWorkerInputJust
  , pushWorkerOutput
  , pushWorkerOutputJust
  ) where

import RIO

import Engine.Worker qualified as Worker
import GHC.Stack (withFrozenCallStack)
import Reactive.Banana qualified as RB
import Reactive.Banana.Frameworks qualified as RBF
import Resource.Region qualified as Region
import UnliftIO.Resource (ResourceT)
import UnliftIO.Resource qualified as Resource

import Engine.ReactiveBanana.Timer qualified as Timer

-- * Network setup

-- | Compile a network through 'allocatePaused', actuate it, and fire the
-- \"started\" event once before handing the network back.
--
-- The builder receives the unlift context as its first argument and the
-- \"started\" event as its second.
allocateActuated
  :: MonadUnliftIO m
  => (UnliftIO m -> RB.Event () -> RBF.MomentIO ())
  -> ResourceT m RBF.EventNetwork
allocateActuated builder = do
  (startedHandler, fireStarted) <- liftIO RBF.newAddHandler
  network <- allocatePaused \unlift ->
    RBF.fromAddHandler startedHandler >>= builder unlift
  -- Actuate first so that the network is live when the event is fired.
  liftIO $ RBF.actuate network >> fireStarted ()
  pure network

-- | Compile a network without actuating it.
--
-- The network-building function is given the 'UnliftIO' context of the
-- surrounding monad. 'RBF.pause' is registered as the release action, so the
-- network gets paused when the resource region is left.
allocatePaused
  :: MonadUnliftIO m
  => (UnliftIO m -> RBF.MomentIO ())
  -> ResourceT m RBF.EventNetwork
allocatePaused builder = do
  unlift <- lift askUnliftIO
  (_releaseKey, network) <-
    Resource.allocate (RBF.compile (builder unlift)) RBF.pause
  pure network

-- | Make an 'RB.Event' that can be fired by a callback registered in a
-- current resource region.
--
-- The @action@ is handed the firing function (lifted into @io@) and returns
-- the 'Resource.ReleaseKey' of its registration, which is passed on to
-- 'Region.local_'.
eventHandler
  :: (Resource.MonadResource m, MonadIO io)
  => ((a -> io ()) -> m Resource.ReleaseKey)
  -> ResourceT m (RBF.MomentIO (RB.Event a))
eventHandler action = do
  (addHandler, fire) <- liftIO RBF.newAddHandler
  Region.local_ (action (\x -> liftIO (fire x)))
  pure (RBF.fromAddHandler addHandler)

-- * Event utilities

-- | An async process that will fire monotonic timestamp events and
-- self-adjust for the delays induced by its handling.
--
-- This is a plain alias for 'Timer.every'.
timer
  :: (MonadUnliftIO m)
  => Int -- ^ Timer interval in microseconds
  -> ResourceT m (RBF.MomentIO (RB.Event Double))
timer interval = Timer.every interval
{-# DEPRECATED timer "Use Engine.ReactiveBanana.Timer.every" #-}

-- | Filter out successive events with the same data.
--
-- The last seen value starts out as @initial@, so a first occurrence equal
-- to it is dropped as well.
--
-- The output event will be delayed by one step due to 'RBF.reactimate' use.
debounce :: Eq a => a -> RB.Event a -> RBF.MomentIO (RB.Event a)
debounce initial spamUpdates = do
  (distinct, fireDistinct) <- RBF.newEvent
  lastSeen <- newIORef initial
  let
    -- Store the incoming value unconditionally; forward it only if it
    -- differs from the previously stored one.
    forward new = do
      changed <- atomicModifyIORef' lastSeen \old -> (new, old /= new)
      when changed (fireDistinct new)
  RBF.reactimate (forward <$> spamUpdates)
  pure distinct

-- | Dump event contents to application debug log.
--
-- Each occurrence is rendered with 'displayShow' and passed to 'logDebug'
-- under a frozen call stack.
reactimateDebugShow
  :: (Show a, MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack)
  => (m () -> IO ()) -- ^ Unlift into application
  -> RB.Event a -- ^ Event to monitor
  -> RBF.MomentIO ()
reactimateDebugShow unlift event =
  RBF.reactimate $
    event <&> \x ->
      unlift (withFrozenCallStack logDebug (displayShow x))

-- * "Engine.Worker" interface

-- ** From workers to networks

-- | Convert 'Worker.Var' updates into events.
--
-- An async tracker is started and handed to 'Region.attachAsync'. It blocks
-- in STM until the variable's version is greater than the last one it has
-- seen, fires the new data and repeats.
observe
  :: (MonadUnliftIO m)
  => Worker.Var a
  -> ResourceT m (RBF.MomentIO (RB.Event a))
observe var = do
  (addHandler, fire) <- liftIO RBF.newAddHandler

  initial <- readTVarIO var
  tracker <- async (track fire (Worker.vVersion initial))
  Region.attachAsync tracker
  -- XXX: the network isn't compiled yet!
  liftIO (fire (Worker.vData initial))

  pure (RBF.fromAddHandler addHandler)
  where
    -- Wait for a version newer than @seenVersion@, publish it, loop.
    track fire seenVersion = do
      fresh <- atomically do
        current <- readTVar var
        unless (Worker.vVersion current > seenVersion) retrySTM
        pure current
      liftIO (fire (Worker.vData fresh))
      track fire (Worker.vVersion fresh)

-- ** From networks to workers

-- | Set worker input to event contents.
pushWorkerInput
  :: Worker.HasInput var
  => var -- ^ Worker handle to push into
  -> RB.Event (Worker.GetInput var) -- ^ New input values
  -> RBF.MomentIO ()
pushWorkerInput p updates =
  -- The update function handed to 'Worker.pushInput' is @const new@, so the
  -- previous input is ignored.
  RBF.reactimate (fmap (\new -> Worker.pushInput p (const new)) updates)

-- | Set worker input to event contents, if present.
--
-- Occurrences carrying 'Nothing' perform no push.
pushWorkerInputJust
  :: Worker.HasInput var
  => var -- ^ Worker handle to push into
  -> RB.Event (Maybe (Worker.GetInput var)) -- ^ Optional new input values
  -> RBF.MomentIO ()
pushWorkerInputJust p updates =
  RBF.reactimate $
    fmap (traverse_ (\new -> Worker.pushInput p (const new))) updates

-- | Set worker output to event contents.
pushWorkerOutput
  :: Worker.HasOutput var
  => var -- ^ Worker handle to push into
  -> RB.Event (Worker.GetOutput var) -- ^ New output values
  -> RBF.MomentIO ()
pushWorkerOutput p updates =
  -- The update function handed to 'Worker.pushOutput' is @const new@, so the
  -- previous output is ignored.
  RBF.reactimate (fmap (\new -> Worker.pushOutput p (const new)) updates)

-- | Set worker output to event contents, if present.
--
-- Occurrences carrying 'Nothing' perform no push.
pushWorkerOutputJust
  :: Worker.HasOutput var
  => var -- ^ Worker handle to push into
  -> RB.Event (Maybe (Worker.GetOutput var)) -- ^ Optional new output values
  -> RBF.MomentIO ()
pushWorkerOutputJust p updates =
  RBF.reactimate $
    fmap (traverse_ (\new -> Worker.pushOutput p (const new))) updates