{-# options_haddock prune #-}
module Polysemy.Conc.Interpreter.Events where
import Control.Concurrent.Chan.Unagi.Bounded (InChan, OutChan, dupChan, newChan, readChan, tryWriteChan)
import Polysemy.Conc.Async (withAsync_)
import qualified Polysemy.Conc.Effect.Events as Events
import Polysemy.Conc.Effect.Events (Consume, EventResource (EventResource), Events)
import Polysemy.Conc.Effect.Race (Race)
import Polysemy.Conc.Effect.Scoped (Scoped)
import Polysemy.Conc.Interpreter.Scoped (runScopedAs)
-- | Alias for 'Events' with @'OutChan' e@ as its first type argument and @e@ as the event type.
type ChanEvents e =
  Events (OutChan e) e
-- | Alias for an 'EventResource' that wraps an @'OutChan' e@.
type EventChan e =
  EventResource (OutChan e)
-- | Alias for the 'Consume' effect for events of type @e@, wrapped in 'Scoped' with an
-- @'EventResource' token@ as the scope's resource.
type EventConsumer token e =
  Scoped (EventResource token) (Consume e)
-- | Alias for the 'Consume' effect for events of type @e@, wrapped in 'Scoped' with an
-- 'EventChan' (an 'EventResource' holding an 'OutChan') as the scope's resource.
type ChanConsumer e =
  Scoped (EventChan e) (Consume e)
-- | Interpret 'Consume' by reading from the 'OutChan' wrapped in the given 'EventResource'.
--
-- Every 'Events.Consume' action is handled by running 'readChan' on that channel in 'IO'.
--
-- NOTE(review): 'readChan' presumably blocks until an element is available; confirm against the
-- unagi-chan documentation.
interpretConsumeChan ::
  ∀ e r .
  Member (Embed IO) r =>
  EventChan e ->
  InterpreterFor (Consume e) r
interpretConsumeChan (EventResource chan) =
  interpret \case
    Events.Consume ->
      embed (readChan chan)
-- | Interpret 'Events' by writing to the given 'InChan'.
--
-- Every 'Events.Publish' action is handled by running 'tryWriteChan' in 'IO'.
-- The 'Bool' returned by 'tryWriteChan' is discarded, so the publisher is never told whether the
-- event was actually enqueued.
--
-- NOTE(review): presumably a 'False' result means the bounded queue was full and the event was
-- dropped; confirm against the unagi-chan documentation.
interpretEventsInChan ::
  ∀ e r .
  Member (Embed IO) r =>
  InChan e ->
  InterpreterFor (Events (OutChan e) e) r
interpretEventsInChan inChan =
  interpret \case
    Events.Publish e ->
      -- Only the side effect matters here; the success flag is intentionally thrown away.
      void (embed (tryWriteChan inChan e))
-- | Interpret 'Events' and the scoped 'Consume' effect ('ChanConsumer') together, backed by a single
-- bounded unagi channel.
--
-- The interpreter:
--
-- 1. creates a channel with 'newChan', passing @64@ as the bound;
-- 2. starts a background action with 'withAsync_' that reads from the original 'OutChan' forever and
--    throws the elements away;
-- 3. interprets 'Events' with 'interpretEventsInChan' on the channel's 'InChan';
-- 4. interprets the scoped 'Consume' with 'runScopedAs', using 'dupChan' on the 'InChan' as the
--    resource action and 'interpretConsumeChan' as the handler for the resulting 'EventResource'.
--
-- NOTE(review): 'runScopedAs' presumably runs the resource action once per scope, giving each
-- consumer scope its own duplicate of the channel; confirm against "Polysemy.Conc.Interpreter.Scoped".
interpretEventsChan ::
  ∀ e r .
  Members [Resource, Race, Async, Embed IO] r =>
  InterpretersFor [Events (OutChan e) e, ChanConsumer e] r
interpretEventsChan sem = do
  (inChan, outChan) <- embed (newChan @e 64)
  -- NOTE(review): this reader presumably exists to keep the original read end drained so the
  -- bounded queue does not fill up while consumers read from their duplicates; TODO confirm.
  withAsync_ (forever (embed (readChan outChan))) do
    runScopedAs
      (EventResource <$> embed (dupChan inChan))
      interpretConsumeChan
      (interpretEventsInChan inChan sem)