{-# LANGUAGE BangPatterns, ScopedTypeVariables #-}
module Streamly.External.LMDB
(
Database,
Environment,
Limits(..),
LMDB_Error(..),
MDB_ErrCode(..),
Mode,
ReadWrite,
ReadOnly,
WriteOptions(..),
defaultLimits,
openEnvironment,
isReadOnlyEnvironment,
getDatabase,
gibibyte,
tebibyte,
clearDatabase,
readLMDB,
unsafeReadLMDB,
defaultWriteOptions,
writeLMDB) where
import Control.Concurrent (isCurrentThreadBound)
import Control.Concurrent.Async (asyncBound, wait)
import Control.Exception (Exception, catch, tryJust, mask_, throw)
import Control.Monad (guard, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, packCStringLen)
import Data.ByteString.Unsafe (unsafeUseAsCStringLen)
import Data.Maybe (fromJust)
import Data.Void (Void)
import Foreign (Ptr, free, malloc, nullPtr, peek)
import Foreign.C (Errno (Errno), eNOTDIR)
import Foreign.C.String (CStringLen)
import Streamly.Internal.Data.Fold (Fold (Fold))
import Streamly.Internal.Data.Stream.StreamD (newFinalizedIORef, runIORefFinalizer)
import Streamly.Internal.Data.Stream.StreamD.Type (Step (Stop, Yield))
import Streamly.Internal.Data.Unfold (supply)
import Streamly.Internal.Data.Unfold.Types (Unfold (Unfold))
import Streamly.External.LMDB.Internal
import Streamly.External.LMDB.Internal.Foreign
-- | Handle to an LMDB environment: a thin wrapper around the raw @MDB_env@
-- pointer. The @mode@ type parameter is phantom (it does not occur on the
-- right-hand side) and only records the access mode at the type level.
newtype Environment mode = Environment (Ptr MDB_env)
-- | Report whether an environment's @mode@ is a read-only mode.
--
-- The answer is derived from the phantom @mode@ type through
-- 'isReadOnlyMode'; the wrapped environment pointer is never inspected.
isReadOnlyEnvironment :: Mode mode => Environment mode -> Bool
isReadOnlyEnvironment = isReadOnlyMode . mode
    where
        -- Type-level proxy only: yields a value of type @mode@ so the right
        -- class instance is selected. The value is 'undefined', so this relies
        -- on 'isReadOnlyMode' not forcing its argument.
        mode :: Environment mode -> mode
        mode = undefined
-- | Limits applied to an environment by 'openEnvironment'.
data Limits = Limits
    { mapSize      :: !Int
      -- ^ Value handed to @mdb_env_set_mapsize@ (presumably a size in bytes;
      -- see 'gibibyte' and 'tebibyte').
    , maxDatabases :: !Int
      -- ^ Value handed to @mdb_env_set_maxdbs@; when this is @0@ the call is
      -- skipped altogether.
    , maxReaders   :: !Int
      -- ^ Value handed to @mdb_env_set_maxreaders@.
    }
-- | Default limits: a map size of 1024 * 1024, no @maxDatabases@ setting
-- (@0@ means 'openEnvironment' does not call @mdb_env_set_maxdbs@), and 126
-- readers.
defaultLimits :: Limits
defaultLimits = Limits
    { mapSize      = 1024 * 1024
    , maxDatabases = 0
    , maxReaders   = 126
    }
-- | The number 1024 * 1024 * 1024 (2^30), convenient for specifying a
-- 'mapSize'.
gibibyte :: Int
gibibyte = 1024 * 1024 * 1024
-- | The number 1024 * 1024 * 1024 * 1024 (2^40), convenient for specifying a
-- 'mapSize'.
--
-- NOTE(review): the value is an 'Int', so it only holds 2^40 where 'Int' is
-- wider than 32 bits; on a 32-bit 'Int' the product would wrap.
tebibyte :: Int
tebibyte = 1024 * 1024 * 1024 * 1024
-- | Create and open an LMDB environment at the given path.
--
-- The supplied 'Limits' are applied before opening. The environment is always
-- opened with @mdb_notls@, and additionally with @mdb_rdonly@ when the
-- requested @mode@ is read-only. If the first open attempt fails with an
-- 'LMDB_Error' whose code is @Left ENOTDIR@, the open is retried once with
-- @mdb_nosubdir@ added to the flags.
--
-- Any other 'LMDB_Error' from the underlying calls propagates to the caller.
--
-- NOTE(review): when one of the calls after @mdb_env_create@ throws, the
-- created handle is not released here; no environment-close binding is
-- visible in this module, so this is left for a follow-up.
openEnvironment :: Mode mode => FilePath -> Limits -> IO (Environment mode)
openEnvironment path limits = do
    penv <- mdb_env_create

    mdb_env_set_mapsize penv (mapSize limits)
    -- A maxDatabases of 0 means "leave LMDB's own setting untouched".
    let maxDbs = maxDatabases limits in when (maxDbs /= 0) $ mdb_env_set_maxdbs penv maxDbs
    mdb_env_set_maxreaders penv (maxReaders limits)

    -- mdb_notls is always set; mdb_rdonly follows the phantom mode.
    let env = Environment penv :: Mode mode => Environment mode
        flags = mdb_notls : [mdb_rdonly | isReadOnlyEnvironment env]

    -- Recognises the "path is not a directory" failure of the first attempt.
    let isNotDirectoryError :: LMDB_Error -> Bool
        isNotDirectoryError LMDB_Error { e_code = Left code }
            | Errno (fromIntegral code) == eNOTDIR = True
        isNotDirectoryError _ = False

    r <- tryJust (guard . isNotDirectoryError) $ mdb_env_open penv path (combineOptions flags)
    case r of
        -- Retry, this time treating the path as the data file itself.
        Left _ -> mdb_env_open penv path (combineOptions $ mdb_nosubdir : flags)
        Right _ -> return ()

    return env
-- | Obtain a database handle from an environment.
--
-- A transaction is begun (read-only when the environment's mode is
-- read-only), the database is opened by @name@ inside it, and the transaction
-- is committed. In a non-read-only mode the database is opened with
-- @mdb_create@.
--
-- If opening the database fails with an 'LMDB_Error', the transaction is
-- aborted before the error is rethrown, so the failed call does not leave an
-- open transaction behind.
getDatabase :: (Mode mode) => Environment mode -> Maybe String -> IO (Database mode)
getDatabase env@(Environment penv) name = do
    let readOnly = isReadOnlyEnvironment env
    ptxn <- mdb_txn_begin penv nullPtr (combineOptions [mdb_rdonly | readOnly])
    -- Presumably mdb_dbi_open reports failures as LMDB_Error, like the other
    -- wrappers used in this module; any other exception passes through as-is.
    dbi <- catch
        (mdb_dbi_open ptxn name (combineOptions [mdb_create | not readOnly]))
        (\(e :: LMDB_Error) -> c_mdb_txn_abort ptxn >> throw e)
    mdb_txn_commit ptxn
    return $ Database penv dbi
-- | Remove all entries from the given database.
--
-- The work runs in a bound thread ('asyncBound') and this function waits for
-- it to finish: a transaction is begun with no flags, @mdb_clear@ is applied
-- to the database, and the transaction is committed.
--
-- If @mdb_clear@ fails with an 'LMDB_Error', the transaction is aborted
-- before the error is rethrown (and then surfaced to the caller by 'wait').
clearDatabase :: (Mode mode) => Database mode -> IO ()
clearDatabase (Database penv dbi) = asyncBound clearInTxn >>= wait
    where
        clearInTxn = do
            ptxn <- mdb_txn_begin penv nullPtr 0
            catch
                (mdb_clear ptxn dbi)
                (\(e :: LMDB_Error) -> c_mdb_txn_abort ptxn >> throw e)
            mdb_txn_commit ptxn
-- | Unfold that yields every key-value pair of the database as
-- 'ByteString's.
--
-- Defined as 'unsafeReadLMDB' with 'packCStringLen' for both keys and values,
-- so each yielded 'ByteString' is built by 'packCStringLen' from the buffer
-- the cursor reported.
{-# INLINE readLMDB #-}
readLMDB :: (MonadIO m, Mode mode) => Database mode -> Unfold m Void (ByteString, ByteString)
readLMDB db = unsafeReadLMDB db packCStringLen packCStringLen
-- | Unfold that yields every key-value pair of the database, converting the
-- raw key and value buffers with the supplied functions.
--
-- @kmap@ and @vmap@ receive the data pointer and length exactly as the cursor
-- reported them; nothing is copied on their behalf. Presumably (per LMDB)
-- those buffers are only valid until the cursor moves on or the transaction
-- ends, so the functions should copy or fully consume the bytes before
-- returning. Verify against the LMDB documentation.
--
-- Resources (a read-only transaction, a cursor, and two malloc'ed @MDB_val@
-- cells) are acquired when the unfold is seeded and released through a
-- finalized 'IORef': explicitly at end of iteration or on a cursor error, and
-- otherwise by the reference's finalizer.
--
-- Throws an LMDB error (via 'throwLMDBErrNum') when @mdb_cursor_get@ returns a
-- code other than success or @mdb_notfound@.
{-# INLINE unsafeReadLMDB #-}
unsafeReadLMDB :: (MonadIO m, Mode mode)
               => Database mode -> (CStringLen -> IO k) -> (CStringLen -> IO v) -> Unfold m Void (k, v)
unsafeReadLMDB (Database penv dbi) kmap vmap =
    -- The seed is the first cursor operation; subsequent steps use mdb_next.
    flip supply mdb_first $ Unfold
        (\(op, pcurs, pk, pv, ref) -> do
            found <- liftIO $ do
                rc <- c_mdb_cursor_get pcurs pk pv op
                if rc /= 0 && rc /= mdb_notfound
                    -- Genuine failure: release everything, then report it.
                    then runIORefFinalizer ref >> throwLMDBErrNum "mdb_cursor_get" rc
                    else return (rc /= mdb_notfound)
            if found then do
                -- Force both results so the conversions run while the cursor
                -- still points at this entry.
                !k <- liftIO $ peekWith kmap pk
                !v <- liftIO $ peekWith vmap pv
                return $ Yield (k, v) (mdb_next, pcurs, pk, pv, ref)
            else do
                -- End of data: release the cursor, transaction and cells now.
                runIORefFinalizer ref
                return Stop)
        (\op -> do
            -- Acquire under mask_ so an asynchronous exception cannot arrive
            -- between acquisition and registration of the finalizer.
            (pcurs, pk, pv, ref) <- liftIO $ mask_ $ do
                ptxn <- mdb_txn_begin penv nullPtr mdb_rdonly
                -- Do not leave the transaction open if the cursor cannot be
                -- opened.
                pcurs <- catch
                    (mdb_cursor_open ptxn dbi)
                    (\(e :: LMDB_Error) -> c_mdb_txn_abort ptxn >> throw e)
                pk <- malloc
                pv <- malloc
                ref <- newFinalizedIORef $ do
                    free pv >> free pk
                    c_mdb_cursor_close pcurs
                    c_mdb_txn_abort ptxn
                return (pcurs, pk, pv, ref)
            return (op, pcurs, pk, pv, ref))
    where
        -- Read the MDB_val cell and hand its (pointer, length) pair to the
        -- given conversion function.
        peekWith f p = do
            val <- peek p
            f (mv_data val, fromIntegral $ mv_size val)
-- | Options controlling how 'writeLMDB' writes key-value pairs.
data WriteOptions = WriteOptions
    { writeTransactionSize :: !Int
      -- ^ Number of key-value pairs written per transaction before it is
      -- committed and a new one is begun. 'writeLMDB' treats values below 1
      -- as 1.
    , noOverwrite :: !Bool
      -- ^ When 'True', @mdb_nooverwrite@ is added to the put flags.
    , writeAppend :: !Bool
      -- ^ When 'True', @mdb_append@ is added to the put flags.
    }
-- | Default write options: one key-value pair per transaction, with neither
-- @mdb_nooverwrite@ nor @mdb_append@ set.
defaultWriteOptions :: WriteOptions
defaultWriteOptions = WriteOptions
    { writeTransactionSize = 1
    , noOverwrite = False
    , writeAppend = False
    }
-- | Exception carrying a plain message; thrown by 'writeLMDB' when it is not
-- run on a bound thread.
newtype ExceptionString = ExceptionString String deriving (Show)

instance Exception ExceptionString
-- | Fold that writes key-value pairs to the given read-write database.
--
-- Pairs are written in transactions of at most @writeTransactionSize@ puts
-- (values below 1 are treated as 1). When a transaction is full it is
-- committed and a fresh one is begun before the next put. The last open
-- transaction is committed when the fold is extracted; it is also committed
-- by the finalizer attached to its reference.
--
-- The fold must start on a bound thread: otherwise its initial action throws
-- 'ExceptionString'. If a put fails with an 'LMDB_Error', the current
-- transaction's finalizer (a commit) is run and the error is rethrown.
--
-- The key and value 'ByteString's are passed to LMDB without copying
-- ('unsafeUseAsCStringLen').
{-# INLINE writeLMDB #-}
writeLMDB :: (MonadIO m) => Database ReadWrite -> WriteOptions -> Fold m (ByteString, ByteString) ()
writeLMDB (Database penv dbi) options = Fold step initial extract
    where
        txnSize = max 1 (writeTransactionSize options)

        flags = combineOptions $
            [mdb_nooverwrite | noOverwrite options] ++ [mdb_append | writeAppend options]

        -- Begin a write transaction and register its commit as a finalizer;
        -- masked so the two cannot be separated by an asynchronous exception.
        -- The returned count of 0 is the number of puts made in it so far.
        beginTxn = mask_ $ do
            ptxn <- mdb_txn_begin penv nullPtr 0
            ref <- newFinalizedIORef $ mdb_txn_commit ptxn
            return (0, (ptxn, ref))

        step (currChunkSz, mtxn) (k, v) = do
            -- Pick the transaction for this put. The state is 'Nothing' only
            -- before the first put; afterwards the count is at least 1.
            (currChunkSz', (ptxn, ref)) <- liftIO $ case mtxn of
                -- Room left in the open transaction: keep using it.
                Just open | currChunkSz < txnSize -> return (currChunkSz, open)
                -- Transaction is full: commit it, then start a new one.
                Just (_, fullRef) -> runIORefFinalizer fullRef >> beginTxn
                -- Very first put: nothing to commit yet.
                Nothing -> beginTxn

            liftIO $ unsafeUseAsCStringLen k $ \(kp, kl) -> unsafeUseAsCStringLen v $ \(vp, vl) ->
                catch (mdb_put_ ptxn dbi kp (fromIntegral kl) vp (fromIntegral vl) flags)
                    (\(e :: LMDB_Error) -> runIORefFinalizer ref >> throw e)

            return (currChunkSz' + 1, Just (ptxn, ref))

        initial = do
            isBound <- liftIO isCurrentThreadBound
            if isBound then
                return (0, Nothing)
            else
                throw $ ExceptionString "Error: writeLMDB should be executed on a bound thread"

        -- Commit whatever transaction is still open when the fold finishes.
        extract (_, mtxn) = liftIO $
            case mtxn of
                Nothing -> return ()
                Just (_, ref) -> runIORefFinalizer ref