{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.Franz.Writer (
WriterHandle,
openWriter,
closeWriter,
withWriter,
write,
flush,
getLastSeqNo
) where
import Control.Concurrent
import Control.DeepSeq (NFData(..))
import Control.Exception
import Control.Monad
import qualified Data.ByteString.FastBuilder as BB
import qualified Data.Vector.Storable.Mutable as MV
import Data.Foldable (toList)
import Data.Int
import Data.Word (Word64)
import Data.IORef
import Data.Kind (Type)
import Foreign.Ptr (Ptr)
import Foreign.Storable (Storable(..))
import GHC.IO.Handle.FD (openFileBlocking)
import System.Directory
import System.Endian (toLE64)
import System.FilePath
import System.IO
-- | State of an open stream writer.
--
-- The type parameter @f@ does not occur in any field; it only records, at the
-- type level, the container shape used for index names in 'openWriter' and
-- for index values in 'write'.
data WriterHandle (f :: Type -> Type) = WriterHandle
  { hPayload :: !Handle
    -- ^ Append-mode handle of the @payloads@ file.
  , hOffset :: !Handle
    -- ^ Append-mode handle of the @offsets@ file.
  , vOffset :: !(MVar (Int, Word64))
    -- ^ Pair of (sequence number the next 'write' will return, running end
    -- offset of the payload data). The 'MVar' also serves as the lock taken
    -- by 'write' and 'flush'.
  , offsetBuf :: !(MV.IOVector Word64)
    -- ^ Staging buffer of little-endian words (payload end offset followed by
    -- the index values of each record) not yet written to 'hOffset'.
  , offsetPtr :: !(IORef Int)
    -- ^ Number of words of 'offsetBuf' currently in use.
  , indexCount :: !Int
    -- ^ Words per record: one for the offset plus one per index name.
  }
-- | Evaluating a 'WriterHandle' to weak head normal form is all 'rnf' does;
-- the contents of the mutable fields are not traversed.
instance NFData (WriterHandle f) where
  rnf handle = handle `seq` ()
-- | Sequence number of the most recently written record, i.e. one less than
-- the number the next 'write' will return. Yields @-1@ when the counter is
-- still zero.
getLastSeqNo :: WriterHandle f -> IO Int
getLastSeqNo handle = do
  (nextSeqNo, _payloadEnd) <- readMVar (vOffset handle)
  pure (nextSeqNo - 1)
-- | Open (creating it if necessary) the stream stored in the directory @path@
-- and return a handle for appending to it.
--
-- The directory holds three files: @payloads@, @offsets@ and @indices@.
-- The @indices@ file is rewritten on every call with one index name per line.
--
-- When a @payloads@ file already exists, the writer resumes from it: the
-- payload offset is the size of @payloads@, and the next sequence number is
-- the number of complete records in @offsets@. A record occupies
-- @8 * (length idents + 1)@ bytes there, because 'write' stores the payload
-- end offset followed by one word per index value.
--
-- If anything fails after a file handle has been opened, that handle is
-- closed again before the exception propagates.
openWriter :: Foldable f
  => f String
  -- ^ Names of the indices of this stream.
  -> FilePath
  -- ^ Directory of the stream.
  -> IO (WriterHandle f)
openWriter idents path = do
  createDirectoryIfMissing True path
  let payloadPath = path </> "payloads"
      offsetPath = path </> "offsets"
      indexPath = path </> "indices"
      -- One word for the payload offset plus one per index.
      indexCount = length idents + 1
      -- Size in bytes of one record in the offsets file.
      recordBytes = 8 * toInteger indexCount
  alreadyExists <- doesFileExist payloadPath
  vOffset <- if alreadyExists
    then do
      offsetBytes <- withFile offsetPath ReadMode hFileSize
      payloadBytes <- withFile payloadPath ReadMode hFileSize
      -- Divide by the full record size, not by a single word: otherwise the
      -- resumed sequence number is inflated by a factor of indexCount.
      newMVar (fromInteger (offsetBytes `div` recordBytes), fromInteger payloadBytes)
    else newMVar (0, 0)
  writeFile indexPath $ unlines $ toList idents
  bracketOnError (openFileBlocking payloadPath AppendMode) hClose $ \hPayload ->
    bracketOnError (openFileBlocking offsetPath AppendMode) hClose $ \hOffset -> do
      offsetBuf <- MV.new offsetBufferSize
      offsetPtr <- newIORef 0
      return WriterHandle{..}
-- | Flush any buffered offsets and close both underlying file handles.
--
-- Both handles are closed even if 'flush' (or closing the first handle)
-- throws; the exception is re-thrown afterwards.
closeWriter :: WriterHandle f -> IO ()
closeWriter h@WriterHandle{..} =
  flush h `finally` (hClose hPayload `finally` hClose hOffset)
-- | Run an action with a freshly opened writer. The writer is closed with
-- 'closeWriter' when the action finishes, whether normally or by exception.
withWriter :: Foldable f => f String -> FilePath -> (WriterHandle f -> IO a) -> IO a
withWriter idents path action = bracket acquire closeWriter action
  where
    acquire = openWriter idents path
-- | Capacity, in 'Word64' elements, of the in-memory buffer that stages
-- offsets and index values before they are written to the offsets file.
offsetBufferSize :: Int
offsetBufferSize = 256
-- | Append one record to the stream and return its sequence number.
--
-- The payload is written to the payload handle immediately. The record's
-- payload end offset and its index values are staged in the offset buffer as
-- little-endian words, and the buffer is flushed first when the new record
-- would reach its capacity. The whole operation runs while holding the
-- 'vOffset' lock.
write :: Foldable f
  => WriterHandle f
  -> f Int64
  -- ^ Index values of the record.
  -> BB.Builder
  -- ^ Payload of the record.
  -> IO Int
write h@WriterHandle{..} ixs !bs = modifyMVar vOffset appendRecord
  where
    appendRecord (seqNo, payloadEnd) = do
      written <- BB.hPutBuilderLen hPayload bs
      let payloadEnd' = payloadEnd + fromIntegral written
      cursor <- readIORef offsetPtr
      -- Start again at slot 0 after flushing when the record would not fit.
      base <-
        if cursor + indexCount >= offsetBufferSize
          then unsafeFlush h >> return 0
          else return cursor
      MV.write offsetBuf base (toLE64 payloadEnd')
      -- Index values occupy the slots right after the offset word.
      zipWithM_
        (\slot ix -> MV.write offsetBuf slot (toLE64 (fromIntegral ix)))
        [base + 1 ..]
        (toList ixs)
      writeIORef offsetPtr (base + indexCount)
      let !seqNo' = seqNo + 1
      return ((seqNo', payloadEnd'), seqNo)
{-# INLINE write #-}
-- | Write out all buffered data, holding the 'vOffset' lock while doing so.
flush :: WriterHandle f -> IO ()
flush h = withMVar (vOffset h) (\_ -> unsafeFlush h)
-- | Write out buffered data without taking the 'vOffset' lock; callers are
-- expected to hold it already (as 'write' and 'flush' do).
--
-- Does nothing when no words are pending in the offset buffer.
unsafeFlush :: WriterHandle f -> IO ()
unsafeFlush WriterHandle{..} = do
  pending <- readIORef offsetPtr
  unless (pending <= 0) $ do
    -- The payload is flushed before any offset is written out.
    -- NOTE(review): presumably so that a reader following the offsets file
    -- never sees an offset whose payload is not yet on disk -- confirm
    -- against the reader implementation.
    hFlush hPayload
    MV.unsafeWith offsetBuf $ \buf -> hPutElems hOffset buf pending
    writeIORef offsetPtr 0
    hFlush hOffset
-- | Write @len@ elements starting at @ptr@ to the handle as raw bytes, using
-- the 'Storable' size of the element type to compute the byte count.
hPutElems :: forall a. Storable a => Handle -> Ptr a -> Int -> IO ()
hPutElems hdl ptr len = hPutBuf hdl ptr byteCount
  where
    -- 'sizeOf' only inspects the type of its argument, never its value.
    elemSize = sizeOf (undefined :: a)
    byteCount = len * elemSize
{-# INLINE hPutElems #-}