module Hasql.Notifications
( notifyPool,
FatalError (..),
#if defined(mingw32_HOST_OS)
import Control.Concurrent ( threadDelay )
import Control.Concurrent (threadWaitRead, threadDelay)
import Control.Exception (Exception, throw)
import Control.Monad (forever, unless, void, when)
import Data.ByteString.Char8 (ByteString)
import Data.Functor.Contravariant (contramap)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Database.PostgreSQL.LibPQ as PQ
import Hasql.Connection (Connection, withLibPQConnection)
import qualified Hasql.Decoders as HD
import qualified Hasql.Encoders as HE
import Hasql.Pool (Pool, UsageError, use)
import Hasql.Session (run, sql, statement)
import qualified Hasql.Session as S
import qualified Hasql.Statement as HST
newtype PgIdentifier = PgIdentifier Text deriving (Int -> PgIdentifier -> ShowS
[PgIdentifier] -> ShowS
PgIdentifier -> String
(Int -> PgIdentifier -> ShowS)
-> (PgIdentifier -> String)
-> ([PgIdentifier] -> ShowS)
-> Show PgIdentifier
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> PgIdentifier -> ShowS
showsPrec :: Int -> PgIdentifier -> ShowS
$cshow :: PgIdentifier -> String
show :: PgIdentifier -> String
$cshowList :: [PgIdentifier] -> ShowS
showList :: [PgIdentifier] -> ShowS
newtype FatalError = FatalError {FatalError -> String
fatalErrorMessage :: String}
instance Exception FatalError
instance Show FatalError where
show :: FatalError -> String
show = FatalError -> String
fromPgIdentifier :: PgIdentifier -> Text
fromPgIdentifier :: PgIdentifier -> Text
fromPgIdentifier (PgIdentifier Text
bs) = Text
toPgIdentifier :: Text -> PgIdentifier
toPgIdentifier :: Text -> PgIdentifier
toPgIdentifier Text
x =
Text -> PgIdentifier
PgIdentifier (Text -> PgIdentifier) -> Text -> PgIdentifier
forall a b. (a -> b) -> a -> b
$ Text
"\"" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
strictlyReplaceQuotes Text
x Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
strictlyReplaceQuotes :: Text -> Text
strictlyReplaceQuotes :: Text -> Text
strictlyReplaceQuotes = HasCallStack => Text -> Text -> Text -> Text
Text -> Text -> Text -> Text
T.replace Text
"\"" (Text
"\"\"" :: Text)
notifyPool ::
Pool ->
Text ->
Text ->
IO (Either UsageError ())
notifyPool :: Pool -> Text -> Text -> IO (Either UsageError ())
notifyPool Pool
pool Text
channel Text
mesg =
Pool -> Session () -> IO (Either UsageError ())
forall a. Pool -> Session a -> IO (Either UsageError a)
use Pool
pool ((Text, Text) -> Statement (Text, Text) () -> Session ()
forall params result.
params -> Statement params result -> Session result
statement (Text
channel, Text
mesg) Statement (Text, Text) ()
callStatement :: Statement (Text, Text) ()
callStatement = ByteString
-> Params (Text, Text)
-> Result ()
-> Bool
-> Statement (Text, Text) ()
forall a b.
ByteString -> Params a -> Result b -> Bool -> Statement a b
HST.Statement (ByteString
"SELECT pg_notify" ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"($1, $2)") Params (Text, Text)
encoder Result ()
HD.noResult Bool
encoder :: Params (Text, Text)
encoder = ((Text, Text) -> Text) -> Params Text -> Params (Text, Text)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (Text, Text) -> Text
forall a b. (a, b) -> a
fst (NullableOrNot Value Text -> Params Text
forall a. NullableOrNot Value a -> Params a
HE.param (NullableOrNot Value Text -> Params Text)
-> NullableOrNot Value Text -> Params Text
forall a b. (a -> b) -> a -> b
$ Value Text -> NullableOrNot Value Text
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
HE.nonNullable Value Text
HE.text) Params (Text, Text) -> Params (Text, Text) -> Params (Text, Text)
forall a. Semigroup a => a -> a -> a
<> ((Text, Text) -> Text) -> Params Text -> Params (Text, Text)
forall a' a. (a' -> a) -> Params a -> Params a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (Text, Text) -> Text
forall a b. (a, b) -> b
snd (NullableOrNot Value Text -> Params Text
forall a. NullableOrNot Value a -> Params a
HE.param (NullableOrNot Value Text -> Params Text)
-> NullableOrNot Value Text -> Params Text
forall a b. (a -> b) -> a -> b
$ Value Text -> NullableOrNot Value Text
forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
HE.nonNullable Value Text
notify ::
Connection ->
PgIdentifier ->
Text ->
IO (Either S.QueryError ())
notify :: Connection -> PgIdentifier -> Text -> IO (Either QueryError ())
notify Connection
con PgIdentifier
channel Text
mesg =
Session () -> Connection -> IO (Either QueryError ())
forall a. Session a -> Connection -> IO (Either QueryError a)
run (ByteString -> Session ()
sql (ByteString -> Session ()) -> ByteString -> Session ()
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
T.encodeUtf8 (Text
"NOTIFY " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PgIdentifier -> Text
fromPgIdentifier PgIdentifier
channel Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
", '" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
mesg Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"'")) Connection
listen ::
Connection ->
PgIdentifier ->
IO ()
listen :: Connection -> PgIdentifier -> IO ()
listen Connection
con PgIdentifier
channel =
IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> (Connection -> IO ()) -> IO ()
forall a. Connection -> (Connection -> IO a) -> IO a
withLibPQConnection Connection
con Connection -> IO ()
execListen :: Connection -> IO ()
execListen = ByteString -> Connection -> IO ()
executeOrPanic (ByteString -> Connection -> IO ())
-> ByteString -> Connection -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
T.encodeUtf8 (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ Text
"LISTEN " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PgIdentifier -> Text
fromPgIdentifier PgIdentifier
unlisten ::
Connection ->
PgIdentifier ->
IO ()
unlisten :: Connection -> PgIdentifier -> IO ()
unlisten Connection
con PgIdentifier
channel =
IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> (Connection -> IO ()) -> IO ()
forall a. Connection -> (Connection -> IO a) -> IO a
withLibPQConnection Connection
con Connection -> IO ()
execUnlisten :: Connection -> IO ()
execUnlisten = ByteString -> Connection -> IO ()
executeOrPanic (ByteString -> Connection -> IO ())
-> ByteString -> Connection -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
T.encodeUtf8 (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ Text
"UNLISTEN " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> PgIdentifier -> Text
fromPgIdentifier PgIdentifier
executeOrPanic :: ByteString -> PQ.Connection -> IO ()
executeOrPanic :: ByteString -> Connection -> IO ()
executeOrPanic ByteString
cmd Connection
pqCon = do
Maybe Result
mResult <- Connection -> ByteString -> IO (Maybe Result)
PQ.exec Connection
pqCon ByteString
case Maybe Result
mResult of
Maybe Result
Nothing -> do
Maybe ByteString
mError <- Connection -> IO (Maybe ByteString)
PQ.errorMessage Connection
String -> IO ()
forall a. String -> a
panic (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> (ByteString -> String) -> Maybe ByteString -> String
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (String
"Error executing" String -> ShowS
forall a. Semigroup a => a -> a -> a
<> ByteString -> String
forall a. Show a => a -> String
show ByteString
cmd) (Text -> String
T.unpack (Text -> String) -> (ByteString -> Text) -> ByteString -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
T.decodeUtf8Lenient) Maybe ByteString
Just Result
result -> do
status <- Result -> IO ExecStatus
PQ.resultStatus Result
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ExecStatus
status ExecStatus -> ExecStatus -> Bool
forall a. Eq a => a -> a -> Bool
== ExecStatus
PQ.FatalError) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Maybe ByteString
mError <- Result -> IO (Maybe ByteString)
PQ.resultErrorMessage Result
String -> IO ()
forall a. String -> a
panic (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> (ByteString -> String) -> Maybe ByteString -> String
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (String
"Error executing" String -> ShowS
forall a. Semigroup a => a -> a -> a
<> ByteString -> String
forall a. Show a => a -> String
show ByteString
cmd) (Text -> String
T.unpack (Text -> String) -> (ByteString -> Text) -> ByteString -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
T.decodeUtf8Lenient) Maybe ByteString
waitForNotifications ::
(ByteString -> ByteString -> IO ()) ->
Connection ->
IO ()
waitForNotifications :: (ByteString -> ByteString -> IO ()) -> Connection -> IO ()
waitForNotifications ByteString -> ByteString -> IO ()
sendNotification Connection
con =
Connection -> (Connection -> IO ()) -> IO ()
forall a. Connection -> (Connection -> IO a) -> IO a
withLibPQConnection Connection
con ((Connection -> IO ()) -> IO ()) -> (Connection -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ IO Any -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Any -> IO ()) -> (Connection -> IO Any) -> Connection -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> (Connection -> IO ()) -> Connection -> IO Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> IO ()
pqFetch :: Connection -> IO ()
pqFetch Connection
pqCon = do
Maybe Notify
mNotification <- Connection -> IO (Maybe Notify)
PQ.notifies Connection
case Maybe Notify
mNotification of
Maybe Notify
Nothing -> do
Maybe Fd
mfd <- Connection -> IO (Maybe Fd)
PQ.socket Connection
case Maybe Fd
mfd of
Maybe Fd
Nothing -> IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
#if defined(mingw32_HOST_OS)
Just _ -> do
void $ threadDelay 1000000
Just Fd
fd -> do
IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Fd -> IO ()
threadWaitRead Fd
result <- Connection -> IO Bool
PQ.consumeInput Connection
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
result (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Maybe ByteString
mError <- Connection -> IO (Maybe ByteString)
PQ.errorMessage Connection
String -> IO ()
forall a. String -> a
panic (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> (ByteString -> String) -> Maybe ByteString -> String
forall b a. b -> (a -> b) -> Maybe a -> b
maybe String
"Error checking for PostgreSQL notifications" (Text -> String
T.unpack (Text -> String) -> (ByteString -> Text) -> ByteString -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
T.decodeUtf8Lenient) Maybe ByteString
Just Notify
notification ->
ByteString -> ByteString -> IO ()
sendNotification (Notify -> ByteString
PQ.notifyRelname Notify
notification) (Notify -> ByteString
PQ.notifyExtra Notify
panic :: String -> a
panic :: forall a. String -> a
panic String
a = FatalError -> a
forall a e. Exception e => e -> a
throw (String -> FatalError
FatalError String