{-# options_haddock prune #-}
-- |Description: Events/Consume Interpreters, Internal
module Polysemy.Conc.Interpreter.Events where

import Control.Concurrent.Chan.Unagi.Bounded (InChan, OutChan, dupChan, newChan, readChan, tryWriteChan)
import Polysemy (InterpretersFor)
import Polysemy.Async (Async)
import Polysemy.Resource (Resource)

import Polysemy.Conc.Async (withAsync_)
import qualified Polysemy.Conc.Effect.Events as Events
import Polysemy.Conc.Effect.Events (Consume, EventToken (EventToken), Events)
import Polysemy.Conc.Effect.Race (Race)
import Polysemy.Conc.Effect.Scoped (Scoped)
import Polysemy.Conc.Interpreter.Scoped (runScopedAs)

-- |Shorthand for 'Events' whose resource type is an unagi 'OutChan' carrying the same event type @e@.
type ChanEvents e = Events (OutChan e) e

-- |Shorthand for an 'EventToken' that wraps an unagi 'OutChan' of events of type @e@.
type EventChan e = EventToken (OutChan e)

-- |Shorthand for the 'Scoped' 'Consume' effect whose scope resource is an 'EventToken' over @token@.
type EventConsumer token e = Scoped (EventToken token) (Consume e)

-- |Shorthand for the consumer effect backed by the default 'OutChan' implementation.
-- Expands to @Scoped (EventChan e) (Consume e)@.
type ChanConsumer e = EventConsumer (OutChan e) e

-- |Interpret 'Consume' by reading from an 'OutChan'.
-- Used internally by 'interpretEventsChan', not safe to use directly.
--
-- The channel is taken from the supplied 'EventToken'; every 'Events.Consume' action is answered by a single
-- 'readChan' on that channel, lifted into 'Sem' via 'embed'.
interpretConsumeChan ::
  forall e r .
  Member (Embed IO) r =>
  EventChan e ->
  InterpreterFor (Consume e) r
interpretConsumeChan (EventToken chan) =
  interpret \case
    Events.Consume ->
      embed (readChan chan)

-- |Interpret 'Events' by writing to an 'InChan'.
-- Used internally by 'interpretEventsChan', not safe to use directly.
-- When the channel queue is full, this silently discards events.
--
-- Publishing uses 'tryWriteChan'; its 'Bool' result is deliberately dropped with 'void', so the publisher gets
-- no indication of whether the event was accepted.
interpretEventsInChan ::
  forall e r .
  Member (Embed IO) r =>
  InChan e ->
  InterpreterFor (Events (OutChan e) e) r
interpretEventsInChan inChan =
  interpret \case
    Events.Publish e ->
      void (embed (tryWriteChan inChan e))

-- |Interpret 'Events' and 'Consume' together by connecting them to the two ends of an unagi channel.
-- 'Consume' is only interpreted in a 'Scoped' manner, ensuring that a new duplicate of the channel is created so that
-- all consumers see all events (from the moment they are connected).
--
-- This should be used in conjunction with 'Polysemy.Conc.subscribe':
--
-- @
-- interpretEventsChan do
--   async $ subscribe do
--     putStrLn =<< consume
--   publish "hello"
-- @
--
-- Whenever 'Polysemy.Conc.subscribe' creates a new scope, this interpreter calls 'dupChan' and passes the
-- duplicate to 'interpretConsumeChan'.
--
-- The underlying channel is created with a bound of 64.
-- For the duration of the interpreted program, a background thread started with 'withAsync_' continuously reads
-- from the original 'OutChan' and throws the values away.
-- NOTE(review): presumably this keeps the original read end from filling the bounded queue when nobody else
-- reads it; confirm against the unagi-chan semantics.
interpretEventsChan ::
  forall e r .
  Members [Resource, Race, Async, Embed IO] r =>
  InterpretersFor [Events (OutChan e) e, ChanConsumer e] r
interpretEventsChan sem = do
  (inChan, outChan) <- embed (newChan @e 64)
  -- Drain the original read end in the background while the program runs.
  withAsync_ (forever (embed (readChan outChan))) do
    -- Each scope gets its own duplicate of the channel, wrapped in an 'EventToken'.
    runScopedAs
      (EventToken <$> embed (dupChan inChan))
      interpretConsumeChan
      (interpretEventsInChan inChan sem)