module Network.JobQueue.Backend.Sqlite3 (openSqlite3Backend, newSqlite3Backend) where
import qualified Data.ByteString.Char8 as BS
import Database.HDBC
import Database.HDBC.Sqlite3
import Network.JobQueue.Backend.Types
import Network.JobQueue.Backend.Class
-- | Handle to one queue kept in a SQLite3 database.
data Sqlite3Queue
  = Sqlite3Queue
      Connection -- ^ HDBC connection every statement for this queue runs on
      String     -- ^ queue name; the queue functions splice it into SQL as the table name
-- | 'BackendQueue' implementation for SQLite3: each class method is bound
-- directly to the matching @*DBQueue@ function.
instance BackendQueue Sqlite3Queue where
  -- Operations on the head of the queue.
  readQueue = readDBQueue
  peekQueue = peekDBQueue

  -- Operations on individual entries.
  writeQueue = writeDBQueue
  updateQueue = updateDBQueue
  deleteQueue = deleteDBQueue

  -- Whole-queue inspection.
  listQueue = listDBQueue
  itemsQueue = itemsDBQueue
  countQueue = countDBQueue
-- | Connect to the SQLite3 database at @filePath@ and build a 'Backend' that
-- owns the connection.
--
-- * 'bOpenQueue' ensures a table named after the queue exists
--   (@CREATE TABLE IF NOT EXISTS@) and returns a 'Sqlite3Queue' for it.
-- * 'bClose' disconnects the connection.
openSqlite3Backend :: String -> IO Backend
openSqlite3Backend filePath = do
  conn <- connectSqlite3 filePath
  return $ Backend
    { bOpenQueue = \queueName -> do
        -- Run the DDL through 'withTransaction', like every other statement
        -- in this module, so the new table is committed right away instead
        -- of staying pending until some later queue operation commits.
        _ <- withTransaction conn $ \conn' ->
          run conn' (createTableSql queueName) []
        return (Sqlite3Queue conn queueName)
    , bClose = disconnect conn
    }
  where
    -- Table layout: auto-incremented key, priority, payload and a version
    -- counter.
    createTableSql :: String -> String
    createTableSql queueName =
      "CREATE TABLE IF NOT EXISTS '" ++ queueName ++ "' (key INTEGER PRIMARY KEY AUTOINCREMENT, prio INTEGER, value TEXT, version INTEGER)"
-- | Build a 'Backend' around a 'Connection' supplied by the caller.
--
-- 'bOpenQueue' only pairs the connection with the queue name (no SQL is
-- issued here), and 'bClose' is a no-op, so the connection is left open.
newSqlite3Backend :: Connection -> Backend
newSqlite3Backend conn =
  Backend
    { bOpenQueue = openQueue
    , bClose = leaveOpen
    }
  where
    openQueue queueName = return (Sqlite3Queue conn queueName)
    leaveOpen = return ()
-- | Remove and return the first entry of the queue in @(prio, key)@ order.
--
-- Runs inside one transaction: the row is selected, deleted by key, and
-- returned as @(value, key)@. Yields 'Nothing' when the query returns no row.
readDBQueue :: Sqlite3Queue -> IO (Maybe (BS.ByteString, String))
readDBQueue (Sqlite3Queue conn queueName) = withTransaction conn popHead
  where
    selectHead :: String
    selectHead = "SELECT key, value FROM '" ++ queueName ++ "' ORDER BY prio, key LIMIT 1"

    deleteByKey :: String
    deleteByKey = "DELETE FROM '" ++ queueName ++ "' WHERE key = ?"

    popHead :: Connection -> IO (Maybe (BS.ByteString, String))
    popHead db = do
      rows <- quickQuery db selectHead []
      case rows of
        ((rowKey : payload : _) : _) -> do
          _ <- run db deleteByKey [toSql rowKey]
          return (Just (fromSql payload, fromSql rowKey))
        _ -> return Nothing
-- | Look at the first entry of the queue in @(prio, key)@ order without
-- deleting it.
--
-- The result is @(value, key, key, version)@: the row key fills both the
-- second and third positions. Yields 'Nothing' when the query returns no row.
peekDBQueue :: Sqlite3Queue -> IO (Maybe (BS.ByteString, String, String, Int))
peekDBQueue (Sqlite3Queue conn queueName) = withTransaction conn lookAtHead
  where
    selectHead :: String
    selectHead = "SELECT key, value, version FROM '" ++ queueName ++ "' ORDER BY prio, key LIMIT 1"

    lookAtHead :: Connection -> IO (Maybe (BS.ByteString, String, String, Int))
    lookAtHead db = do
      rows <- quickQuery db selectHead []
      return $ case rows of
        ((rowKey : payload : ver : _) : _) ->
          Just (fromSql payload, fromSql rowKey, fromSql rowKey, fromSql ver)
        _ -> Nothing
-- | Append an entry with the given payload and priority; its version starts
-- at 0.
--
-- Returns the key assigned to the new row, read back from @sqlite_sequence@
-- within the same transaction, or @""@ if that lookup returns no row.
writeDBQueue :: Sqlite3Queue -> BS.ByteString -> Int -> IO String
writeDBQueue (Sqlite3Queue conn queueName) value prio =
  withTransaction conn $ \conn' -> do
    _ <- run conn' insertSql [toSql prio, toSql value]
    -- The queue name is data in this query, not an identifier, so bind it as
    -- a parameter instead of splicing it into a SQL string literal.
    rows <- quickQuery conn' "SELECT seq FROM sqlite_sequence WHERE name = ?" [toSql queueName]
    return $ case rows of
      ((key : _) : _) -> fromSql key
      _ -> ""
  where
    insertSql :: String
    insertSql = "INSERT INTO '" ++ queueName ++ "'(prio, value, version) VALUES (?,?,0)"
-- | Delete the entry with the given key, inside a transaction.
--
-- Always returns 'True'; the number of rows actually deleted is ignored.
deleteDBQueue :: Sqlite3Queue -> String -> IO Bool
deleteDBQueue (Sqlite3Queue conn queueName) key = withTransaction conn removeRow
  where
    deleteSql :: String
    deleteSql = "DELETE FROM '" ++ queueName ++ "' WHERE key = ?"

    removeRow :: Connection -> IO Bool
    removeRow db = run db deleteSql [toSql key] >> return True
-- | Replace the payload of the entry @key@, but only if its stored version
-- still equals @version@; on success the stored version becomes
-- @version + 1@.
--
-- Returns 'True' when the row was updated and 'False' when no row matched
-- (unknown key, or the version has changed since it was read).
updateDBQueue :: Sqlite3Queue -> String -> BS.ByteString -> Int -> IO Bool
updateDBQueue (Sqlite3Queue conn queueName) key value version =
  withTransaction conn $ \conn' -> do
    affected <- run conn' updateSql [toSql value, toSql (version + 1), toSql key, toSql version]
    -- Decide from the affected-row count. Re-querying for a row at
    -- @version + 1@ would wrongly report success when another writer had
    -- already moved the row to that version and this UPDATE matched nothing.
    return (affected > 0)
  where
    updateSql :: String
    updateSql = "UPDATE '" ++ queueName ++ "' SET value = ?, version = ? WHERE key = ? AND version = ?"
-- | Number of entries currently stored in the queue.
--
-- Falls back to 0 if the query unexpectedly returns no row.
countDBQueue :: Sqlite3Queue -> IO Int
countDBQueue (Sqlite3Queue conn queueName) = withTransaction conn $ \conn' -> do
  -- A bare aggregate yields exactly one row, so no ORDER BY / LIMIT is needed.
  rows <- quickQuery conn' ("SELECT COUNT(*) FROM '" ++ queueName ++ "'") []
  return $ case rows of
    ((count : _) : _) -> fromSql count
    _ -> 0
-- | Keys of all entries in the queue, in @(prio, key)@ order.
itemsDBQueue :: Sqlite3Queue -> IO [String]
itemsDBQueue (Sqlite3Queue conn queueName) = withTransaction conn $ \conn' -> do
  rows <- quickQuery conn' ("SELECT key FROM '" ++ queueName ++ "' ORDER BY prio, key") []
  -- Pattern-match the single selected column instead of using partial 'head'.
  return [fromSql key | (key : _) <- rows]
-- | Payloads of all entries in the queue, in @(prio, key)@ order.
listDBQueue :: Sqlite3Queue -> IO [BS.ByteString]
listDBQueue (Sqlite3Queue conn queueName) = withTransaction conn $ \conn' -> do
  rows <- quickQuery conn' ("SELECT value FROM '" ++ queueName ++ "' ORDER BY prio, key") []
  -- Pattern-match the single selected column instead of using partial 'head'.
  return [fromSql payload | (payload : _) <- rows]