module Pulsar.Producer where
import qualified Control.Monad.Catch as E
import Control.Monad.Managed
import Data.IORef
import Data.Text ( Text )
import qualified Pulsar.Core as C
import Pulsar.Connection
import Pulsar.Types
import UnliftIO.Chan
-- | Handle for publishing messages, parameterised over the monad @m@ in
-- which publishing runs.
--
-- Values of this type are built by 'newProducer' in this module.
newtype Producer m = Producer
  { -- | Publish a single 'PulsarMessage', yielding no result.
    produce :: PulsarMessage -> m ()
  }
-- | Mutable bookkeeping for one producer, kept in an 'IORef' by
-- 'newProducer' and advanced by 'mkSeqId'.
--
-- Both fields are strict: the record is rebuilt on every published message,
-- so strict fields guarantee the stored state never carries a deferred
-- @+ 1@ computation from an earlier update.
data ProducerState = ProducerState
  { -- | Sequence id that the next call to 'mkSeqId' will hand out.
    stSeqId :: !SeqId
    -- | Name obtained when the producer was created (the 'Text' returned by
    -- @C.newProducer@; presumably the broker-assigned producer name --
    -- confirm against "Pulsar.Core").
  , stName :: !Text
  }
-- | Atomically take the producer's current sequence id and advance the
-- stored counter by one.
--
-- The id returned is the value held /before/ the increment, so the first
-- call on a state initialised with @0@ yields @0@. The producer name stored
-- alongside the counter is carried over unchanged.
--
-- Uses the strict 'atomicModifyIORef'' so the update is evaluated when it is
-- applied instead of being left as a thunk inside the reference.
mkSeqId :: MonadIO m => IORef ProducerState -> m SeqId
mkSeqId ref = liftIO $ atomicModifyIORef' ref advance
  where
    -- Force the incremented id before it is written back so the reference
    -- never accumulates a chain of pending @+ 1@ applications.
    advance (ProducerState s n) =
      let s' = s + 1 in s' `seq` (ProducerState s' n, s)
-- | Create a 'Producer' for the given topic as a managed resource.
--
-- Setup, in order:
--
-- 1. a fresh response channel is created;
-- 2. a producer id is obtained with @mkProducerId@;
-- 3. the topic is looked up (@C.lookup@) and the producer is created
--    (@C.newProducer@), each under its own freshly generated request id;
-- 4. the producer state is initialised with sequence id @0@ and the name
--    returned by step 3.
--
-- When the surrounding 'Managed' scope is released, @C.closeProducer@ is
-- invoked with a new request id.
--
-- The resulting producer publishes in any 'MonadIO' monad @f@, independent
-- of the monad @m@ used for setup.
newProducer
  :: (MonadManaged m, MonadIO f) => PulsarCtx -> Topic -> m (Producer f)
newProducer (Ctx conn app) topic = do
  respChan <- newChan
  prodId   <- mkProducerId respChan app
  prodName <- liftIO (register respChan prodId)
  stateRef <- liftIO (newIORef (ProducerState 0 prodName))
  -- The acquire step of the bracket is pure: registration has already
  -- happened above, so the bracket only guarantees that the close request
  -- is sent on release.
  using . managed $ E.bracket
    (pure (Producer (publish respChan prodId stateRef)))
    (\_ -> shutdown respChan prodId)
  where
    -- Fresh request id drawn from the shared application state.
    nextReqId = mkRequestId app

    -- Tag the message with the next sequence id and send it.
    publish respChan prodId stateRef message = do
      seqId <- mkSeqId stateRef
      liftIO (C.send conn respChan prodId seqId message)

    -- Look the topic up, then create the producer; yields the 'Text'
    -- returned by @C.newProducer@.
    register respChan prodId = do
      lookupReq <- nextReqId
      C.lookup conn respChan lookupReq topic
      createReq <- nextReqId
      C.newProducer conn respChan createReq prodId topic

    -- Send the close-producer request under a new request id.
    shutdown respChan prodId = do
      closeReq <- nextReqId
      C.closeProducer conn respChan closeReq prodId