module PgRecorder.Database
( callProcedure
, listen
, unlisten
, waitForNotifications
, PgIdentifier
, toPgIdentifier
) where
import Protolude
import Hasql.Pool (Pool, UsageError, use)
import Hasql.Session (query)
import Hasql.Query (statement)
import Hasql.Connection (Connection, withLibPQConnection)
import qualified Hasql.Decoders as HD
import qualified Hasql.Encoders as HE
import qualified Database.PostgreSQL.LibPQ as PQ
import Data.Either.Combinators
import qualified Data.ByteString.Char8 as B
import Data.ByteString.Search (replace)
import Data.Functor.Contravariant (contramap)
-- | Failure reported by this module; carries a textual description of the
-- underlying problem.
newtype Error
  = NotifyError Text
  deriving Show
-- | A PostgreSQL identifier held as raw bytes. The constructor is not part
-- of the export list; 'toPgIdentifier' is the exported way to build one.
newtype PgIdentifier
  = PgIdentifier ByteString
  deriving (Show)
-- | Unwrap a 'PgIdentifier', yielding the bytes stored inside it.
fromPgIdentifier :: PgIdentifier -> ByteString
fromPgIdentifier identifier =
  case identifier of
    PgIdentifier rawBytes -> rawBytes
-- | Turn arbitrary bytes into a double-quoted PostgreSQL identifier.
--
-- The input is cut at the first NUL character, every embedded double quote
-- is doubled, and the result is wrapped in a pair of double quotes.
toPgIdentifier :: ByteString -> PgIdentifier
toPgIdentifier x = PgIdentifier (quote <> escaped <> quote)
  where
    quote :: ByteString
    quote = "\""

    -- Input up to (not including) the first NUL character.
    truncated :: ByteString
    truncated = B.takeWhile (/= '\x0') x

    -- Each double quote becomes two, so it cannot close the identifier.
    escaped :: ByteString
    escaped = toS (replace quote ("\"\"" :: ByteString) truncated)
-- | Run @SELECT procedure($1, $2)@ on a pooled connection, passing the
-- channel and the message as the two text parameters.
--
-- Returns @Right ()@ on success. Any 'UsageError' from the pool is rendered
-- with 'show' and wrapped in 'NotifyError'.
callProcedure :: Pool -> PgIdentifier -> ByteString -> ByteString -> IO (Either Error ())
callProcedure pool procedure channel mesg = do
  outcome <- use pool session
  pure (toNotifyError outcome)
  where
    toNotifyError :: Either UsageError () -> Either Error ()
    toNotifyError = mapLeft (NotifyError . show)

    -- Both arguments are converted with 'toS' to match the text encoders.
    session = query (toS channel, toS mesg) callStatement

    -- The procedure name is spliced into the SQL text; the arguments are
    -- sent as statement parameters.
    sql :: ByteString
    sql = "SELECT " <> fromPgIdentifier procedure <> "($1, $2)"

    callStatement = statement sql params HD.unit False

    -- First tuple component feeds $1, second feeds $2.
    params = contramap fst (HE.value HE.text) <> contramap snd (HE.value HE.text)
-- | Send @LISTEN channel@ over the given connection.
--
-- The result of the command is discarded, so a failed LISTEN is not
-- reported to the caller.
listen :: Connection -> PgIdentifier -> IO ()
listen con channel =
  withLibPQConnection con $ \pqCon ->
    void (PQ.exec pqCon listenCommand)
  where
    listenCommand = "LISTEN " <> fromPgIdentifier channel
-- | Send @UNLISTEN channel@ over the given connection.
--
-- The result of the command is discarded, so a failed UNLISTEN is not
-- reported to the caller.
unlisten :: Connection -> PgIdentifier -> IO ()
unlisten con channel =
  withLibPQConnection con $ \pqCon ->
    void (PQ.exec pqCon unlistenCommand)
  where
    unlistenCommand = "UNLISTEN " <> fromPgIdentifier channel
-- | Loop forever on the given connection, passing each notification that
-- arrives to @sendNotification@ as channel name followed by payload.
--
-- When no notification is queued, the loop waits until the connection's
-- socket becomes readable and then asks libpq to consume the pending input.
--
-- This function never returns normally. It panics when libpq cannot supply
-- the connection's socket, or when consuming input fails (for example
-- because the server went away).
waitForNotifications :: (ByteString -> ByteString -> IO()) -> Connection -> IO ()
waitForNotifications sendNotification con =
  withLibPQConnection con $ void . forever . pqFetch
  where
    pqFetch pqCon = do
      mNotification <- PQ.notifies pqCon
      case mNotification of
        Nothing -> do
          mfd <- PQ.socket pqCon
          case mfd of
            Nothing -> panic "Error checking for PostgreSQL notifications"
            Just fd -> do
              void $ threadWaitRead fd
              -- A False result means the read failed. Ignoring it would make
              -- the loop spin: the socket stays readable and no notification
              -- is ever queued.
              inputOk <- PQ.consumeInput pqCon
              unless inputOk $ do
                mErr <- PQ.errorMessage pqCon
                panic $ "Error checking for PostgreSQL notifications"
                  <> maybe "" ((": " <>) . toS) mErr
        Just notification ->
          sendNotification (PQ.notifyRelname notification) (PQ.notifyExtra notification)