-- |Description: Internal
module Polysemy.Log.Conc where

import Polysemy (interceptH, runT, subsume)
import Polysemy.Async (Async, async)
import Polysemy.Conc (Queue, Race, interpretQueueTBM)
import qualified Polysemy.Conc.Queue as Queue
import Polysemy.Conc.Queue.Result (resultToMaybe)
import Polysemy.Internal.Tactics (liftT)
import Polysemy.Resource (Resource)

import qualified Polysemy.Log.Data.DataLog as DataLog
import Polysemy.Log.Data.DataLog (DataLog (DataLog, Local))

-- |Intercept 'DataLog' for concurrent processing.
-- This does not send any action to the ultimate interpreter but writes all log messages to the provided queue.
-- 'Local' has to be handled here, otherwise this will not be called for actions in higher-order thunks.
--
-- The first argument is the message transformation accumulated from enclosing 'Local' scopes; it is applied to every
-- message before it is written to the queue.
interceptDataLogConcWithLocal ::
  forall msg r a .
  Members [Queue msg, DataLog msg] r =>
  (msg -> msg) ->
  Sem r a ->
  Sem r a
interceptDataLogConcWithLocal context =
  interceptH \case
    DataLog msg ->
      -- Transform the message with the accumulated context and enqueue it instead of logging it directly.
      liftT (Queue.write (context msg))
    Local f ma ->
      -- 'runT' produces the higher-order action with a fresh 'DataLog' on top of the stack, which 'subsume' merges
      -- into the one already in @r@.
      -- The result is intercepted again, with @f@ composed onto the current context, and lifted back into the
      -- tactical stack with 'raise'.
      raise . interceptDataLogConcWithLocal (f . context) . subsume =<< runT ma
{-# inline interceptDataLogConcWithLocal #-}

-- |Intercept 'DataLog' for concurrent processing.
-- This is 'interceptDataLogConcWithLocal' with 'id' as the initial message transformation, so messages logged
-- outside of any 'Local' scope are written to the queue unchanged.
interceptDataLogConcWith ::
  forall msg r a .
  Members [Queue msg, DataLog msg] r =>
  Sem r a ->
  Sem r a
interceptDataLogConcWith =
  interceptDataLogConcWithLocal @msg id
{-# inline interceptDataLogConcWith #-}

-- |Part of 'interceptDataLogConc'.
-- Loop as long as the provided queue is open and relay all dequeued messages to the ultimate interpreter, thereby
-- forcing the logging implementation to work in this thread.
loggerThread ::
  forall msg r .
  Members [Queue msg, DataLog msg] r =>
  Sem r ()
loggerThread =
  spin
  where
    spin :: Sem r ()
    spin = do
      next <- Queue.read
      -- 'resultToMaybe' turns the read result into a 'Maybe'; on 'Nothing', 'for_' runs nothing and the loop ends.
      for_ (resultToMaybe next) \ msg -> do
        DataLog.dataLog @msg msg
        spin

-- |Intercept 'DataLog' for concurrent processing.
-- Creates a queue and starts a worker thread.
-- All log messages received by the interceptor in 'interceptDataLogConcWithLocal' are written to the queue and sent to
-- the next 'DataLog' interpreter when the thread reads from the queue.
--
-- Since this is an interceptor, it will not remove the effect from the stack, but relay it to another interpreter:
--
-- @
-- interpretDataLogAtomic (interceptDataLogConc 64 (DataLog.dataLog "message"))
-- @
interceptDataLogConc ::
  forall msg r a .
  Members [DataLog msg, Resource, Async, Race, Embed IO] r =>
  -- |Queue size. When the queue fills up, the interceptor will block.
  Int ->
  Sem r a ->
  Sem r a
interceptDataLogConc maxQueued sem =
  interpretQueueTBM @msg maxQueued do
    -- Start the worker that drains the queue; its 'Async' handle is not kept.
    !_ <- async (loggerThread @msg)
    -- 'raise' lifts the program past the locally interpreted 'Queue' so it can be intercepted here.
    interceptDataLogConcWith @msg (raise sem)
{-# inline interceptDataLogConc #-}