{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS -Wall #-}
{-# OPTIONS -Werror #-}
{-# OPTIONS -fno-warn-partial-type-signatures #-}
module DFINITY.RadixTree.Conduit (
sourceRadixTree
, sinkRadixTree
) where
import Codec.Serialise (deserialise, deserialiseOrFail, serialise)
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.BoundedChan (BoundedChan, readChan, tryWriteChan)
import Control.Concurrent.MVar (MVar, modifyMVar_, newMVar, readMVar)
import Control.Concurrent.ReadWriteLock (RWLock)
import Control.Exception (throw)
import Control.Monad (filterM, foldM, forM_, forever, void, when)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (MonadResource, ResourceT, allocate, release)
import Crypto.Hash.BLAKE2.BLAKE2s (hash)
import Data.ByteString (ByteString)
import Data.ByteString.Lazy (fromStrict, toStrict)
import Data.ByteString.Short (fromShort, toShort)
import Data.Conduit (ConduitT, await, yield)
import Data.HashTable.IO as Cuckoo (CuckooHashTable, delete, fromList, insert, lookup)
import Data.List as List (delete)
import Data.LruCache as LRU (LruCache, empty, insert, lookup)
import Data.Maybe (isJust, isNothing)
import Data.Void (Void)
import Database.LevelDB as LevelDB (DB, Options(..), defaultOptions, defaultWriteOptions, delete)
import Database.LevelDB.Base (open)
import Database.LevelDB.Internal (unsafeClose)
import System.Directory (canonicalizePath, getTemporaryDirectory, removeDirectoryRecursive)
import System.IO.Temp (createTempDirectory)
import DFINITY.RadixTree.Bits
import DFINITY.RadixTree.Lenses
import DFINITY.RadixTree.Lock
import DFINITY.RadixTree.Types
import DFINITY.RadixTree.Utilities
sourceRadixTree
:: forall m database. MonadResource m
=> RadixDatabase (ConduitT () ByteString m) database
=> [Bool]
-> Int
-> BoundedChan RadixRoot
-> RadixTree database
-> RWLock
-> ConduitT () ByteString m ()
sourceRadixTree mask cacheSize chan tree radixLock
| cacheSize <= 0 = throw $ InvalidArgument "invalid LRU cache size"
| otherwise = do
cache <- liftIO $ newMVar $ empty cacheSize
action <- fmap fst $ flip allocate killThread $ forkIO $ forever $ do
root <- readChan chan
modifyMVar_ cache $ pure . LRU.insert root ()
loop cache tree []
release action
where
loop
:: MVar (LruCache RadixRoot ())
-> RadixTree database
-> [RadixRoot]
-> ConduitT () ByteString m ()
loop cache subtree@RadixTree {..} accum = do
let accum' = _radixCheckpoint:accum
seen <- liftIO $ readMVar cache
if flip any accum' $ isJust . flip LRU.lookup seen
then pure ()
else do
let key = fromShort _radixCheckpoint
result <- withReadLock radixLock $ load _radixDatabase key
case result of
Nothing -> pure ()
Just bytes -> do
let RadixNode {..} = deserialise $ fromStrict bytes
let success = all id $ zipWith (==) mask $ toBits $ fromShort _radixCheckpoint
when success $ yield bytes
forM_ [_radixLeft, _radixRight] $ \ case
Nothing -> pure ()
Just root -> loop cache `flip` accum' $ setCheckpoint root subtree
{-# SPECIALISE sourceRadixTree
:: [Bool]
-> Int
-> BoundedChan RadixRoot
-> RadixTree DB
-> RWLock
-> ConduitT () ByteString (ResourceT IO) () #-}
sinkRadixTree
:: forall m database. MonadResource m
=> RadixDatabase (ConduitT ByteString Void m) database
=> RadixRoot
-> BoundedChan RadixRoot
-> RadixTree database
-> RWLock
-> ConduitT ByteString Void m (Either String (RadixTree database))
sinkRadixTree checkpoint chan tree@RadixTree {..} radixLock = do
relative <- liftIO getTemporaryDirectory
absolute <- liftIO $ canonicalizePath relative
let createTempDir = createTempDirectory absolute "dfinity"
let destroyTempDir = ignoreIOErrors . removeDirectoryRecursive
(tempDirKey, tempDir) <- allocate createTempDir destroyTempDir
let createTempDatabase = open tempDir defaultOptions {createIfMissing = True}
(tempDatabaseKey, tempDatabase) <- allocate createTempDatabase unsafeClose
table <- liftIO $ fromList [(checkpoint, Nothing)]
result <- loop1 tempDatabase table
release tempDatabaseKey
release tempDirKey
pure result
where
loop1
:: DB
-> CuckooHashTable RadixRoot (Maybe RadixRoot)
-> ConduitT ByteString Void m (Either String (RadixTree database))
loop1 tempDatabase table = do
done <- liftIO $ isNothing <$> Cuckoo.lookup table checkpoint
if done
then pure $ Right $ setCheckpoint checkpoint $ setRoot checkpoint tree
else do
mval <- await
case mval of
Nothing -> pure $ Left "EOF"
Just node ->
case deserialiseOrFail $ fromStrict node of
Left _ -> loop1 tempDatabase table
Right RadixNode {..} -> do
let key = hash 20 mempty node
let root = toShort key
want <- liftIO $ isJust <$> Cuckoo.lookup table root
exists <- if want
then pure False
else withReadLock radixLock $ isJust <$> load _radixDatabase key
if exists
then do
liftIO $ void $ tryWriteChan chan root
liftIO $ Cuckoo.delete table root
loop1 tempDatabase table
else do
let absent = fmap isNothing . withReadLock radixLock . load _radixDatabase . fromShort
let children = maybe id (:) _radixLeft $ maybe id (:) _radixRight []
targets <- filterM absent children
let value = toStrict $ serialise (node, targets)
store tempDatabase key value
if not want
then loop1 tempDatabase table
else do
eligible <- loop2 tempDatabase table root []
loop3 tempDatabase table eligible
loop1 tempDatabase table
loop2
:: DB
-> CuckooHashTable RadixRoot (Maybe RadixRoot)
-> RadixRoot
-> [(RadixRoot, ByteString)]
-> ConduitT ByteString Void m [(RadixRoot, ByteString)]
loop2 tempDatabase table root eligible = do
result <- load tempDatabase $ fromShort root
case deserialise . fromStrict <$> result of
Nothing -> pure eligible
Just (bytes, targets :: [] _) ->
if null targets
then pure $ (root, bytes):eligible
else do
liftIO $ forM_ targets $ \ child ->
Cuckoo.insert table child $ Just root
foldM step eligible targets
where step = flip $ loop2 tempDatabase table
loop3
:: DB
-> CuckooHashTable RadixRoot (Maybe RadixRoot)
-> [(RadixRoot, ByteString)]
-> ConduitT ByteString Void m ()
loop3 tempDatabase table = \ case
[] -> pure ()
(root, bytes):eligible -> do
let key = fromShort root
withWriteLock radixLock $ store _radixDatabase key bytes
LevelDB.delete tempDatabase defaultWriteOptions $ fromShort root
Just parent <- liftIO $ Cuckoo.lookup table root
liftIO $ Cuckoo.delete table root
case parent of
Nothing -> pure ()
Just root' -> do
let key' = fromShort root'
Just value <- load tempDatabase key'
let (bytes', targets') = deserialise $ fromStrict value
let targets'' = List.delete root targets'
if null targets''
then do
let eligible' = (root', bytes'):eligible
loop3 tempDatabase table eligible'
else do
let value' = toStrict $ serialise (bytes', targets'')
store tempDatabase key' value'
loop3 tempDatabase table eligible
{-# SPECIALISE sinkRadixTree
:: RadixRoot
-> BoundedChan RadixRoot
-> RadixTree DB
-> RWLock
-> ConduitT ByteString
Void
(ResourceT IO)
(Either String (RadixTree DB)) #-}