-- | A small append-only store organised into numbered arenas.
--
-- Values passed to 'addData' are appended to the current journal file as
-- CRC32-checked frames and synced to disk.  Once the caller-supplied
-- \"arena full\" predicate holds for the running summary, the journal is
-- rewritten as a data file plus a finalized-summary file and a fresh
-- journal is opened.  'accessData' lists the finalized arenas together
-- with the contents of the current journal.
module Database.Arena
(
ArenaLocation(..)
, ArenaT
, Arena
, ArenaDB
, runArenaT
, runArena
, startArena, initArena
, addData
, accessData
) where
import Control.Applicative
import Control.Concurrent.MVar
import Control.Monad
import Control.Monad.Reader
import Control.Monad.Trans
import Data.Bytes.Get
import Data.Bytes.Put
import Data.Bytes.Serial
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Digest.CRC32
import Data.Either
import Data.Foldable
import Data.IORef
import Data.List
import Data.Maybe
import Data.Semigroup
import qualified Data.Set as Set
import Data.String
import Data.Typeable (cast)
import qualified Data.Vector.Persistent as PV
import Data.Word
import GHC.IO.Device
import GHC.IO.Exception
import qualified GHC.IO.FD as FD
import GHC.IO.Handle
import qualified GHC.IO.Handle.FD as FD
import GHC.IO.Handle.Internals
import GHC.IO.Handle.Types
import Safe
import System.Directory
import System.FilePath
import System.IO
import System.IO.Error
import System.Posix.IO (handleToFd)
import System.Posix.Types (Fd (Fd))
import System.Posix.Unistd (fileSynchroniseDataOnly)
-- | Root directory of an arena store.  The @journal@ and @data@
-- sub-directories are created beneath it.
newtype ArenaLocation = ArenaLocation
  { getArenaLocation :: FilePath
  }
  deriving (Eq, Ord, Read, Show, IsString)
-- | Numeric identifier of an arena; its decimal rendering is used as the
-- base name of the arena's journal, data and summary files.
type ArenaID = Word32
-- | CRC32 checksum stored alongside the payload of a journal frame.
type JournalHash = Word32
-- | One record of a journal file: a length-prefixed, checksummed payload.
data JournalFrame = JF
  { jLength :: Word32
    -- ^ Size of 'jData' in bytes.
  , jHash :: JournalHash
    -- ^ CRC32 of 'jData'.
  , jData :: BS.ByteString
    -- ^ The serialized payload.
  }
-- | Serialize a value and wrap the resulting bytes in a 'JournalFrame'
-- carrying their length and CRC32.
mkJournal :: Serial a => a -> JournalFrame
mkJournal a = JF payloadLength (crc32 payload) payload
  where
    payload = runPutS (serialize a)
    payloadLength = fromIntegral (BS.length payload)
-- | Wire format: length, then CRC32, then the raw payload bytes.
-- Decoding fails when the stored CRC32 does not match the payload.
instance Serial JournalFrame where
  serialize frame =
    serialize (jLength frame)
      *> serialize (jHash frame)
      *> putByteString (jData frame)
  deserialize = do
    len <- deserialize
    expected <- deserialize
    payload <- getByteString (fromIntegral len)
    if expected /= crc32 payload
      then fail "Journal Frame failed CRC check---Also, uh, this method shouldn't be in Monad"
      else return (JF len expected payload)
-- | In-memory state of the journal currently being appended to.
data OpenJournal a b = OJ
  { ojArenaID :: ArenaID
    -- ^ Arena this journal belongs to.
  , ojHandle :: Handle
    -- ^ Open handle on the journal file.
  , ojSummary :: Option b
    -- ^ Combined summary of 'ojVals'; holds nothing while the journal is empty.
  , ojVals :: PV.Vector a
    -- ^ Values written to the journal so far.
  }
-- | Read the data file of the given arena and decode as many values from
-- it as possible.  The file is read lazily.
readDataFile :: (MonadIO m, Serial a) => ArenaLocation -> ArenaID -> m [a]
readDataFile l ai = liftIO (decodeAll <$> BSL.readFile (dataFile l ai))
  where
    decodeAll = runGetL (many deserialize)
-- | Flush a handle's buffers and then force the written data to disk with
-- 'fileSynchroniseDataOnly'.
--
-- Raises an 'IllegalOperation' 'IOError' for duplex handles and for
-- handles whose device is not a file descriptor.
syncHandle :: Handle -> IO ()
syncHandle h = do
  hFlush h
  unsafeSyncHandle h
  where
    -- Raise the IllegalOperation error used for every unsupported handle.
    unsupported hdl msg =
      ioError
        ( ioeSetErrorString
            (mkIOError IllegalOperation "unsafeSyncHandle" (Just hdl) Nothing)
            msg
        )

    unsafeSyncHandle hdl@(DuplexHandle {}) =
      unsupported hdl "unsafeSyncHandle only works on file descriptors"
    unsafeSyncHandle hdl@(FileHandle _ mv) =
      -- Hold the handle's internal MVar while flushing and syncing.
      withMVar mv $ \h_@Handle__{haType=_,..} ->
        case cast haDevice of
          Nothing -> unsupported hdl "handle is not a file descriptor"
          Just fd -> do
            flushWriteBuffer h_
            fileSynchroniseDataOnly . Fd . FD.fdFD $ fd
-- | Open a file in 'WriteMode', run the action on its handle, and sync
-- the handle to disk before the file is closed.  The action's result is
-- returned.
withFileSync :: FilePath -> (Handle -> IO r) -> IO r
withFileSync fp f =
  withFile fp WriteMode $ \h -> do
    result <- f h
    syncHandle h
    return result
-- | Configuration and mutable state of a running arena store.
data ArenaDB summary finalized d = ArenaDB
  { adSummarize :: d -> summary
    -- ^ Summary of a single datum.
  , adFinalize :: summary -> finalized
    -- ^ Turns an accumulated summary into the form kept with an arena.
  , adArenaFull :: summary -> Bool
    -- ^ Decides, from the running summary, when the current journal
    -- should be converted into a data file.
  , adArenaLocation :: ArenaLocation
    -- ^ Root directory of the store.
  , adDataRef :: IORef [(finalized, IO [d])]
    -- ^ Finalized arenas: each summary paired with an action that reads
    -- the arena's data.
  , adCurrentJournal :: MVar (OpenJournal d summary)
    -- ^ The journal currently being appended to.
  }
-- | Monad transformer giving read access to an 'ArenaDB'.
newtype ArenaT s f d m a = ArenaT
  { unArenaT :: ReaderT (ArenaDB s f d) m a
  }
  deriving (Functor, Applicative, Monad, MonadTrans, MonadReader (ArenaDB s f d), MonadIO)
-- | Run an 'ArenaT' computation against the given database.
runArenaT :: ArenaDB s f d -> ArenaT s f d m a -> m a
runArenaT ad action = runReaderT (unArenaT action) ad
-- | 'ArenaT' specialised to 'IO'.
type Arena s f d a = ArenaT s f d IO a

-- | Run an 'Arena' computation in 'IO'.
runArena :: ArenaDB s f d -> Arena s f d a -> IO a
runArena db action = runArenaT db action
-- | Apply the configured 'adSummarize' function to a datum.
summarize :: Monad m => d -> ArenaT s f d m s
summarize d = do
  toSummary <- asks adSummarize
  return (toSummary d)
-- | Apply the configured 'adFinalize' function to a summary.
finalize :: Monad m => s -> ArenaT s f d m f
finalize s = do
  toFinalized <- asks adFinalize
  return (toFinalized s)
-- | Directories and the scratch file that live under an arena location.
journalDir, dataDir, tempJournal :: ArenaLocation -> FilePath
journalDir al = getArenaLocation al </> "journal"
dataDir al = getArenaLocation al </> "data"
-- The temporary journal sits inside the journal directory.
tempJournal al = journalDir al </> "temp"
-- | Per-arena file paths: the journal is named after the arena ID, the
-- data and summary files add the @data@ and @header@ extensions.
journalFile, dataFile, dataSummaryFile :: ArenaLocation -> ArenaID -> FilePath
journalFile al i = journalDir al </> show i
dataFile al i = dataDir al </> (show i <.> "data")
dataSummaryFile al i = dataDir al </> (show i <.> "header")
-- | Variants of 'journalDir', 'dataDir' and 'tempJournal' that take the
-- location from the ambient 'ArenaDB'.
journalDir', dataDir', tempJournal' :: Monad m => ArenaT s f d m FilePath
journalDir' = journalDir <$> asks adArenaLocation
dataDir' = dataDir <$> asks adArenaLocation
tempJournal' = tempJournal <$> asks adArenaLocation
-- | Variants of 'journalFile', 'dataFile' and 'dataSummaryFile' that take
-- the location from the ambient 'ArenaDB'.
journalFile', dataFile', dataSummaryFile' :: Monad m => ArenaID -> ArenaT s f d m FilePath
journalFile' ai = asks (\db -> journalFile (adArenaLocation db) ai)
dataFile' ai = asks (\db -> dataFile (adArenaLocation db) ai)
dataSummaryFile' ai = asks (\db -> dataSummaryFile (adArenaLocation db) ai)
-- | All paths relevant to one arena, bundled for use with record
-- wildcards.
data TheFiles = TheFiles
  { theJournalDir :: FilePath
  , theDataDir :: FilePath
  , theTempJournal :: FilePath
  , theJournalFile :: FilePath
  , theDataFile :: FilePath
  , theDataSummaryFile :: FilePath
  }
  deriving (Eq, Ord, Read, Show)
-- | Collect every path for the given arena.  When no arena ID is
-- supplied, ID 1 is used for the per-arena paths.
theFiles :: Monad m => Maybe ArenaID -> ArenaT s f d m TheFiles
theFiles mai = do
  jd <- journalDir'
  dd <- dataDir'
  tj <- tempJournal'
  jf <- journalFile' ai
  df <- dataFile' ai
  sf <- dataSummaryFile' ai
  return
    TheFiles
      { theJournalDir = jd
      , theDataDir = dd
      , theTempJournal = tj
      , theJournalFile = jf
      , theDataFile = df
      , theDataSummaryFile = sf
      }
  where
    ai = fromMaybe 1 mai
-- | Create the journal and data directories (and any missing parents)
-- for the given location.
initArena :: ArenaLocation -> IO ()
initArena al =
  mapM_ (createDirectoryIfMissing True) [journalDir al, dataDir al]
-- | Open (creating if necessary) the arena store at a location.
--
-- Arguments, in order: the per-datum summary function, the summary
-- finalizer, the \"arena full\" predicate, and the store location.
-- Older journals are converted to data files, the newest journal is
-- cleaned and reopened, and the finalized arenas are loaded.
startArena
  :: (Serial d, Serial f, Semigroup s)
  => (d -> s) -> (s -> f) -> (s -> Bool) -> ArenaLocation -> IO (ArenaDB s f d)
startArena adSummarize adFinalize adArenaFull adArenaLocation = do
  initArena adArenaLocation
  runArena bootstrapConf $ do
    journal <- internArenas >>= cleanJournal
    adCurrentJournal <- liftIO (newMVar journal)
    finalized <- readAllData
    adDataRef <- liftIO (newIORef finalized)
    return ArenaDB {..}
  where
    -- Configuration used while starting up: the mutable fields are not
    -- available yet and must not be touched by the startup actions.
    bootstrapConf =
      ArenaDB
        { adCurrentJournal = error "uninitialized current journal"
        , adDataRef = error "uninitialized data ref"
        , ..
        }
-- | Read the journal of the given arena and decode the payload of every
-- frame that can be parsed from it.
--
-- Frame parsing uses 'many', so it stops at the first frame that does not
-- parse (which includes a CRC mismatch); later bytes are ignored.
--
-- A frame whose checksum is valid but whose payload cannot be decoded
-- raises an 'IOError' naming the journal file.  Payloads are decoded
-- before this action returns, so the failure is reported here rather than
-- wherever the value happens to be forced.
readJournalFile :: (Serial d, MonadIO m) => ArenaID -> ArenaT s f d m (PV.Vector d)
readJournalFile ai = do
  path <- journalFile' ai
  raw <- liftIO (BSL.readFile path)
  let frames = runGetL (many deserialize) raw
  PV.fromList <$> mapM (decodePayload path) frames
  where
    -- Decode one frame's payload, failing with a descriptive error.
    decodePayload path frame =
      case runGetS deserialize (jData frame) of
        Right v -> return v
        Left err ->
          liftIO . ioError . userError $
            "readJournalFile: undecodable payload in " ++ path ++ ": " ++ err
-- | Prepare the journal of the given arena for appending.
--
-- Any leftover temporary journal is removed first.  If the journal file
-- does not exist it is created empty.  Otherwise its readable entries are
-- rewritten to the temporary file, which is synced and renamed over the
-- journal, and the journal is reopened for appending with its summary
-- rebuilt from those entries.
cleanJournal :: (MonadIO m, Serial d, Semigroup s) => ArenaID -> ArenaT s f d m (OpenJournal d s)
cleanJournal ai = do
  TheFiles {..} <- theFiles (Just ai)
  liftIO $ do
    staleTemp <- doesFileExist theTempJournal
    when staleTemp (removeFile theTempJournal)
  journalExists <- liftIO (doesFileExist theJournalFile)
  if journalExists
    then do
      entries <- readJournalFile ai
      liftIO $ do
        withFileSync theTempJournal $ \h ->
          forM_ entries (BS.hPutStr h . runPutS . serialize . mkJournal)
        renameFile theTempJournal theJournalFile
      jh <- liftIO (openFile theJournalFile AppendMode)
      summary <-
        if null entries
          then return (Option Nothing)
          else Option . Just <$> regenerateSummary entries
      return (OJ ai jh summary entries)
    else do
      jh <- liftIO (openFile theJournalFile WriteMode)
      return (OJ ai jh (Option Nothing) mempty)
-- | List the entries of a directory whose names parse as an 'ArenaID';
-- all other entries are ignored.
readArenaIDs :: MonadIO m => FilePath -> m [ArenaID]
readArenaIDs dir = liftIO $ do
  entries <- getDirectoryContents dir
  return [ai | Just ai <- map readMay entries]
-- | Convert every journal except the newest into data files and return
-- the ID of the arena whose journal should be opened next.
--
-- When the journal directory holds no journals, the result is one past
-- the largest arena ID found in the data directory (or 0 for an empty
-- store).  Starting again at 0 in that situation would reuse the ID of an
-- already finalized arena, whose data file would then be overwritten the
-- next time that journal is converted.
internArenas :: (MonadIO m, Serial d, Serial f, Semigroup s) => ArenaT s f d m ArenaID
internArenas = do
  js <- journalDir' >>= readArenaIDs
  case js of
    [] -> do
      entries <- dataDir' >>= liftIO . getDirectoryContents
      -- Data and summary files are both named "<id>.<ext>".
      let finalizedIDs = mapMaybe (readMay . dropExtension) entries
      return $ case finalizedIDs of
        [] -> 0
        _ -> maximum finalizedIDs + 1
    _ -> do
      let latest = maximum js
          old = delete latest js
      mapM_ internArenaFile old
      return latest
-- | Summarize every value and combine the summaries left to right with
-- '<>'.  The vector must be non-empty: 'foldl1' fails on an empty one.
regenerateSummary :: (Semigroup s, Monad m) => PV.Vector d -> ArenaT s f d m s
regenerateSummary ds = do
  summaries <- traverse summarize ds
  return (foldl1 (<>) summaries)
-- | Convert the journal of the given arena into its permanent form: the
-- values go to the arena's data file, the finalized summary to its
-- summary file (each synced to disk), and the journal is then removed.
--
-- A journal with no readable entries has nothing to convert and no
-- summary can be built for it, so it is simply removed.  Without this
-- check the summary computation would fail after the data file had
-- already been created, leaving the journal in place to fail again on
-- the next attempt.
internArenaFile :: (MonadIO m, Serial f, Serial d, Semigroup s) => ArenaID -> ArenaT s f d m ()
internArenaFile ai = do
  TheFiles {..} <- theFiles (Just ai)
  as <- readJournalFile ai
  if null as
    then liftIO (removeFile theJournalFile)
    else do
      fas <- regenerateSummary as >>= finalize
      liftIO $ do
        withFileSync theDataFile $ \h ->
          mapM_ (BS.hPutStr h . runPutS . serialize) as
        withFileSync theDataSummaryFile $ \h ->
          BS.hPutStr h . runPutS . serialize $ fas
        -- Only drop the journal once both files are safely on disk.
        removeFile theJournalFile
-- | Build (without running) the 'IO' action that reads the data file of
-- the given arena at the ambient location.
readDataFile' :: (MonadIO m, Serial d) => ArenaID -> ArenaT s f d m (IO [d])
readDataFile' ai = do
  location <- asks adArenaLocation
  return (readDataFile location ai)
-- | Load every finalized arena found in the data directory, in ascending
-- ID order, pairing its stored summary with an action that reads its
-- data.
--
-- Summary files are read strictly so that each file is closed again
-- before the next one is opened.  A lazy read would keep one handle open
-- per arena until the corresponding summary is forced.  Decoding of the
-- summary itself remains lazy.
readAllData :: (MonadIO m, Serial f, Serial d) => ArenaT s f d m [(f, IO [d])]
readAllData = do
  dir <- dataDir'
  entries <- liftIO (getDirectoryContents dir)
  -- Each arena has a data and a summary file; the set removes the duplicate IDs.
  let ids = Set.toList . Set.fromList . mapMaybe (readMay . dropExtension) $ entries
  forM ids $ \ai -> do
    summaryPath <- dataSummaryFile' ai
    header <- liftIO (BS.readFile summaryPath)
    rdf <- readDataFile' ai
    return (runGetL deserialize (BSL.fromStrict header), rdf)
-- | List all arenas as pairs of a finalized summary and an action that
-- yields the arena's values.  If the current journal holds any values it
-- is placed at the head of the list, ahead of the finalized arenas.
--
-- The journal's 'MVar' is held while the list is assembled.
accessData :: MonadIO m => ArenaT s f d m [(f, IO [d])]
accessData = do
  db <- ask
  liftIO . withMVar (adCurrentJournal db) $ \journal -> do
    finalized <- readIORef (adDataRef db)
    let pending = ojVals journal
        current =
          ( adFinalize db . fromJust . getOption $ ojSummary journal
          , return (toList pending)
          )
    return $
      if null pending
        then finalized
        else current : finalized
-- | Append a value to the current journal and sync it to disk.
--
-- The running summary is extended with the value's summary.  When the
-- \"arena full\" predicate then holds, the journal is closed and converted
-- into a data file, the arena is added to the list of finalized arenas,
-- and a fresh journal is opened under the next arena ID.
--
-- The whole operation runs while holding the journal's 'MVar'.
addData :: (MonadIO m, Serial d, Serial f, Semigroup s) => d -> ArenaT s f d m ()
addData d = do
  conf <- ask
  let location = adArenaLocation conf
  liftIO . modifyMVar_ (adCurrentJournal conf) $ \(OJ ai h s ds) -> do
    BS.hPutStr h . runPutS . serialize . mkJournal $ d
    let s' = s <> Option (Just (adSummarize conf d))
        summary = fromJust (getOption s')
    syncHandle h
    if adArenaFull conf summary
      then do
        hClose h
        runArenaT conf (internArenaFile ai)
        atomicModifyIORef' (adDataRef conf) $ \ods ->
          ((adFinalize conf summary, readDataFile location ai) : ods, ())
        let ai' = ai + 1
        h' <- openFile (journalFile location ai') WriteMode
        return (OJ ai' h' mempty mempty)
      else return (OJ ai h s' (ds `PV.snoc` d))