module Avers.Storage where
import Control.Applicative
import Control.Concurrent.STM
import Control.Monad.Random (getRandomR, evalRandIO)
import Control.Monad.State
import Control.Monad.Except
import Data.Pool
import Data.Char
import Data.Monoid
import Data.Maybe
import qualified Data.ByteString.Lazy as BL
import Data.Vector (Vector)
import qualified Data.Vector as V
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Map as M
import Data.Time
import Data.List (find)
import Data.Aeson (Value)
import Data.Aeson.Types (emptyObject)
import Data.ByteArray.Encoding (Base(Base16), convertToBase)
import Crypto.Hash (Digest, SHA3_256, hashlazy)
import qualified Database.RethinkDB as R
import Crypto.Scrypt
import Avers.Metrics
import Avers.Metrics.Measurements
import Avers.Types
import Avers.Patching
import Avers.TH
import Avers.Views
import Avers.Index
import Avers.Storage.Backend
import Avers.Storage.Expressions
import Prelude
-- | Unwrap an optional result, aborting the 'Avers' computation with the
-- given error when the value is absent.
requireResult :: AversError -> Maybe a -> Avers a
requireResult err = maybe (throwError err) return
-- | True when the objects table has a document for the given object id.
-- The check is timed under 'M_avers_storage_exists_duration'.
exists :: ObjId -> Avers Bool
exists objId =
    measureDuration M_avers_storage_exists_duration
        (existsDocument objectsTable (BaseObjectId objId))
-- | Fetch the 'Object' document for an id, failing with 'ObjectNotFound'
-- when no such document exists. Timed under
-- 'M_avers_storage_lookupObject_duration'.
lookupObject :: ObjId -> Avers Object
lookupObject objId =
    measureDuration M_avers_storage_lookupObject_duration $
        lookupDocument objectsTable (BaseObjectId objId)
            >>= requireResult (ObjectNotFound objId)
-- | Create a new object of the given type, with a fresh id produced by the
-- type's 'otId' action and the current time as its creation timestamp.
-- Returns the id of the new object.
createObject :: (ToJSON a) => ObjectType a -> ObjId -> a -> Avers ObjId
createObject ot createdBy content = do
    newObjId <- otId ot
    createdAt <- liftIO getCurrentTime
    createObject' newObjId createdAt ot createdBy content
    return newObjId
-- | Persist a new object with an explicit id and timestamp.
--
-- This inserts the 'Object' document and an initial patch at 'zeroRevId'
-- that sets the whole content (root path). It then refreshes the
-- type's views with the new content.
--
-- NOTE(review): the initial patch records @objId@ as its author, whereas
-- 'createCheckpoint' and 'patchHandler' record the committer — confirm
-- this is intentional.
createObject' :: (ToJSON a) => ObjId -> UTCTime -> ObjectType a -> ObjId -> a -> Avers ()
createObject' objId now ot@ObjectType{..} createdBy content = do
    insertDocument objectsTable $
        Object objId otType now createdBy Nothing
    insertDocument patchesTable $
        Patch (BaseObjectId objId) zeroRevId objId now
            (Set rootPath (Just $ toJSON content))
    updateObjectViews ot objId (Just content)
-- | Soft-delete an object by setting its 'objectDeleted' flag to
-- @Just True@, then clear it from the type's views by passing 'Nothing'
-- as the new content.
deleteObject :: ObjId -> Avers ()
deleteObject objId = do
    obj <- lookupObject objId
    let tombstone = obj { objectDeleted = Just True }
    upsertDocument objectsTable tombstone
    SomeObjectType objType <- lookupObjectType (objectType obj)
    updateObjectViews objType objId Nothing
-- | Permanently remove a soft-deleted object.
--
-- Fails (via 'strErr') unless the object is already marked deleted.
-- Otherwise it removes, in order:
--
-- * all of the object's patches and snapshots,
-- * its documents in every view of its type,
-- * the object document itself.
pruneObject :: ObjId -> Avers ()
pruneObject objId = do
    obj <- lookupObject objId
    SomeObjectType objType <- lookupObjectType (objectType obj)

    when (objectDeleted obj /= Just True) $
        strErr $ "pruneObject: object " ++ show objId ++ " is not deleted"

    let baseId = BaseObjectId objId
    void $ runQuery $ R.Delete $ objectPatchSequenceE baseId 0 maxBound
    void $ runQuery $ R.Delete $ objectSnapshotSequenceE baseId 0 maxBound

    mapM_ (\(SomeView view) -> deleteDocument (viewTable view) objId)
          (otViews objType)

    void $ deleteDocument objectsTable objId
-- | Write a checkpoint for an object.
--
-- A checkpoint is a patch that sets the root path to the object's full
-- current content. A matching snapshot is saved at the same revision.
-- Returns the revision id of the checkpoint.
createCheckpoint :: ObjectId -> ObjId -> Avers RevId
createCheckpoint objId createdBy = do
    now <- liftIO getCurrentTime
    latest <- lookupLatestSnapshot objId
    let content = snapshotContent latest
        revId = succ (snapshotRevisionId latest)
        checkpointOp = Set rootPath (Just $ toJSON content)
    savePatch (Patch objId revId createdBy now checkpointOp)
    saveSnapshot (Snapshot (snapshotObjectId latest) revId content)
    return revId
-- | A patch is a checkpoint when its operation is a 'Set' on the root path,
-- i.e. it replaces the entire object content.
isCheckpointPatch :: Patch -> Bool
isCheckpointPatch patch =
    case patchOperation patch of
        Set{..} -> opPath == rootPath
        _ -> False
-- | The most recent checkpoint patch of an object, if any.
--
-- 'objectPatches' yields patches in ascending revision order, so the list
-- is searched from its end.
latestCheckpointPatch :: ObjectId -> Avers (Maybe Patch)
latestCheckpointPatch objId =
    find isCheckpointPatch . reverse <$> objectPatches objId
-- | Delete the history that precedes an object's latest checkpoint.
--
-- The checkpoint patch sets the root path, so it carries the complete
-- content on its own. Snapshots and patches with revisions from 0 up to the
-- revision just before the checkpoint are removed. Does nothing when the
-- object has no checkpoint.
--
-- NOTE(review): this assumes the upper bound of 'objectSnapshotSequenceE'
-- and 'objectPatchSequenceE' is inclusive, which is why the checkpoint
-- revision itself is excluded by subtracting one — confirm against the
-- expression helpers.
vacuumObject :: ObjectId -> Avers ()
vacuumObject objId = do
    mbPatch <- latestCheckpointPatch objId
    case mbPatch of
        Nothing -> return ()
        Just Patch{..} -> do
            -- Last revision strictly before the checkpoint.
            let revId = unRevId patchRevisionId - 1

            void $ runQuery $ R.Delete $
                objectSnapshotSequenceE objId 0 revId

            void $ runQuery $ R.Delete $
                objectPatchSequenceE objId 0 revId
-- | Decode the content of an object's latest snapshot (with pending patches
-- applied) into a Haskell value.
objectContent :: (FromJSON a) => ObjectId -> Avers a
objectContent objId =
    lookupLatestSnapshot objId >>= parseValue . snapshotContent
-- | The current state of an object.
--
-- Takes the newest stored snapshot and applies every patch with a higher
-- revision on top of it. Timed under
-- 'M_avers_storage_lookupLatestSnapshot_duration'.
lookupLatestSnapshot :: ObjectId -> Avers Snapshot
lookupLatestSnapshot objId =
    measureDuration M_avers_storage_lookupLatestSnapshot_duration $ do
        base <- newestSnapshot objId
        pending <- patchesAfterRevision objId (snapshotRevisionId base)
        applyPatches base pending
-- | Apply one patch to a snapshot.
--
-- On success, the snapshot's content and revision become those produced
-- by the patch. A failing operation is reported through 'patchError'.
applyPatchToSnapshot :: Snapshot -> Patch -> Avers Snapshot
applyPatchToSnapshot snapshot patch =
    either patchError (return . advance) $
        applyOperation (snapshotContent snapshot) (patchOperation patch)
  where
    advance newContent = snapshot
        { snapshotContent = newContent
        , snapshotRevisionId = patchRevisionId patch
        }
-- | Apply a list of patches to a snapshot, left to right, stopping at the
-- first failure.
applyPatches :: Snapshot -> [Patch] -> Avers Snapshot
applyPatches = foldM applyPatchToSnapshot
-- | Read the cached most-recent known revision of an object from the
-- in-memory 'hRecentRevisionCache'. Returns 'Nothing' when the object is not
-- in the cache.
lookupRecentRevision :: ObjectId -> Avers (Maybe RevId)
lookupRecentRevision objId = do
    cacheVar <- gets hRecentRevisionCache
    M.lookup objId <$> liftIO (readTVarIO cacheVar)
-- | Record a revision for an object in the revision cache. When an entry
-- already exists, the larger of the two revisions is kept.
updateRecentRevision :: ObjectId -> RevId -> Avers ()
updateRecentRevision objId revId = do
    cacheVar <- gets hRecentRevisionCache
    liftIO . atomically . modifyTVar' cacheVar $
        M.insertWith max objId revId
-- | The snapshot with the highest position in the @objectSnapshotSequence@
-- index within the range @lo@..@hi@.
--
-- Falls back to 'initialSnapshot' when the range contains no snapshot. The
-- revision of whichever snapshot is returned is fed into the
-- recent-revision cache.
latestSnapshotBetween :: ObjectId -> Int -> Int -> Avers Snapshot
latestSnapshotBetween objId lo hi = do
    rows <- runQueryCollect $ limitE 1 $
        R.OrderByIndexed (R.Descending "objectSnapshotSequence") $
            objectSnapshotSequenceE objId lo hi
    snapshot <- maybe (return $ initialSnapshot objId) parseDatum (rows V.!? 0)
    updateRecentRevision objId (snapshotRevisionId snapshot)
    return snapshot
-- | The newest stored snapshot of an object.
--
-- The cached recent revision (or 'zeroRevId' when uncached) is used as the
-- lower bound of the search range. Timed under
-- 'M_avers_storage_newestSnapshot_duration'.
newestSnapshot :: ObjectId -> Avers Snapshot
newestSnapshot objId =
    measureDuration M_avers_storage_newestSnapshot_duration $ do
        cached <- lookupRecentRevision objId
        let lowerBound = unRevId (fromMaybe zeroRevId cached)
        latestSnapshotBetween objId lowerBound maxBound
-- | Reconstruct an object's state as of a specific revision.
--
-- Starts from the latest snapshot in the range 0..@revId@, then applies the
-- later patches whose revision does not exceed @revId@. Timed under
-- 'M_avers_storage_lookupSnapshot_duration'.
lookupSnapshot :: ObjectId -> RevId -> Avers Snapshot
lookupSnapshot objId (RevId revId) =
    measureDuration M_avers_storage_lookupSnapshot_duration $ do
        base <- latestSnapshotBetween objId 0 revId
        later <- patchesAfterRevision objId (snapshotRevisionId base)
        let upToTarget = filter ((<= revId) . unRevId . patchRevisionId) later
        foldM applyPatchToSnapshot base upToTarget
-- | Insert a patch document into the patches table.
savePatch :: Patch -> Avers ()
savePatch patch = insertDocument patchesTable patch
-- | Insert a snapshot document into the snapshots table.
saveSnapshot :: Snapshot -> Avers ()
saveSnapshot snapshot = insertDocument snapshotsTable snapshot
-- | Replace a secret. The UTF-8 encoded plaintext is hashed with scrypt
-- ('defaultParams') before it is stored.
updateSecret :: SecretId -> Text -> Avers ()
updateSecret secId secret =
    liftIO (encryptPassIO defaultParams (Pass (T.encodeUtf8 secret)))
        >>= saveSecretValue secId
-- | Check a plaintext secret against the stored scrypt hash.
--
-- * Fails with 'DocumentNotFound' when no secret is stored.
-- * Fails with "Not Found" when the secret does not match.
-- * On success, if 'verifyPass' returns a new hash, that hash is persisted.
verifySecret :: SecretId -> Text -> Avers ()
verifySecret secId secret = do
    stored <- lookupDocument secretsTable secId
        >>= requireResult (DocumentNotFound $ toPk secId)
    let candidate = Pass $ T.encodeUtf8 secret
        encrypted = EncryptedPass $ T.encodeUtf8 (secretValue stored)
        (ok, rehashed) = verifyPass defaultParams candidate encrypted
    unless ok $ strErr $ "Not Found"
    mapM_ (saveSecretValue secId) rehashed
-- | Upsert a secret document holding the given encrypted password, decoded
-- from UTF-8 into 'Text'.
saveSecretValue :: SecretId -> EncryptedPass -> Avers ()
saveSecretValue secId (EncryptedPass hashed) =
    upsertDocument secretsTable (Secret secId (T.decodeUtf8 hashed))
-- | All patches of an object, in ascending revision order.
--
-- 'patchesAfterRevision' starts its range at @revId + 1@. Passing @-1@
-- therefore makes the range begin at revision 0, which includes the initial
-- patch written when the object was created.
objectPatches :: ObjectId -> Avers [Patch]
objectPatches objId = patchesAfterRevision objId (RevId (-1))
-- | Patches of an object with a revision strictly greater than the given
-- one, sorted by @revisionId@ ascending. Timed under
-- 'M_avers_storage_patchesAfterRevision_duration'.
patchesAfterRevision :: ObjectId -> RevId -> Avers [Patch]
patchesAfterRevision objId (RevId revId) =
    measureDuration M_avers_storage_patchesAfterRevision_duration $ do
        rows <- runQueryCollect $
            R.OrderBy [R.Ascending "revisionId"] $
                objectPatchSequenceE objId (revId + 1) maxBound
        mapM parseDatum (V.toList rows)
-- | Fetch a single patch by its primary key, @\<objectId\>\@\<revId\>@.
-- Timed under 'M_avers_storage_lookupPatch_duration'.
lookupPatch :: ObjectId -> RevId -> Avers Patch
lookupPatch objId revId =
    measureDuration M_avers_storage_lookupPatch_duration $
        runQuerySingleSelection $ R.Get patchesTable $ R.lift patchKey
  where
    patchKey = toPk objId <> "@" <> toPk revId
-- | Find the configured object type whose 'otType' equals the given name.
-- Fails with 'UnknownObjectType' when none matches.
lookupObjectType :: Text -> Avers SomeObjectType
lookupObjectType objType = do
    types <- gets (objectTypes . hConfig)
    maybe (throwError $ UnknownObjectType objType) return $
        find matches types
  where
    matches (SomeObjectType ot) = otType ot == objType
-- | Apply a batch of client operations that were authored against revision
-- @revId@ of an object.
--
-- Steps:
--
-- 1. Load the snapshot at @revId@ and every patch committed after it.
-- 2. Bring the snapshot up to date with those patches.
-- 3. Run 'patchHandler', which rebases each operation onto the
--    intervening patches and persists the resulting patches.
-- 4. Unless @novalidate@ is set, decode the final content and refresh the
--    object type's views.
--
-- Returns the intervening patches, the number of operations that produced
-- a patch, and the newly created patches.
applyObjectUpdates
    :: ObjectId
    -> RevId
    -> ObjId
    -> [Operation]
    -> Bool
    -> Avers ([Patch], Int, [Patch])
applyObjectUpdates objId revId committerId ops novalidate =
    measureDuration M_avers_storage_applyObjectUpdates_duration $ do
        let baseObjId = objectIdBase objId
        obj <- lookupObject baseObjId
        SomeObjectType ot <- lookupObjectType (objectType obj)

        baseSnapshot <- lookupSnapshot objId revId
        previousPatches <- patchesAfterRevision objId revId

        reportMeasurement M_avers_storage_applyObjectUpdates_numOperations
            (fromIntegral $ length ops)
        reportMeasurement M_avers_storage_applyObjectUpdates_numPreviousPatches
            (fromIntegral $ length previousPatches)

        latestSnapshot <- applyPatches baseSnapshot previousPatches

        let initialState = PatchState ot objId revId committerId ops 0
                baseSnapshot latestSnapshot previousPatches []
        (finalSnapshot, finalState) <-
            runStateT (patchHandler novalidate) initialState

        unless novalidate $ do
            content <- parseValue (snapshotContent finalSnapshot)
            updateObjectViews ot baseObjId (Just content)

        return ( previousPatches
               , psNumConsumedOperations finalState
               , psPatches finalState
               )
-- | State threaded through 'patchHandler' while a batch of client
-- operations is applied to an object. Built by 'applyObjectUpdates'.
data PatchState a = PatchState
    { psObjectType :: ObjectType a
      -- ^ Type of the object; used to validate new content.
    , psObjectId :: ObjectId
      -- ^ Object that the new patches are written for.
    , psRevisionId :: RevId
      -- ^ Revision the client's operations were based on.
    , psCommitterId :: ObjId
      -- ^ Author recorded in every patch created by this batch.
    , psOperations :: [ Operation ]
      -- ^ Operations submitted by the client, in order.
    , psNumConsumedOperations :: Int
      -- ^ Number of operations that produced a saved patch so far.
    , psBaseSnapshot :: Snapshot
      -- ^ Snapshot at 'psRevisionId'; its content is the rebase base.
    , psLatestSnapshot :: Snapshot
      -- ^ Base snapshot with 'psPreviousPatches' applied.
    , psPreviousPatches :: [ Patch ]
      -- ^ Patches committed after 'psRevisionId', before this batch.
    , psPatches :: [ Patch ]
      -- ^ Patches created by this batch, in creation order.
    }

-- | Stateful computation over 'Avers' used while applying a patch batch.
type AversPatch a b = StateT (PatchState a) Avers b
-- | Fold the batch's operations over the latest snapshot and return the
-- resulting snapshot.
--
-- Each operation is first rebased (relative to the base content) over
-- 'psPreviousPatches'. Rebase yields 'Nothing' → the operation is skipped.
-- Applying the operation fails → the error is raised through 'patchError'.
--
-- An operation that changes the content is handled as follows:
--
-- * unless @novalidate@ is set, the new content is validated against the
--   object type;
-- * a patch is saved at the next revision;
-- * the patch is appended to 'psPatches' and 'psNumConsumedOperations' is
--   incremented;
-- * a snapshot is saved at that revision.
--
-- Operations that leave the content unchanged are skipped without being
-- counted.
patchHandler :: (FromJSON a) => Bool -> AversPatch a Snapshot
patchHandler novalidate = do
    PatchState{..} <- get
    foldM (saveOperation $ snapshotContent psBaseSnapshot)
        psLatestSnapshot psOperations
  where
    saveOperation baseContent snapshot@Snapshot{..} op = do
        PatchState{..} <- get
        case rebaseOperation baseContent op psPreviousPatches of
            Nothing -> return snapshot
            Just op' -> do
                now <- liftIO getCurrentTime
                let revId = succ snapshotRevisionId
                    patch = Patch psObjectId revId psCommitterId now op'
                case applyOperation snapshotContent op' of
                    -- Surface the failure as an Avers error, the same way
                    -- 'applyPatchToSnapshot' does, instead of crashing.
                    Left e -> lift $ patchError e
                    Right newContent
                        | newContent /= snapshotContent -> do
                            unless novalidate $
                                lift $ validateWithType psObjectType newContent
                            let newSnapshot = snapshot
                                    { snapshotContent = newContent
                                    , snapshotRevisionId = revId
                                    }
                            lift $ savePatch patch
                            modify $ \s -> s
                                { psPatches = psPatches ++ [patch]
                                , psNumConsumedOperations = psNumConsumedOperations + 1
                                }
                            lift $ saveSnapshot newSnapshot
                            return newSnapshot
                        | otherwise -> return snapshot
-- | True when the blobs table has a document with the given id.
existsBlob :: BlobId -> Avers Bool
existsBlob bId = existsDocument blobsTable bId
-- | Fetch blob metadata by id, failing with 'DocumentNotFound' when absent.
lookupBlob :: BlobId -> Avers Blob
lookupBlob bId = do
    mbBlob <- lookupDocument blobsTable bId
    requireResult (DocumentNotFound $ toPk bId) mbBlob
-- | Insert blob metadata into the blobs table.
insertBlob :: Blob -> Avers ()
insertBlob blob = insertDocument blobsTable blob
-- | Upload a blob's bytes through the configured 'putBlob' action. A 'Left'
-- result is rethrown as an Avers error.
saveBlobContent :: Blob -> BL.ByteString -> Avers ()
saveBlobContent blob content = do
    cfg <- gets hConfig
    result <- liftIO $ putBlob cfg (blobId blob) (blobContentType blob) content
    either throwError (const $ return ()) result
-- | Insert a session document into the sessions table.
saveSession :: Session -> Avers ()
saveSession session = insertDocument sessionsTable session
-- | Fetch a session by id, failing with 'DocumentNotFound' when absent.
lookupSession :: SessionId -> Avers Session
lookupSession sessId = do
    mbSession <- lookupDocument sessionsTable sessId
    requireResult (DocumentNotFound $ toPk sessId) mbSession
-- | Delete a session document, discarding the delete's result.
dropSession :: SessionId -> Avers ()
dropSession = void . deleteDocument sessionsTable
-- | Generate a random identifier of @n@ characters.
--
-- Each character is drawn with @getRandomR (0, 61)@ and mapped onto the
-- alphabet @A-Z@, @a-z@, @0-9@. A non-positive @n@ yields the empty text.
newId :: Int -> IO Text
newId n = T.pack . map alnum <$> evalRandIO (replicateM n (getRandomR (0, 61)))
  where
    -- Map 0..25 to 'A'..'Z', 26..51 to 'a'..'z', and 52..61 to '0'..'9'.
    alnum :: Int -> Char
    alnum x
        | x < 26 = chr (x + 65)
        | x < 52 = chr ((x - 26) + 97)
        | x < 62 = chr ((x - 52) + 48)
        | otherwise = error $ "Out of range: " ++ show x
-- | Validate a JSON value against the object type registered under the
-- given name. Fails if the type is unknown or the value does not parse.
validateObject :: Text -> Value -> Avers ()
validateObject objType value =
    lookupObjectType objType >>= \(SomeObjectType ot) ->
        validateWithType ot value
-- | Check that a JSON value parses as the content type of the given object
-- type. The parse error is rethrown on failure; the parsed value itself is
-- discarded.
validateWithType :: (FromJSON a) => ObjectType a -> Value -> Avers ()
validateWithType ot value =
    either throwError (const $ return ()) (parseValueAs ot value)
-- | Decode the content of the release of an object at the given revision.
lookupRelease :: ObjId -> RevId -> Avers Release
lookupRelease objId = objectContent . ReleaseObjectId objId
-- | Create a release marker for an object at a revision.
--
-- Fails with 'ObjectNotFound' if the object does not exist. Otherwise a
-- snapshot keyed by the release object id is inserted, at 'zeroRevId' with
-- empty content, unless one already exists.
createRelease :: ObjId -> RevId -> Avers ()
createRelease objId revId = do
    objectExists <- exists objId
    unless objectExists $ throwError (ObjectNotFound objId)
    alreadyCreated <- existsDocument snapshotsTable (toPk releaseSnapshot)
    unless alreadyCreated $ insertDocument snapshotsTable releaseSnapshot
  where
    releaseSnapshot = Snapshot (ReleaseObjectId objId revId) zeroRevId emptyObject
-- | Revision of the most recent release of an object, if any.
--
-- Selects snapshots whose @objectId@ matches @^\<objId\>/release/@ and takes
-- the first one in descending @objectId@ order. Returns 'Nothing' when the
-- returned id is not a 'ReleaseObjectId'. Raises a database error if the
-- matched release belongs to a different object.
--
-- NOTE(review): the ordering is a string sort on @objectId@. If the revision
-- suffix is rendered as plain decimal, @x/release/9@ sorts above
-- @x/release/10@ — confirm the key format. The behaviour of 'headE' when
-- nothing matches is also not visible here.
lookupLatestRelease :: ObjId -> Avers (Maybe RevId)
lookupLatestRelease objId = do
    oId <- runQueryDatum $
        (objectFieldE "objectId" :: R.Exp R.Object -> R.Exp R.Datum) $
            headE $
                R.OrderBy [R.Descending "objectId"] $
                    R.Filter isReleaseOfObject snapshotsTable
    case oId of
        ReleaseObjectId releasedObjId revId -> do
            when (objId /= releasedObjId) $
                databaseError "lookupLatestRelease: objId do not match"
            return (Just revId)
        _ -> return Nothing
  where
    releasePattern = R.lift $ "^" <> toPk objId <> "/release/"

    isReleaseOfObject :: R.Exp R.Object -> R.Exp Bool
    isReleaseOfObject x = R.Coerce
        (R.Match (objectFieldE "objectId" x) releasePattern)
        (R.lift ("bool" :: Text))
-- | Store a blob whose id is the hex-encoded (Base16) SHA3-256 digest of its
-- bytes.
--
-- Content and metadata are written only when no blob with that id exists
-- yet. The blob metadata is returned in either case.
createBlob :: BL.ByteString -> Text -> Avers Blob
createBlob body contentType = do
    alreadyStored <- existsBlob (blobId blob)
    unless alreadyStored $ do
        saveBlobContent blob body
        insertBlob blob
    return blob
  where
    digest = hashlazy body :: Digest SHA3_256
    blob = Blob (BlobId $ T.decodeUtf8 $ convertToBase Base16 digest)
                (fromIntegral $ BL.length body)
                contentType
-- | Ids of all non-deleted objects of the given type.
--
-- Sorted by creation time, newest first; ties are broken by id ascending.
objectsOfType :: ObjectType a -> Avers (Vector ObjId)
objectsOfType objType =
    V.map ObjId <$> runQueryCollect
        ( R.Map mapId $
            R.OrderBy [R.Descending "createdAt", R.Ascending "id"] $
                R.Filter isNotDeleted $
                    R.Filter ofRequestedType objectsTable
        )
  where
    ofRequestedType :: R.Exp R.Object -> R.Exp Bool
    ofRequestedType = objectFieldEqE "type" (otType objType)
-- | Ids of all objects of the given type, including soft-deleted ones.
--
-- Sorted by creation time, newest first; ties are broken by id ascending.
allObjectsOfType :: ObjectType a -> Avers (Vector ObjId)
allObjectsOfType objType =
    V.map ObjId <$> runQueryCollect
        ( R.Map mapId $
            R.OrderBy [R.Descending "createdAt", R.Ascending "id"] $
                R.Filter ofRequestedType objectsTable
        )
  where
    ofRequestedType :: R.Exp R.Object -> R.Exp Bool
    ofRequestedType = objectFieldEqE "type" (otType objType)
-- | Query predicate: the document either has no @deleted@ field, or has
-- @deleted@ equal to @False@.
isNotDeleted :: R.Exp R.Object -> R.Exp Bool
isNotDeleted obj = R.Any [lacksFlag, flagFalse]
  where
    lacksFlag = R.Not $ R.HasFields ["deleted"] obj
    flagFalse = objectFieldEqE "deleted" False obj
-- | Project a document onto its @id@ field.
mapId :: R.Exp R.Object -> R.Exp Text
mapId obj = R.GetField "id" obj
-- | Compound secondary-index key @[objectId, revisionId]@, used by the
-- @objectPatchSequence@ and @objectSnapshotSequence@ indices.
indexF :: R.Exp R.Object -> R.Exp (R.Array R.Datum)
indexF obj = R.lift [objectIdField, revisionIdField]
  where
    objectIdField = R.GetField "objectId" obj
    revisionIdField = R.GetField "revisionId" obj
-- | Ensure every table and index the storage layer needs exists.
--
-- This covers the core tables, the sequence indices on @patches@ and
-- @snapshots@, and one table (with its indices) per view of every
-- configured object type.
bootstrap :: Avers ()
bootstrap = do
    forM_ ["objects", "sessions", "blobs", "secrets"] $ \tableName ->
        createTable tableName []

    createTable "patches"
        [ SomeIndex $ Index "objectPatchSequence" indexF ]
    createTable "snapshots"
        [ SomeIndex $ Index "objectSnapshotSequence" indexF ]

    types <- gets (objectTypes . hConfig)
    forM_ types $ \(SomeObjectType ObjectType{..}) ->
        forM_ otViews $ \(SomeView v@View{..}) ->
            createTable (viewTableName v) viewIndices
-- | Create a table (and wait for it) if it is missing from the database.
--
-- Each listed secondary index that the table does not already have is then
-- created and waited for. Progress is printed to stdout.
createTable :: Text -> [SomeIndex] -> Avers ()
createTable name indices = do
    pool <- gets hDatabaseHandlePool
    db <- liftIO $ withResource pool (pure . R.handleDatabase)

    existingTables <- runQuery $ R.ListTables db
    unless (name `V.elem` existingTables) $ do
        liftIO $ putStrLn $ "Creating table '" <> T.unpack name <> "'"
        void $ runQuery $ R.CreateTable db (R.lift name)
        void $ runQuery $ R.WaitTable table

    existingIndices <- runQuery $ R.ListIndices table
    forM_ indices $ \(SomeIndex Index{..}) ->
        unless (indexName `V.elem` existingIndices) $ do
            liftIO $ putStrLn $ "Creating index '" <> T.unpack indexName <> "' on table '" <> T.unpack name <> "'"
            void $ runQuery $ R.CreateIndex table (R.lift indexName) indexExpression
            void $ runQuery $ R.WaitIndex table [R.lift indexName]
  where
    table = R.Table Nothing $ R.lift name
-- | Tail the change feed of the patches table forever, publishing each new
-- patch onto @chan@ as 'CPatch'.
--
-- Change notifications whose new value does not parse are printed and
-- skipped. When a feed ends ('R.Done') or errors (the error is printed),
-- the handle goes back to the pool and a fresh feed is started.
streamPatches :: Pool R.Handle -> TChan Change -> IO ()
streamPatches pool chan = forever $ withResource pool $ \handle ->
    R.start handle (R.SequenceChanges patchesTable) >>= drain handle
  where
    publish :: Vector R.ChangeNotification -> IO ()
    publish = mapM_ $ \cn ->
        either print (atomically . writeTChan chan . CPatch)
            (parseDatum (R.cnNewValue cn))

    drain :: R.Handle -> R.Token -> IO ()
    drain handle token = do
        res <- R.nextResult handle token :: IO (Either R.Error (R.Sequence R.ChangeNotification))
        case res of
            Left e -> print e
            Right (R.Done batch) -> publish batch
            Right (R.Partial _ batch) -> do
                publish batch
                R.continue handle token
                drain handle token
-- | A fresh subscription to the handle's change broadcast channel. The
-- returned channel is a 'dupTChan' of 'hChanges'.
changeChannel :: Handle -> IO (TChan Change)
changeChannel = atomically . dupTChan . hChanges