{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}

module Network.AMQP.Worker.Queue where

import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.String (IsString)
import Data.Text (Text)
import Network.AMQP (ExchangeOpts (..), QueueOpts)
import qualified Network.AMQP as AMQP

import Network.AMQP.Worker.Connection (Connection (..), withChannel)
import Network.AMQP.Worker.Key (Bind, Key (..), keyText, toBindKey)

type QueueName = Text

type QueuePrefix = Text

-- | A queue is an inbox for messages to be delivered
data Queue msg
    = Queue (Key Bind msg) QueueName
    deriving (Int -> Queue msg -> ShowS
forall msg. Int -> Queue msg -> ShowS
forall msg. [Queue msg] -> ShowS
forall msg. Queue msg -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Queue msg] -> ShowS
$cshowList :: forall msg. [Queue msg] -> ShowS
show :: Queue msg -> String
$cshow :: forall msg. Queue msg -> String
showsPrec :: Int -> Queue msg -> ShowS
$cshowsPrec :: forall msg. Int -> Queue msg -> ShowS
Show, Queue msg -> Queue msg -> Bool
forall msg. Queue msg -> Queue msg -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Queue msg -> Queue msg -> Bool
$c/= :: forall msg. Queue msg -> Queue msg -> Bool
== :: Queue msg -> Queue msg -> Bool
$c== :: forall msg. Queue msg -> Queue msg -> Bool
Eq)

-- | Create a queue to receive messages matching the 'Key' with a name prefixed via `queueName`.
--
-- > q <- Worker.queue conn "main" $ key "messages" & any1
-- > Worker.worker conn def q onError onMessage
queue :: (MonadIO m) => Connection -> QueuePrefix -> Key a msg -> m (Queue msg)
queue :: forall (m :: * -> *) a msg.
MonadIO m =>
Connection -> QueueName -> Key a msg -> m (Queue msg)
queue Connection
conn QueueName
pre Key a msg
key = do
    forall (m :: * -> *) a msg.
MonadIO m =>
Connection -> QueueName -> Key a msg -> m (Queue msg)
queueNamed Connection
conn (forall a msg. QueueName -> Key a msg -> QueueName
queueName QueueName
pre Key a msg
key) Key a msg
key

-- | Create a queue to receive messages matching the binding key. Each queue with a unique name
-- will be delivered a separate copy of the messsage. Workers will load balance if operating on the
-- same queue, or on queues with the same name
queueNamed :: (MonadIO m) => Connection -> QueueName -> Key a msg -> m (Queue msg)
queueNamed :: forall (m :: * -> *) a msg.
MonadIO m =>
Connection -> QueueName -> Key a msg -> m (Queue msg)
queueNamed Connection
conn QueueName
name Key a msg
key = do
    let q :: Queue msg
q = forall msg. Key Bind msg -> QueueName -> Queue msg
Queue (forall a msg. Key a msg -> Key Bind msg
toBindKey Key a msg
key) QueueName
name
    forall (m :: * -> *) msg.
MonadIO m =>
Connection -> Queue msg -> m ()
bindQueue Connection
conn Queue msg
q
    forall (m :: * -> *) a. Monad m => a -> m a
return Queue msg
q

-- | Name a queue with a prefix and the binding key name. Useful for seeing at
-- a glance which queues are receiving which messages
--
-- > -- "main messages.new"
-- > queueName "main" (key "messages" & word "new")
queueName :: QueuePrefix -> Key a msg -> QueueName
queueName :: forall a msg. QueueName -> Key a msg -> QueueName
queueName QueueName
pre Key a msg
key = QueueName
pre forall a. Semigroup a => a -> a -> a
<> QueueName
" " forall a. Semigroup a => a -> a -> a
<> forall a msg. Key a msg -> QueueName
keyText Key a msg
key

-- | Queues must be bound before you publish messages to them, or the messages will not be saved.
-- Use `queue` or `queueNamed` instead
bindQueue :: (MonadIO m) => Connection -> Queue msg -> m ()
bindQueue :: forall (m :: * -> *) msg.
MonadIO m =>
Connection -> Queue msg -> m ()
bindQueue Connection
conn (Queue Key Bind msg
key QueueName
name) =
    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. Connection -> (Channel -> IO b) -> IO b
withChannel Connection
conn forall a b. (a -> b) -> a -> b
$ \Channel
chan -> do
        let options :: QueueOpts
options = QueueOpts
AMQP.newQueue{queueName :: QueueName
AMQP.queueName = QueueName
name}
        let exg :: ExchangeOpts
exg = ExchangeOpts
AMQP.newExchange{exchangeName :: QueueName
exchangeName = Connection
conn.exchange, exchangeType :: QueueName
exchangeType = QueueName
"topic"}
        ()
_ <- Channel -> ExchangeOpts -> IO ()
AMQP.declareExchange Channel
chan ExchangeOpts
exg
        (QueueName, Int, Int)
_ <- Channel -> QueueOpts -> IO (QueueName, Int, Int)
AMQP.declareQueue Channel
chan QueueOpts
options
        ()
_ <- Channel -> QueueName -> QueueName -> QueueName -> IO ()
AMQP.bindQueue Channel
chan QueueName
name (Connection -> QueueName
exchange Connection
conn) (forall a msg. Key a msg -> QueueName
keyText Key Bind msg
key)
        forall (m :: * -> *) a. Monad m => a -> m a
return ()