{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}
module Network.AMQP.Worker.Message where
import Control.Exception (Exception, throwIO)
import Control.Monad (forever)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson (FromJSON, ToJSON)
import qualified Data.Aeson as Aeson
import Data.ByteString.Lazy (ByteString)
import Network.AMQP (Ack (..), DeliveryMode (..), newMsg)
import qualified Network.AMQP as AMQP
import Network.AMQP.Worker.Connection (Connection (..), withChannel)
import Network.AMQP.Worker.Key (Key, RequireRoute, Route, keyText)
import Network.AMQP.Worker.Poll (poll)
import Network.AMQP.Worker.Queue (Queue (..))
data Message a = Message
{ forall a. Message a -> ByteString
body :: ByteString
, forall a. Message a -> a
value :: a
}
deriving (Int -> Message a -> ShowS
forall a. Show a => Int -> Message a -> ShowS
forall a. Show a => [Message a] -> ShowS
forall a. Show a => Message a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Message a] -> ShowS
$cshowList :: forall a. Show a => [Message a] -> ShowS
show :: Message a -> String
$cshow :: forall a. Show a => Message a -> String
showsPrec :: Int -> Message a -> ShowS
$cshowsPrec :: forall a. Show a => Int -> Message a -> ShowS
Show, Message a -> Message a -> Bool
forall a. Eq a => Message a -> Message a -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Message a -> Message a -> Bool
$c/= :: forall a. Eq a => Message a -> Message a -> Bool
== :: Message a -> Message a -> Bool
$c== :: forall a. Eq a => Message a -> Message a -> Bool
Eq)
publish :: (RequireRoute a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m ()
publish :: forall a msg (m :: * -> *).
(RequireRoute a, ToJSON msg, MonadIO m) =>
Connection -> Key a msg -> msg -> m ()
publish = forall a msg (m :: * -> *).
(RequireRoute a, ToJSON msg, MonadIO m) =>
Connection -> Key a msg -> msg -> m ()
publishToExchange
publishToExchange :: (RequireRoute a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m ()
publishToExchange :: forall a msg (m :: * -> *).
(RequireRoute a, ToJSON msg, MonadIO m) =>
Connection -> Key a msg -> msg -> m ()
publishToExchange Connection
conn Key a msg
rk msg
msg =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. Connection -> (Channel -> IO b) -> IO b
withChannel Connection
conn forall a b. (a -> b) -> a -> b
$ \Channel
chan -> do
Maybe Int
_ <- Channel -> Text -> Text -> Message -> IO (Maybe Int)
AMQP.publishMsg Channel
chan Connection
conn.exchange (forall a msg. Key a msg -> Text
keyText Key a msg
rk) (forall a. ToJSON a => a -> Message
jsonMessage msg
msg)
forall (m :: * -> *) a. Monad m => a -> m a
return ()
where
jsonMessage :: ToJSON a => a -> AMQP.Message
jsonMessage :: forall a. ToJSON a => a -> Message
jsonMessage a
a =
Message
newMsg
{ msgBody :: ByteString
AMQP.msgBody = forall a. ToJSON a => a -> ByteString
Aeson.encode a
a
, msgContentType :: Maybe Text
AMQP.msgContentType = forall a. a -> Maybe a
Just Text
"application/json"
, msgContentEncoding :: Maybe Text
AMQP.msgContentEncoding = forall a. a -> Maybe a
Just Text
"UTF-8"
, msgDeliveryMode :: Maybe DeliveryMode
AMQP.msgDeliveryMode = forall a. a -> Maybe a
Just DeliveryMode
Persistent
}
takeMessage :: (MonadIO m, FromJSON a) => Connection -> Queue a -> m (Message a)
takeMessage :: forall (m :: * -> *) a.
(MonadIO m, FromJSON a) =>
Connection -> Queue a -> m (Message a)
takeMessage Connection
conn Queue a
q = do
let delay :: Int
delay = Int
10000 :: Microseconds
ConsumeResult a
res <- forall msg (m :: * -> *).
(FromJSON msg, MonadIO m) =>
Int -> Connection -> Queue msg -> m (ConsumeResult msg)
consumeNext Int
delay Connection
conn Queue a
q
case ConsumeResult a
res of
Error ParseError
e -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall e a. Exception e => e -> IO a
throwIO ParseError
e
Parsed Message a
msg -> forall (f :: * -> *) a. Applicative f => a -> f a
pure Message a
msg
worker :: (FromJSON a, MonadIO m) => Connection -> Queue a -> (Message a -> m ()) -> m ()
worker :: forall a (m :: * -> *).
(FromJSON a, MonadIO m) =>
Connection -> Queue a -> (Message a -> m ()) -> m ()
worker Connection
conn Queue a
queue Message a -> m ()
action =
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
Message a
m <- forall (m :: * -> *) a.
(MonadIO m, FromJSON a) =>
Connection -> Queue a -> m (Message a)
takeMessage Connection
conn Queue a
queue
Message a -> m ()
action Message a
m
consumeNext :: (FromJSON msg, MonadIO m) => Microseconds -> Connection -> Queue msg -> m (ConsumeResult msg)
consumeNext :: forall msg (m :: * -> *).
(FromJSON msg, MonadIO m) =>
Int -> Connection -> Queue msg -> m (ConsumeResult msg)
consumeNext Int
pd Connection
conn Queue msg
key =
forall (m :: * -> *) a. MonadIO m => Int -> m (Maybe a) -> m a
poll Int
pd forall a b. (a -> b) -> a -> b
$ forall msg (m :: * -> *).
(FromJSON msg, MonadIO m) =>
Connection -> Queue msg -> m (Maybe (ConsumeResult msg))
consume Connection
conn Queue msg
key
consume :: (FromJSON msg, MonadIO m) => Connection -> Queue msg -> m (Maybe (ConsumeResult msg))
consume :: forall msg (m :: * -> *).
(FromJSON msg, MonadIO m) =>
Connection -> Queue msg -> m (Maybe (ConsumeResult msg))
consume Connection
conn (Queue Key Bind msg
_ Text
name) = do
Maybe (Message, Envelope)
mme <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. Connection -> (Channel -> IO b) -> IO b
withChannel Connection
conn forall a b. (a -> b) -> a -> b
$ \Channel
chan -> do
Maybe (Message, Envelope)
m <- Channel -> Ack -> Text -> IO (Maybe (Message, Envelope))
AMQP.getMsg Channel
chan Ack
Ack Text
name
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Message, Envelope)
m
case Maybe (Message, Envelope)
mme of
Maybe (Message, Envelope)
Nothing ->
forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
Just (Message
msg, Envelope
env) -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Envelope -> IO ()
AMQP.ackEnv Envelope
env
let bd :: ByteString
bd = Message -> ByteString
AMQP.msgBody Message
msg
case forall a. FromJSON a => ByteString -> Either String a
Aeson.eitherDecode ByteString
bd of
Left String
err ->
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall a. ParseError -> ConsumeResult a
Error (String -> ByteString -> ParseError
ParseError String
err ByteString
bd)
Right msg
v ->
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall a. Message a -> ConsumeResult a
Parsed (forall a. ByteString -> a -> Message a
Message ByteString
bd msg
v)
data ConsumeResult a
= Parsed (Message a)
| Error ParseError
data ParseError = ParseError String ByteString
deriving (Int -> ParseError -> ShowS
[ParseError] -> ShowS
ParseError -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ParseError] -> ShowS
$cshowList :: [ParseError] -> ShowS
show :: ParseError -> String
$cshow :: ParseError -> String
showsPrec :: Int -> ParseError -> ShowS
$cshowsPrec :: Int -> ParseError -> ShowS
Show, Show ParseError
Typeable ParseError
SomeException -> Maybe ParseError
ParseError -> String
ParseError -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: ParseError -> String
$cdisplayException :: ParseError -> String
fromException :: SomeException -> Maybe ParseError
$cfromException :: SomeException -> Maybe ParseError
toException :: ParseError -> SomeException
$ctoException :: ParseError -> SomeException
Exception)
type Microseconds = Int