-- |Description: Queue Interpreters for 'TBMQueue'
module Polysemy.Conc.Interpreter.Queue.TBM where

import Control.Concurrent.STM.TBMQueue (
  TBMQueue,
  closeTBMQueue,
  isClosedTBMQueue,
  newTBMQueueIO,
  peekTBMQueue,
  readTBMQueue,
  tryPeekTBMQueue,
  tryReadTBMQueue,
  tryWriteTBMQueue,
  writeTBMQueue,
  )
import Polysemy.Resource (Resource, bracket)

import qualified Polysemy.Conc.Effect.Queue as Queue
import Polysemy.Conc.Effect.Queue (Queue)
import Polysemy.Conc.Effect.Race (Race)
import Polysemy.Conc.Queue.Result (closedBoolResult, closedNaResult, closedResult)
import Polysemy.Conc.Queue.Timeout (withTimeout)

-- |Interpret 'Queue' with a 'TBMQueue'.
--
-- This variant expects an allocated queue as an argument.
--
-- Each constructor of 'Queue' is handled by running the corresponding STM operation from
-- "Control.Concurrent.STM.TBMQueue" on the given queue.
-- The @Maybe@-shaped results of those operations are converted with 'closedResult', 'closedNaResult' and
-- 'closedBoolResult'; the timeout variants hand their STM action to 'withTimeout'.
interpretQueueTBMWith ::
  forall d r .
  Members [Race, Embed IO] r =>
  TBMQueue d ->
  InterpreterFor (Queue d) r
interpretQueueTBMWith queue =
  interpret \case
    Queue.Read ->
      atomically (closedResult <$> readTBMQueue queue)
    Queue.TryRead ->
      atomically (closedNaResult <$> tryReadTBMQueue queue)
    Queue.ReadTimeout timeout ->
      withTimeout timeout (readTBMQueue queue)
    Queue.Peek ->
      atomically (closedResult <$> peekTBMQueue queue)
    Queue.TryPeek ->
      atomically (closedNaResult <$> tryPeekTBMQueue queue)
    Queue.Write d ->
      atomically (writeTBMQueue queue d)
    Queue.TryWrite d ->
      atomically (closedBoolResult <$> tryWriteTBMQueue queue d)
    Queue.WriteTimeout timeout d ->
      -- 'writeTBMQueue' returns @()@, so the closed state is checked explicitly in the same STM action to
      -- produce the @Maybe@ that 'withTimeout' expects.
      withTimeout timeout do
        ifM (isClosedTBMQueue queue) (pure Nothing) (Just <$> writeTBMQueue queue d)
    Queue.Closed ->
      atomically (isClosedTBMQueue queue)
    Queue.Close ->
      atomically (closeTBMQueue queue)
{-# inline interpretQueueTBMWith #-}

-- |Allocate a 'TBMQueue' with the given bound and pass it to the supplied continuation.
--
-- The queue is created with 'newTBMQueueIO' as the acquire step of 'bracket', and 'closeTBMQueue' is run on it as
-- the release step.
withTBMQueue ::
  forall d r a .
  Members [Resource, Embed IO] r =>
  Int ->
  (TBMQueue d -> Sem r a) ->
  Sem r a
withTBMQueue maxQueued =
  bracket (embed (newTBMQueueIO maxQueued)) (atomically . closeTBMQueue)

-- |Interpret 'Queue' with a 'TBMQueue'.
--
-- The queue is allocated by 'withTBMQueue' for the duration of the interpreted program and then passed to
-- 'interpretQueueTBMWith'.
interpretQueueTBM ::
  forall d r .
  Members [Resource, Race, Embed IO] r =>
  -- |Buffer size
  Int ->
  InterpreterFor (Queue d) r
interpretQueueTBM maxQueued sem =
  withTBMQueue maxQueued \ queue ->
    interpretQueueTBMWith queue sem
{-# inline interpretQueueTBM #-}