{-| Implementation of an SQLite3-based event store. -} module Data.CQRS.EventStore.Backend.Sqlite3 ( SQLite3EventStoreBackend , createBackendPool ) where import Control.Monad (when, forM_) import Data.ByteString (ByteString) import Data.Conduit (ResourceT, Source, ($=), ($$), runResourceT) import qualified Data.Conduit.List as CL import Data.Conduit.Pool (Pool, createPool) import Data.CQRS.EventStore.Backend (EventStoreBackend(..), RawEvent, RawSnapshot(..)) import Data.CQRS.EventStore.Backend.Sqlite3Utils (withTransaction, execSql, sourceQuery) import Data.CQRS.GUID (GUID) import qualified Data.CQRS.GUID as G import Data.CQRS.PersistedEvent (PersistedEvent(..)) import Data.Text (Text) import qualified Database.SQLite3 as SQL import Database.SQLite3 (Database, SQLData(..)) -- Convenience class for converting values to SQLData. class ToSQLData a where toSQLData :: a -> SQLData instance ToSQLData GUID where toSQLData = SQLBlob . G.toByteString instance ToSQLData Int where toSQLData = SQLInteger . fromIntegral instance ToSQLData ByteString where toSQLData = SQLBlob -- SQL createEventsSql :: Text createEventsSql = "CREATE TABLE IF NOT EXISTS events ( guid BLOB , ev_data BLOB , version INTEGER , PRIMARY KEY (guid, version) );" selectEventsSql :: Text selectEventsSql = "SELECT version, ev_data FROM events WHERE guid = ? AND version >= ? ORDER BY version ASC;" enumerateAllEventsSql :: Text enumerateAllEventsSql = "SELECT guid, version, ev_data FROM events ORDER BY version ASC;" insertEventSql :: Text insertEventSql = "INSERT INTO events ( guid, version, ev_data ) VALUES (?, ?, ?);" createAggregateVersionsSql :: Text createAggregateVersionsSql = "CREATE TABLE IF NOT EXISTS versions ( guid BLOB PRIMARY KEY , version INTEGER );" getCurrentVersionSql :: Text getCurrentVersionSql = "SELECT version FROM versions WHERE guid = ?;" updateCurrentVersionSql :: Text updateCurrentVersionSql = "INSERT OR REPLACE INTO versions ( guid, version ) VALUES (?,?);" createSnapshotSql :: Text createSnapshotSql = "CREATE TABLE IF NOT EXISTS snapshots ( guid BLOB PRIMARY KEY , data BLOB , version INTEGER );" badQueryResultMsg :: [String] -> [SQLData] -> String badQueryResultMsg params columns = concat ["Invalid query result shape. Params: ", show params, ". Result columns: ", show columns] versionConflict :: (Show a, Show b) => a -> b -> IO c versionConflict ov cv = fail $ concat [ "Version conflict detected (expected ", show ov , ", saw ", show cv, ")" ] storeEvents :: Database -> GUID -> Int -> [RawEvent] -> IO () storeEvents database guid originatingVersion events = do -- Column unpacking. let unpackColumns [ SQLInteger v ] = v unpackColumns columns = error $ badQueryResultMsg [show guid] columns -- Get the current version number of the aggregate. curVer <- runResourceT $ (sourceQuery database getCurrentVersionSql [toSQLData guid]) $$ CL.fold (\x -> max x . unpackColumns) 0 -- Sanity check current version number. when (fromIntegral curVer /= originatingVersion) $ versionConflict originatingVersion curVer -- Update de-normalized version number. execSql database updateCurrentVersionSql [ toSQLData guid , toSQLData $ originatingVersion + length events ] -- Store the supplied events. forM_ events $ \e -> do execSql database insertEventSql [ toSQLData guid , toSQLData $ peSequenceNumber e , toSQLData $ peEvent e ] retrieveEvents :: Database -> GUID -> Int -> Source (ResourceT IO) RawEvent retrieveEvents database guid v0 = do -- Unpack the columns into tuples. let unpackColumns [SQLInteger version, SQLBlob eventData] = PersistedEvent guid eventData (fromIntegral version) unpackColumns columns = error $ badQueryResultMsg [show guid, show v0] columns -- Find events with version numbers. sourceQuery database selectEventsSql [toSQLData guid, toSQLData v0] $= CL.map unpackColumns enumerateAllEvents :: Database -> Source (ResourceT IO) RawEvent enumerateAllEvents database = do let unpackColumns [ SQLBlob g, SQLInteger v, SQLBlob ed ] = PersistedEvent (G.fromByteString g) ed (fromIntegral v) unpackColumns columns = error $ badQueryResultMsg [] columns sourceQuery database enumerateAllEventsSql [] $= CL.map unpackColumns writeSnapshot :: Database -> GUID -> RawSnapshot -> IO () writeSnapshot database guid (RawSnapshot v d) = do execSql database "INSERT OR REPLACE INTO snapshots ( guid , data, version ) VALUES ( ?, ?, ? );" [ toSQLData guid , toSQLData d , toSQLData v ] getLatestSnapshot :: Database -> GUID -> IO (Maybe RawSnapshot) getLatestSnapshot database guid = do -- Unpack columns from result. let unpackColumns [SQLBlob d, SQLInteger v] = Just $ RawSnapshot (fromIntegral v) d unpackColumns columns = error $ badQueryResultMsg [show guid] columns -- Run the query. runResourceT $ (sourceQuery database selectSnapshotSql [toSQLData guid] $= (CL.map unpackColumns)) $$ CL.fold (\_ x -> x) Nothing where selectSnapshotSql = "SELECT data, version FROM snapshots WHERE guid = ?;" -- | SQLite3 event store backend type alias. newtype SQLite3EventStoreBackend = ESB Database -- | Instance of EventStoreBackend for SQLite3EventStoreBackend instance EventStoreBackend SQLite3EventStoreBackend where esbStoreEvents (ESB database) = storeEvents database esbRetrieveEvents (ESB database) = retrieveEvents database esbEnumerateAllEvents (ESB database) = enumerateAllEvents database esbWriteSnapshot (ESB database) = writeSnapshot database esbGetLatestSnapshot (ESB database) = getLatestSnapshot database esbWithTransaction (ESB database) = withTransaction database -- | Create a pool of SQLite3-based event store backends. createBackendPool :: Int -> Text -> IO (Pool SQLite3EventStoreBackend) createBackendPool n databaseFileName = do createPool open close 1 1 n where open = do -- Create the database. database <- SQL.open databaseFileName -- Set up tables if necessary. execSql database createEventsSql [] execSql database createAggregateVersionsSql [] execSql database createSnapshotSql [] -- Return event store. return $ ESB database close (ESB db) = do SQL.close db