module Hasql.Queue.Low.ExactlyOnce
( enqueue
, withDequeue
, withDequeueWith
) where
import qualified Hasql.Queue.High.ExactlyOnce as H
import Control.Exception
import qualified Hasql.Encoders as E
import qualified Hasql.Decoders as D
import Hasql.Session
import Hasql.Statement
import Hasql.Connection
import qualified Hasql.Queue.Internal as I
import Control.Monad.IO.Class
import Data.Text(Text)
enqueue :: Text
-> E.Value a
-> [a]
-> Session ()
enqueue channel theEncoder values = do
H.enqueue theEncoder values
statement channel $ Statement "SELECT notify_on($1)" (E.param $ E.nonNullable E.text) D.noResult True
dequeueOrRollbackAndThrow :: D.Value a -> Int -> Session [a]
dequeueOrRollbackAndThrow theDecoder dequeueCount = H.dequeue theDecoder dequeueCount >>= \case
[] -> liftIO $ throwIO I.NoRows
xs -> pure xs
withDequeue :: Text
-> Connection
-> D.Value a
-> Int
-> (Session [a] -> Session b)
-> IO b
withDequeue = withDequeueWith mempty
withDequeueWith :: I.WithNotifyHandlers
-> Text
-> Connection
-> D.Value a
-> Int
-> (Session [a] -> Session b)
-> IO b
withDequeueWith withNotifyHandlers channel conn theDecoder dequeueCount runner
= I.withNotifyWith withNotifyHandlers channel conn
$ runner (dequeueOrRollbackAndThrow theDecoder dequeueCount)