{-# LANGUAGE FlexibleContexts #-}
module Network.AMQP.Worker.Worker where
import Control.Concurrent (threadDelay)
import Control.Exception (SomeException (..))
import Control.Monad (forever)
import Control.Monad.Catch (Exception (..), MonadCatch, catch)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson (FromJSON)
import Data.ByteString.Lazy (ByteString)
import Data.Default (Default (..))
import Network.AMQP.Worker.Connection (Connection)
import Network.AMQP.Worker.Message (ConsumeResult (..), Message (..), Microseconds, ParseError (..), consumeNext)
import Network.AMQP.Worker.Queue (Queue (..))
-- | Run a consumer loop that never returns: fetch the next message from
-- the queue with 'consumeNext', dispatch it, sleep for 'loopDelay', repeat.
--
-- * A message that fails to parse is reported to the error handler as a
--   'MessageParseError'.
-- * A message that parses is handed to the action. Any exception the action
--   throws is reported to the error handler as an 'OtherException' carrying
--   the message 'body'.
--
-- Only the call to the action is wrapped in 'catch'; exceptions raised by
-- 'consumeNext' or by the error handler itself propagate out of the loop.
worker
    :: (FromJSON a, MonadIO m, MonadCatch m)
    => Connection
    -- ^ connection passed through to 'consumeNext'
    -> WorkerOptions
    -- ^ supplies 'pollDelay' (for 'consumeNext') and 'loopDelay'
    -> Queue a
    -- ^ queue to consume from
    -> (WorkerException SomeException -> m ())
    -- ^ error handler
    -> (Message a -> m ())
    -- ^ action run on every successfully parsed message
    -> m ()
worker conn opts queue onError action =
    forever $ do
        eres <- consumeNext (pollDelay opts) conn queue
        case eres of
            Error (ParseError reason bd) ->
                onError (MessageParseError bd reason)
            Parsed msg ->
                -- NOTE(review): the handler is typed at 'SomeException', so
                -- it also intercepts asynchronous exceptions delivered while
                -- the action runs — confirm that this is intended.
                catch
                    (action msg)
                    (onError . OtherException (body msg))
        -- Pause between iterations; a 'loopDelay' of 0 is passed straight
        -- to 'threadDelay'.
        liftIO $ threadDelay (loopDelay opts)
-- | Timing options read by the worker loop.
data WorkerOptions = WorkerOptions
    { pollDelay :: Microseconds
    -- ^ Delay value handed to 'consumeNext' on every iteration.
    , loopDelay :: Microseconds
    -- ^ Delay passed to 'threadDelay' after each iteration of the loop.
    }
    deriving (Show, Eq)
-- | Default options: a 'pollDelay' of @10 * 1000@ (10 ms, assuming the unit
-- is microseconds as the type name indicates) and a 'loopDelay' of @0@.
instance Default WorkerOptions where
    def =
        WorkerOptions
            { pollDelay = 10 * 1000
            , loopDelay = 0
            }
-- | Errors reported to a worker's error handler. The type parameter @e@ is
-- the type of the wrapped exception carried by 'OtherException'.
data WorkerException e
    = -- | The message could not be parsed. Carries the 'ByteString' and the
      -- reason 'String' taken from the 'ParseError' (presumably the raw
      -- message body — confirm against 'ParseError').
      MessageParseError ByteString String
    | -- | The message action threw. Carries the message 'body' and the
      -- exception that was caught.
      OtherException ByteString e
    deriving (Show, Eq)
-- | A 'WorkerException' is itself an 'Exception' whenever the error type it
-- wraps is one; every method uses the class default.
instance Exception e => Exception (WorkerException e)