-- |Description: Internal
module Polysemy.Log.Conc where

import qualified Control.Concurrent.Async as Base
import qualified Polysemy.Conc as Conc
import Polysemy.Conc (Queue, Race, interpretQueueTBM)
import qualified Polysemy.Conc.Queue as Queue
import Polysemy.Conc.Queue.Result (resultToMaybe)
import Polysemy.Internal.Tactics (liftT)
import Polysemy.Time (Seconds (Seconds))

import qualified Polysemy.Log.Data.DataLog as DataLog
import Polysemy.Log.Data.DataLog (DataLog (DataLog, Local))

-- |Intercept 'DataLog' for concurrent processing.
-- This does not send any action to the ultimate interpreter but writes all log messages to the provided queue.
-- 'Local' has to be handled here, otherwise this will not be called for actions in higher-order thunks.
interceptDataLogConcWithLocal ::
   msg r a .
  Members [Queue msg, DataLog msg] r =>
  (msg -> msg) ->
  Sem r a ->
  Sem r a
interceptDataLogConcWithLocal :: forall msg (r :: EffectRow) a.
Members '[Queue msg, DataLog msg] r =>
(msg -> msg) -> Sem r a -> Sem r a
interceptDataLogConcWithLocal msg -> msg
context =
  (forall x (rInitial :: EffectRow).
 DataLog msg (Sem rInitial) x
 -> Tactical (DataLog msg) (Sem rInitial) r x)
-> Sem r a -> Sem r a
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Member e r =>
(forall x (rInitial :: EffectRow).
 e (Sem rInitial) x -> Tactical e (Sem rInitial) r x)
-> Sem r a -> Sem r a
interceptH \case
    DataLog msg
msg ->
      Sem r ()
-> Sem (WithTactics (DataLog msg) f (Sem rInitial) r) (f ())
forall (m :: * -> *) (f :: * -> *) (r :: EffectRow)
       (e :: (* -> *) -> * -> *) a.
Functor f =>
Sem r a -> Sem (WithTactics e f m r) (f a)
liftT (msg -> Sem r ()
forall d (r :: EffectRow). Member (Queue d) r => d -> Sem r ()
Queue.write (msg -> msg
context msg
msg))
    Local msg -> msg
f Sem rInitial x
ma ->
      Sem r (f x)
-> Sem (WithTactics (DataLog msg) f (Sem rInitial) r) (f x)
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
raise (Sem r (f x)
 -> Sem (WithTactics (DataLog msg) f (Sem rInitial) r) (f x))
-> (Sem (DataLog msg : r) (f x) -> Sem r (f x))
-> Sem (DataLog msg : r) (f x)
-> Sem (WithTactics (DataLog msg) f (Sem rInitial) r) (f x)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (msg -> msg) -> Sem r (f x) -> Sem r (f x)
forall msg (r :: EffectRow) a.
Members '[Queue msg, DataLog msg] r =>
(msg -> msg) -> Sem r a -> Sem r a
interceptDataLogConcWithLocal (msg -> msg
f (msg -> msg) -> (msg -> msg) -> msg -> msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. msg -> msg
context) (Sem r (f x) -> Sem r (f x))
-> (Sem (DataLog msg : r) (f x) -> Sem r (f x))
-> Sem (DataLog msg : r) (f x)
-> Sem r (f x)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Sem (DataLog msg : r) (f x) -> Sem r (f x)
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Member e r =>
Sem (e : r) a -> Sem r a
subsume (Sem (DataLog msg : r) (f x)
 -> Sem (WithTactics (DataLog msg) f (Sem rInitial) r) (f x))
-> Sem
     (WithTactics (DataLog msg) f (Sem rInitial) r)
     (Sem (DataLog msg : r) (f x))
-> Sem (WithTactics (DataLog msg) f (Sem rInitial) r) (f x)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Sem rInitial x
-> Sem
     (WithTactics (DataLog msg) f (Sem rInitial) r)
     (Sem (DataLog msg : r) (f x))
forall (m :: * -> *) a (e :: (* -> *) -> * -> *) (f :: * -> *)
       (r :: EffectRow).
m a -> Sem (WithTactics e f m r) (Sem (e : r) (f a))
runT Sem rInitial x
ma
{-# inline interceptDataLogConcWithLocal #-}

-- |Intercept 'DataLog' for concurrent processing.
interceptDataLogConcWith ::
   msg r a .
  Members [Queue msg, DataLog msg] r =>
  Sem r a ->
  Sem r a
interceptDataLogConcWith :: forall msg (r :: EffectRow) a.
Members '[Queue msg, DataLog msg] r =>
Sem r a -> Sem r a
interceptDataLogConcWith =
  forall msg (r :: EffectRow) a.
Members '[Queue msg, DataLog msg] r =>
(msg -> msg) -> Sem r a -> Sem r a
interceptDataLogConcWithLocal @msg msg -> msg
forall a. a -> a
id
{-# inline interceptDataLogConcWith #-}

-- |Part of 'interceptDataLogConc'.
-- Loop as long as the proided queue is open and relay all dequeued messages to the ultimate interpreter, thereby
-- forcing the logging implementation to work in this thread.
loggerThread ::
   msg r .
  Members [Queue msg, DataLog msg] r =>
  Sem r ()
loggerThread :: forall msg (r :: EffectRow).
Members '[Queue msg, DataLog msg] r =>
Sem r ()
loggerThread = do
  Sem r ()
spin
  where
    spin :: Sem r ()
spin = do
      QueueResult msg
next <- Sem r (QueueResult msg)
forall d (r :: EffectRow).
Member (Queue d) r =>
Sem r (QueueResult d)
Queue.read
      Maybe msg -> (msg -> Sem r ()) -> Sem r ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (QueueResult msg -> Maybe msg
forall d. QueueResult d -> Maybe d
resultToMaybe QueueResult msg
next) \ msg
msg -> do
        forall a (r :: EffectRow). Member (DataLog a) r => a -> Sem r ()
DataLog.dataLog @msg msg
msg
        Sem r ()
spin

-- |Close the concurrent logger's queue and wait for one second to allow it to process any messages that have been
-- queued.
finalize ::
   msg r .
  Members [Queue msg, Resource, Async, Race, Embed IO] r =>
  Base.Async (Maybe ()) ->
  Sem r ()
finalize :: forall msg (r :: EffectRow).
Members '[Queue msg, Resource, Async, Race, Embed IO] r =>
Async (Maybe ()) -> Sem r ()
finalize Async (Maybe ())
handle =
  Seconds -> Sem r () -> Sem r ()
forall u (r :: EffectRow).
(TimeUnit u, Member Race r) =>
u -> Sem r () -> Sem r ()
Conc.timeoutU (Int64 -> Seconds
Seconds Int64
1) do
    forall d (r :: EffectRow). Member (Queue d) r => Sem r ()
Queue.close @msg
    Sem r (Maybe ()) -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Async (Maybe ()) -> Sem r (Maybe ())
forall (r :: EffectRow) a. Member Async r => Async a -> Sem r a
await Async (Maybe ())
handle)

-- |Intercept 'DataLog' for concurrent processing.
-- Creates a queue and starts a worker thread.
-- All log messages received by the interceptor in 'interceptDataLogConcWithLocal' are written to the queue and sent to
-- the next 'DataLog' interpreter when the thread reads from the queue.
--
-- Since this is an interceptor, it will not remove the effect from the stack, but relay it to another interpreter:
--
-- @
-- interpretDataLogAtomic (interceptDataLogConc 64 (DataLog.dataLog "message"))
-- @
interceptDataLogConc ::
   msg r a .
  Members [DataLog msg, Resource, Async, Race, Embed IO] r =>
  -- |Queue size. When the queue fills up, the interceptor will block.
  Int ->
  Sem r a ->
  Sem r a
interceptDataLogConc :: forall msg (r :: EffectRow) a.
Members '[DataLog msg, Resource, Async, Race, Embed IO] r =>
Int -> Sem r a -> Sem r a
interceptDataLogConc Int
maxQueued Sem r a
sem = do
  forall d (r :: EffectRow).
Members '[Resource, Race, Embed IO] r =>
Int -> InterpreterFor (Queue d) r
interpretQueueTBM @msg Int
maxQueued do
    !Async (Maybe ())
handle <- Sem (Queue msg : r) () -> Sem (Queue msg : r) (Async (Maybe ()))
forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
async (forall msg (r :: EffectRow).
Members '[Queue msg, DataLog msg] r =>
Sem r ()
loggerThread @msg)
    Sem (Queue msg : r) a
-> Sem (Queue msg : r) () -> Sem (Queue msg : r) a
forall (r :: EffectRow) a b.
Member Resource r =>
Sem r a -> Sem r b -> Sem r a
finally (forall msg (r :: EffectRow) a.
Members '[Queue msg, DataLog msg] r =>
Sem r a -> Sem r a
interceptDataLogConcWith @msg (Sem r a -> Sem (Queue msg : r) a
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
raise Sem r a
sem)) (forall msg (r :: EffectRow).
Members '[Queue msg, Resource, Async, Race, Embed IO] r =>
Async (Maybe ()) -> Sem r ()
finalize @msg Async (Maybe ())
handle)
{-# inline interceptDataLogConc #-}