{-# options_haddock prune #-}

-- |Description: Queue Interpreters for 'TBMQueue'
module Polysemy.Conc.Interpreter.Queue.TBM where

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMQueue (
  TBMQueue,
  closeTBMQueue,
  isClosedTBMQueue,
  newTBMQueueIO,
  peekTBMQueue,
  readTBMQueue,
  tryPeekTBMQueue,
  tryReadTBMQueue,
  tryWriteTBMQueue,
  writeTBMQueue,
  )

import qualified Polysemy.Conc.Effect.Queue as Queue
import Polysemy.Conc.Effect.Queue (Queue)
import Polysemy.Conc.Effect.Race (Race)
import Polysemy.Conc.Queue.Result (closedBoolResult, closedNaResult, closedResult)
import Polysemy.Conc.Queue.Timeout (withTimeout)

-- |Interpret 'Queue' with a 'TBMQueue'.
--
-- This variant expects an allocated queue as an argument.
--
-- Every 'Queue' constructor is forwarded to the matching 'TBMQueue' primitive.
-- The untimed operations run the primitive in 'IO' with 'atomically'; the timed operations hand the 'STM' action to
-- 'withTimeout' together with the requested duration.
-- Results of the primitives are converted to the effect's result type with 'closedResult', 'closedNaResult' and
-- 'closedBoolResult'.
interpretQueueTBMWith ::
  forall d r .
  Members [Race, Embed IO] r =>
  TBMQueue d ->
  InterpreterFor (Queue d) r
interpretQueueTBMWith queue =
  interpret \case
    Queue.Read ->
      embed (atomically (closedResult <$> readTBMQueue queue))
    Queue.TryRead ->
      embed (atomically (closedNaResult <$> tryReadTBMQueue queue))
    Queue.ReadTimeout timeout ->
      withTimeout timeout (readTBMQueue queue)
    Queue.Peek ->
      embed (atomically (closedResult <$> peekTBMQueue queue))
    Queue.TryPeek ->
      embed (atomically (closedNaResult <$> tryPeekTBMQueue queue))
    Queue.Write d ->
      -- The effect's result is @()@, so nothing about the queue state is reported back here.
      embed (atomically (writeTBMQueue queue d))
    Queue.TryWrite d ->
      embed (atomically (closedBoolResult <$> tryWriteTBMQueue queue d))
    Queue.WriteTimeout timeout d ->
      withTimeout timeout do
        -- The closed check and the write are composed into a single 'STM' action, so a closed queue is signalled as
        -- 'Nothing' instead of being written to.
        ifM (isClosedTBMQueue queue) (pure Nothing) (Just <$> writeTBMQueue queue d)
    Queue.Closed ->
      embed (atomically (isClosedTBMQueue queue))
    Queue.Close ->
      embed (atomically (closeTBMQueue queue))
{-# inline interpretQueueTBMWith #-}

-- Allocate a 'TBMQueue' holding at most @maxQueued@ elements and pass it to the given continuation.
-- The queue is created with 'newTBMQueueIO' as the acquire step of 'bracket', and 'closeTBMQueue' is run as the release
-- step.
--
-- NOTE: this is intentionally a plain comment rather than a Haddock comment, since the module is built with
-- @options_haddock prune@, which omits undocumented definitions from the generated documentation.
withTBMQueue ::
  forall d r a .
  Members [Resource, Embed IO] r =>
  Int ->
  (TBMQueue d -> Sem r a) ->
  Sem r a
withTBMQueue maxQueued =
  bracket (embed (newTBMQueueIO maxQueued)) (embed . atomically . closeTBMQueue)

-- |Interpret 'Queue' with a 'TBMQueue'.
--
-- A fresh queue with the given buffer size is allocated for the duration of the interpreted program via
-- 'withTBMQueue', and the program is then run with 'interpretQueueTBMWith' on that queue.
interpretQueueTBM ::
  forall d r .
  Members [Resource, Race, Embed IO] r =>
  -- |Buffer size
  Int ->
  InterpreterFor (Queue d) r
interpretQueueTBM maxQueued sem =
  withTBMQueue maxQueued \ queue ->
    interpretQueueTBMWith queue sem
{-# inline interpretQueueTBM #-}