module Polysemy.Conc.Interpreter.Queue.TB where
import Control.Concurrent.STM (
TBQueue,
atomically,
isFullTBQueue,
newTBQueueIO,
peekTBQueue,
readTBQueue,
tryPeekTBQueue,
tryReadTBQueue,
writeTBQueue,
)
import qualified Polysemy.Conc.Data.QueueResult as QueueResult
import qualified Polysemy.Conc.Effect.Queue as Queue
import Polysemy.Conc.Effect.Queue (Queue)
import Polysemy.Conc.Effect.Race (Race)
import Polysemy.Conc.Queue.Result (naResult)
import Polysemy.Conc.Queue.Timeout (withTimeout)
-- | Interpret 'Queue' in terms of an already allocated 'TBQueue'.
--
-- Each constructor of the effect is mapped onto the corresponding STM
-- primitive of the bounded queue:
--
-- * 'Queue.Read' and 'Queue.Peek' run 'readTBQueue' / 'peekTBQueue' in a
--   single transaction and wrap the element in 'QueueResult.Success'.
-- * 'Queue.TryRead' and 'Queue.TryPeek' use the non-blocking variants and
--   convert the resulting 'Maybe' with 'naResult'.
-- * 'Queue.Write' runs 'writeTBQueue' in a single transaction.
-- * 'Queue.TryWrite' checks 'isFullTBQueue' first and yields
--   'QueueResult.NotAvailable' instead of writing when the queue is full.
-- * 'Queue.ReadTimeout' and 'Queue.WriteTimeout' hand the STM action to
--   'withTimeout' together with the requested time span.
-- * 'Queue.Closed' always yields 'False' and 'Queue.Close' does nothing; this
--   interpreter keeps no closed state.
interpretQueueTBWith ::
  ∀ d r .
  Members [Race, Embed IO] r =>
  -- | The queue backing the effect.
  TBQueue d ->
  InterpreterFor (Queue d) r
interpretQueueTBWith queue =
  interpret \case
    Queue.Read ->
      embed (atomically (QueueResult.Success <$> readTBQueue queue))
    Queue.TryRead ->
      embed (atomically (naResult <$> tryReadTBQueue queue))
    Queue.ReadTimeout timeout ->
      -- The STM action is wrapped in 'Just' to match the shape 'withTimeout' expects.
      withTimeout timeout (Just <$> readTBQueue queue)
    Queue.Peek ->
      embed (atomically (QueueResult.Success <$> peekTBQueue queue))
    Queue.TryPeek ->
      embed (atomically (naResult <$> tryPeekTBQueue queue))
    Queue.Write d ->
      embed (atomically (writeTBQueue queue d))
    Queue.TryWrite d ->
      -- The fullness check and the write share one transaction, so the queue
      -- cannot fill up between the two steps.
      embed $ atomically do
        ifM
          (isFullTBQueue queue)
          (pure QueueResult.NotAvailable)
          (QueueResult.Success <$> writeTBQueue queue d)
    Queue.WriteTimeout timeout d ->
      withTimeout timeout (Just <$> writeTBQueue queue d)
    Queue.Closed ->
      pure False
    Queue.Close ->
      unit
{-# inline interpretQueueTBWith #-}
-- | Interpret 'Queue' with a freshly allocated 'TBQueue'.
--
-- A new queue is created with 'newTBQueueIO' using the given bound, and the
-- program is then run through 'interpretQueueTBWith' against that queue.
interpretQueueTB ::
  ∀ d r .
  Members [Race, Embed IO] r =>
  -- | Bound passed to 'newTBQueueIO' when allocating the queue.
  Natural ->
  InterpreterFor (Queue d) r
interpretQueueTB maxQueued sem = do
  -- The type application fixes the element type to the @d@ of the signature.
  queue <- embed (newTBQueueIO @d maxQueued)
  interpretQueueTBWith queue sem
{-# inline interpretQueueTB #-}