{-# LANGUAGE FlexibleContexts  #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies      #-}

-- | PostgreSQL-backed implementation of the 'Persist' class.
--
-- Jobs, function names and integer configuration values are stored in three
-- tables (@jobs@, @funcs@, @configs@) that are created on start-up when they
-- do not exist yet.
module Periodic.Server.Persist.PSQL
  ( PSQL
  ) where

import           Control.Monad              (void)
import           Data.Binary                (decodeOrFail)
import           Data.Byteable              (toBytes)
import           Data.ByteString            (ByteString)
import           Data.ByteString.Base64     (decode, encode)
import           Data.ByteString.Lazy       (fromStrict)
import qualified Data.Foldable              as F (foldrM)
import           Data.Int                   (Int64)
import           Data.List                  (intercalate)
import           Data.Maybe                 (fromMaybe, isJust, listToMaybe)
import           Data.Pool                  (Pool, createPool, withResource)
import           Data.String                (IsString (..))
import           Database.PostgreSQL.Simple
import           Periodic.Server.Persist    (Persist (PersistConfig, PersistException),
                                             State (..))
import qualified Periodic.Server.Persist    as Persist
import           Periodic.Types.Job         (FuncName (..), Job, JobName (..),
                                             getSchedAt)
import           Prelude                    hiding (foldr, lookup)
import           System.Log.Logger          (errorM, infoM)
import           UnliftIO                   (Exception, SomeException, Typeable)

-- | Value used for a 'State' in the @state@ column of the @jobs@ table.
stateName :: State -> ByteString
stateName Pending = "0"
stateName Running = "1"
stateName Locking = "2"

-- | Persistence handle: a pool of PostgreSQL connections.
newtype PSQL = PSQL (Pool Connection)

-- | Stripe count handed to 'createPool'.
numStripes :: Int
numStripes = 1

-- | Idle time handed to 'createPool'.  No signature on purpose: the type is
-- fixed by its single use as the idle-time argument of 'createPool', which
-- avoids importing the time type into this module.
idleTime = 10

-- | Maximum resource count handed to 'createPool'.
maxResources :: Int
maxResources = 10

instance Persist PSQL where
  -- | The connection string passed to 'connectPostgreSQL'.
  data PersistConfig PSQL = PSQLPath ByteString
  data PersistException PSQL = PSQLException SomeException deriving (Show, Typeable)

  -- Create the pool, make sure the three tables exist (in one transaction),
  -- then reset every stored job to 'Pending'.
  newPersist (PSQLPath path) = do
    infoM "Periodic.Server.Persist.PSQL" ("PSQL connected " ++ show path)
    pool <- createPool (connectPostgreSQL path) close numStripes idleTime maxResources
    withResource pool $ \conn ->
      withTransaction conn $ do
        createConfigTable conn
        createJobTable conn
        createFuncTable conn
    -- Runs on its own pooled connection, after the DDL transaction above
    -- has been committed.
    allPending pool
    return $ PSQL pool

  member         (PSQL pool) = doMember pool
  lookup         (PSQL pool) = doLookup pool
  insert         (PSQL pool) = doInsert pool
  delete         (PSQL pool) = doDelete pool
  size           (PSQL pool) = doSize pool
  foldr          (PSQL pool) = doFoldr pool
  foldrPending   (PSQL pool) = doFoldrPending pool
  foldrLocking   (PSQL pool) = doFoldrLocking pool
  dumpJob        (PSQL pool) = doDumpJob pool
  configSet      (PSQL pool) = doConfigSet pool
  configGet      (PSQL pool) = doConfigGet pool
  insertFuncName (PSQL pool) = doInsertFuncName pool
  removeFuncName (PSQL pool) = doRemoveFuncName pool
  funcList       (PSQL pool) = doFuncList pool
  minSchedAt     (PSQL pool) = doMinSchedAt pool Pending

instance Exception (PersistException PSQL)

-- | Lets a string literal be used directly as the connection string.
instance IsString (PersistConfig PSQL) where
  fromString = PSQLPath . fromString

-- | Name of a database table, spliced verbatim into generated SQL.
newtype TableName = TableName String
  deriving (Show)

instance IsString TableName where
  fromString = TableName

-- | Unwrap a 'TableName'.
getTableName :: TableName -> String
getTableName (TableName name) = name

-- | A column name, a column definition, or a raw SQL fragment; spliced
-- verbatim into generated SQL.
newtype Column = Column { unColumn :: String }
  deriving (Show)

instance IsString Column where
  fromString = Column

type Columns = [Column]

-- | Join columns with @", "@.
columnsToString :: Columns -> String
columnsToString = intercalate ", " . map unColumn

-- | Render an optional @WHERE@ clause: an empty predicate yields no clause.
whereClause :: String -> String
whereClause ""   = ""
whereClause part = " WHERE " ++ part

-- | @CREATE TABLE IF NOT EXISTS@ with the given column definitions.
createTable :: TableName -> Columns -> Connection -> IO Int64
createTable tn cols conn = execute_ conn sql
  where sql = fromString $ concat
          [ "CREATE TABLE IF NOT EXISTS ", getTableName tn, " ("
          , columnsToString cols
          , ")"
          ]

-- | Name of a database index.
newtype IndexName = IndexName String
  deriving (Show)

instance IsString IndexName where
  fromString = IndexName

-- | The first single-column row of a result, or the default for no rows.
getOnlyDefault :: FromRow (Only a) => a -> [Only a] -> a
getOnlyDefault a = maybe a fromOnly . listToMaybe

-- | @INSERT … ON CONFLICT (ucols) …@ (upsert).
--
-- @ucols@ are the conflict-target columns and @vcols@ the value columns; the
-- parameter row must supply one value per column in @ucols ++ vcols@ order.
-- With no value columns a conflict does nothing; otherwise each value column
-- is overwritten with the proposed (@excluded@) value.  A value column that
-- already contains @=@ is used as a literal @SET@ fragment.
insertOrUpdate :: ToRow a => TableName -> Columns -> Columns -> a -> Pool Connection -> IO Int64
insertOrUpdate tn ucols vcols a pool = withResource pool $ \conn -> execute conn sql a
  where cols = ucols ++ vcols
        -- One placeholder per inserted column.
        v = replicate (length cols) "?"

        setSql = intercalate ", " $ map appendSet vcols

        appendSet :: Column -> String
        appendSet (Column col)
          | '=' `elem` col = col
          | otherwise      = col ++ " = excluded." ++ col

        doSql = if null vcols then " DO NOTHING" else " DO UPDATE SET " ++ setSql

        sql = fromString $ concat
          [ "INSERT INTO ", getTableName tn
          , " (", columnsToString cols, ")"
          , " VALUES"
          , " (", columnsToString v, ")"
          , " ON CONFLICT (", columnsToString ucols, ")"
          , doSql
          ]

-- | @UPDATE … SET …@ with an optional predicate.
--
-- Every column becomes @col = ?@ unless it already contains @=@, in which
-- case it is used as a literal @SET@ fragment.  An empty @partSql@ updates
-- every row.
update :: ToRow a => TableName -> Columns -> String -> a -> Pool Connection -> IO Int64
update tn cols partSql a pool = withResource pool $ \conn -> execute conn sql a
  where setSql = intercalate ", " $ map appendSet cols

        sql = fromString $ concat
          [ "UPDATE ", getTableName tn
          , " SET ", setSql
          , whereClause partSql
          ]

        appendSet :: Column -> String
        appendSet (Column col)
          | '=' `elem` col = col
          | otherwise      = col ++ " = ?"

-- | Run a @SELECT@ and return its first row, if any.  An empty @partSql@
-- selects without a predicate.
selectOne
  :: (ToRow a, FromRow b, Show b)
  => TableName -> Columns -> String -> a -> Connection -> IO (Maybe b)
selectOne tn cols partSql a conn = listToMaybe <$> query conn sql a
  where sql = fromString $ concat
          [ "SELECT ", columnsToString cols, " FROM ", getTableName tn
          , whereClause partSql
          ]

-- | 'selectOne' for a single column, run on a pooled connection.
selectOneOnly
  :: (ToRow a, FromRow (Only b), Show b)
  => TableName -> Column -> String -> a -> Pool Connection -> IO (Maybe b)
selectOneOnly tn col partSql a pool =
  withResource pool $ fmap (fmap fromOnly) . selectOne tn [col] partSql a

-- | @SELECT count(*)@; 0 when the query yields no row.  An empty @partSql@
-- counts every row.
count :: ToRow a => TableName -> String -> a -> Pool Connection -> IO Int64
count tn partSql a pool = withResource pool $ \conn ->
  getOnlyDefault 0 <$> query conn sql a
  where sql = fromString $ concat
          [ "SELECT count(*) FROM ", getTableName tn, whereClause partSql ]

-- | @DELETE FROM … WHERE …@.
--
-- The predicate is mandatory here on purpose: an empty @partSql@ still
-- produces an invalid statement instead of silently deleting every row.
delete :: ToRow a => TableName -> String -> a -> Pool Connection -> IO Int64
delete tn partSql a pool = withResource pool $ \conn -> execute conn sql a
  where whereSql = " WHERE " ++ partSql
        sql = fromString $ concat
          [ "DELETE FROM ", getTableName tn, whereSql ]

configs :: TableName
configs = "configs"

jobs :: TableName
jobs = "jobs"

funcs :: TableName
funcs = "funcs"

-- | Create the @configs@ table (name -> integer value) if it is missing.
createConfigTable :: Connection -> IO ()
createConfigTable =
  void . createTable configs
    [ "name VARCHAR(256) NOT NULL"
    , "value INT DEFAULT 0"
    , "CONSTRAINT config_pk PRIMARY KEY (name)"
    ]

-- | Create the @jobs@ table, keyed by (func, name), if it is missing.
createJobTable :: Connection -> IO ()
createJobTable =
  void . createTable jobs
    [ "func VARCHAR(256) NOT NULL"
    , "name VARCHAR(256) NOT NULL"
    , "value text"
    , "state INT DEFAULT 0"
    , "sched_at INT DEFAULT 0"
    , "CONSTRAINT job_pk PRIMARY KEY (func, name)"
    ]

-- | Create the @funcs@ table if it is missing.
createFuncTable :: Connection -> IO ()
createFuncTable =
  void . createTable funcs
    [ "func VARCHAR(256) NOT NULL"
    , "CONSTRAINT func_pk PRIMARY KEY (func)"
    ]

-- | Set the state of every stored job to 'Pending' (no predicate).
allPending :: Pool Connection -> IO ()
allPending = void . update jobs ["state"] "" (Only (stateName Pending))

-- | Fetch and decode the job stored under (func, name) in the given state.
--
-- Returns 'Nothing' when there is no such row, and also (after logging an
-- error) when the stored value cannot be decoded by @decodeJob@.
doLookup :: Pool Connection -> State -> FuncName -> JobName -> IO (Maybe Job)
doLookup pool state fn jn = do
  r <- selectOneOnly jobs "value" "func=? AND name=? AND state=?"
         (unFN fn, unJN jn, stateName state) pool
  case r of
    Nothing -> return Nothing
    Just bs ->
      case decodeJob bs of
        Left e -> do
          errorM "Periodic.Server.Persist.PSQL"
            $ "doLookup error: decode " ++ show bs ++ " " ++ show e
          return Nothing
        Right job -> return $ Just job

-- | Whether 'doLookup' finds a decodable job for the given key and state.
doMember :: Pool Connection -> State -> FuncName -> JobName -> IO Bool
doMember pool st fn jn = isJust <$> doLookup pool st fn jn

-- | Register the function name, then upsert the job row.
--
-- The job is stored as the Base64 encoding of its 'toBytes' serialisation,
-- together with its state and scheduled time.
doInsert :: Pool Connection -> State -> FuncName -> JobName -> Job -> IO ()
doInsert pool state fn jn job = do
  doInsertFuncName pool fn
  void $ insertOrUpdate jobs
    ["func", "name"]
    ["value", "state", "sched_at"]
    (unFN fn, unJN jn, encode $ toBytes job, stateName state, getSchedAt job)
    pool

-- | Insert the function name into @funcs@; an existing row is left alone.
doInsertFuncName :: Pool Connection -> FuncName -> IO ()
doInsertFuncName pool fn = void $ insertOrUpdate funcs ["func"] [] (Only $ unFN fn) pool

-- | Fold over the stored values of every job in the given state, using
-- @mkFoldFunc@ to turn each row into a step of the fold.
doFoldr :: Pool Connection -> State -> (Job -> a -> a) -> a -> IO a
doFoldr pool state f acc = withResource pool $ \conn ->
  fold conn sql (Only $ stateName state) acc (mkFoldFunc f)
  where sql = fromString $ "SELECT value FROM " ++ getTableName jobs ++ " WHERE state=?"
-- | Fold over pending jobs of the given functions scheduled before @ts@.
--
-- One query is issued per function name (state 'Pending', @sched_at < ts@);
-- the per-function folds are chained with 'F.foldrM' over @fns@, all on a
-- single pooled connection.
doFoldrPending :: Pool Connection -> Int64 -> [FuncName] -> (Job -> a -> a) -> a -> IO a
doFoldrPending pool ts fns f acc = withResource pool $ \conn ->
  F.foldrM (foldFunc conn f) acc fns
  where sql = fromString $ "SELECT value FROM " ++ getTableName jobs ++ " WHERE func=? AND state=? AND sched_at < ?"

        foldFunc :: Connection -> (Job -> a -> a) -> FuncName -> a -> IO a
        foldFunc conn f0 fn acc0 =
          fold conn sql (unFN fn, stateName Pending, ts) acc0 (mkFoldFunc f0)

-- | Fold over at most @limit@ jobs of one function that are in state
-- 'Locking', taken in ascending @sched_at@ order.
doFoldrLocking :: Pool Connection -> Int -> FuncName -> (Job -> a -> a) -> a -> IO a
doFoldrLocking pool limit fn f acc = withResource pool $ \conn ->
  fold conn sql (unFN fn, stateName Locking, limit) acc (mkFoldFunc f)
  where sql = fromString $ "SELECT value FROM " ++ getTableName jobs ++ " WHERE func=? AND state=? ORDER BY sched_at ASC LIMIT ?"

-- | Collect every stored job, regardless of state, by consing through
-- @mkFoldFunc@.
doDumpJob :: Pool Connection -> IO [Job]
doDumpJob pool = withResource pool $ \conn ->
  fold_ conn sql [] (mkFoldFunc (:))
  where sql = fromString $ "SELECT value FROM " ++ getTableName jobs

-- | All function names in the @funcs@ table.
doFuncList :: Pool Connection -> IO [FuncName]
doFuncList pool = withResource pool $ \conn ->
  map (FuncName . fromOnly) <$> query_ conn sql
  where sql = fromString $ "SELECT func FROM " ++ getTableName funcs

-- | Delete the job stored under (func, name), whatever its state.
doDelete :: Pool Connection -> FuncName -> JobName -> IO ()
doDelete pool fn jn = void $ delete jobs "func=? AND name=?" (unFN fn, unJN jn) pool

-- | Delete every job of the function, then the function name itself.
--
-- NOTE(review): these are two independent statements, each run through its
-- own 'delete' call; they are not wrapped in a transaction here.
doRemoveFuncName :: Pool Connection -> FuncName -> IO ()
doRemoveFuncName pool fn = do
  void $ delete jobs "func=?" (Only $ unFN fn) pool
  void $ delete funcs "func=?" (Only $ unFN fn) pool

-- | Smallest @sched_at@ among the function's jobs in the given state.
--
-- Yields 0 both when no row comes back and when @min(sched_at)@ is NULL
-- (no matching jobs).
doMinSchedAt :: Pool Connection -> State -> FuncName -> IO Int64
doMinSchedAt pool state fn =
  maybe 0 (fromMaybe 0)
    <$> selectOneOnly jobs "min(sched_at)" "func=? AND state=?"
          (unFN fn, stateName state) pool

-- | Number of the function's jobs in the given state.
doSize :: Pool Connection -> State -> FuncName -> IO Int64
doSize pool state fn = count jobs "func=? AND state=?" (unFN fn, stateName state) pool

-- NOTE(review): 'doConfigSet' continues beyond the reviewed range; its text is left exactly as found.
doConfigSet :: Pool Connection -> String -> Int -> IO () doConfigSet :: Pool Connection -> String -> Int -> IO () doConfigSet pool :: Pool Connection pool name :: String name v :: Int v = IO Int64 -> IO () forall (f :: * -> *) a. Functor f => f a -> f () void (IO Int64 -> IO ()) -> IO Int64 -> IO () forall a b. 
(a -> b) -> a -> b $ TableName -> [Column] -> [Column] -> (String, Int) -> Pool Connection -> IO Int64 forall a. ToRow a => TableName -> [Column] -> [Column] -> a -> Pool Connection -> IO Int64 insertOrUpdate TableName configs ["name"] ["value"] (String name, Int v) Pool Connection pool doConfigGet :: Pool Connection -> String -> IO (Maybe Int) doConfigGet :: Pool Connection -> String -> IO (Maybe Int) doConfigGet pool :: Pool Connection pool name :: String name = TableName -> Column -> String -> Only String -> Pool Connection -> IO (Maybe Int) forall a b. (ToRow a, FromRow (Only b), Show b) => TableName -> Column -> String -> a -> Pool Connection -> IO (Maybe b) selectOneOnly TableName configs "value" "name=?" (String -> Only String forall a. a -> Only a Only String name) Pool Connection pool mkFoldFunc :: (Job -> a -> a) -> a -> Only ByteString -> IO a mkFoldFunc :: (Job -> a -> a) -> a -> Only ByteString -> IO a mkFoldFunc f :: Job -> a -> a f acc :: a acc (Only bs :: ByteString bs) = case ByteString -> Either String Job decodeJob ByteString bs of Left e :: String e -> do String -> String -> IO () errorM "Periodic.Server.Persist.PSQL" (String -> IO ()) -> String -> IO () forall a b. (a -> b) -> a -> b $ "mkFoldFunc error: decode " String -> ShowS forall a. [a] -> [a] -> [a] ++ ByteString -> String forall a. Show a => a -> String show ByteString bs String -> ShowS forall a. [a] -> [a] -> [a] ++ " " String -> ShowS forall a. [a] -> [a] -> [a] ++ ShowS forall a. Show a => a -> String show String e a -> IO a forall (m :: * -> *) a. Monad m => a -> m a return a acc Right job :: Job job -> a -> IO a forall (m :: * -> *) a. Monad m => a -> m a return (a -> IO a) -> a -> IO a forall a b. 
(a -> b) -> a -> b $ Job -> a -> a f Job job a acc decodeJob :: ByteString -> Either String Job decodeJob :: ByteString -> Either String Job decodeJob bs :: ByteString bs = case ByteString -> Either String ByteString decode ByteString bs of Left e :: String e -> String -> Either String Job forall a b. a -> Either a b Left String e Right bs0 :: ByteString bs0 -> case ByteString -> Either (ByteString, Int64, String) (ByteString, Int64, Job) forall a. Binary a => ByteString -> Either (ByteString, Int64, String) (ByteString, Int64, a) decodeOrFail (ByteString -> ByteString fromStrict ByteString bs0) of Left e :: (ByteString, Int64, String) e -> String -> Either String Job forall a b. a -> Either a b Left (String -> Either String Job) -> String -> Either String Job forall a b. (a -> b) -> a -> b $ (ByteString, Int64, String) -> String forall a. Show a => a -> String show (ByteString, Int64, String) e Right (_, _, job :: Job job) -> Job -> Either String Job forall a b. b -> Either a b Right Job job