{-# LANGUAGE TypeSynonymInstances #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Examples.Raft.FileStore.Log where
import Protolude hiding (try)
import Control.Monad.Fail
import Control.Monad.Catch
import Control.Monad.Trans.Class
import qualified Data.ByteString as BS
import Data.Sequence ((><))
import qualified Data.Sequence as Seq
import qualified Data.Serialize as S
import qualified System.AtomicWrite.Writer.ByteString as AW
import Raft.Log
import Raft.Monad
import Raft.StateMachine
import Raft.RPC
import Raft.Client
import Raft.Persistent
import Raft.Types
newtype RaftLogFileStoreError = RaftLogFileStoreError Text
deriving (Show)
newtype RaftLogFile = RaftLogFile { unRaftLogFile :: FilePath }
instance Exception RaftLogFileStoreError
newtype RaftLogFileStoreT m a = RaftLogFileStoreT { unRaftLogFileStoreT :: ReaderT RaftLogFile m a }
deriving (Functor, Applicative, Monad, MonadIO, MonadFail, MonadReader RaftLogFile, Alternative, MonadPlus, MonadTrans)
deriving instance MonadThrow m => MonadThrow (RaftLogFileStoreT m)
deriving instance MonadCatch m => MonadCatch (RaftLogFileStoreT m)
deriving instance MonadMask m => MonadMask (RaftLogFileStoreT m)
runRaftLogFileStoreT :: RaftLogFile -> RaftLogFileStoreT m a -> m a
runRaftLogFileStoreT rlogFile = flip runReaderT rlogFile . unRaftLogFileStoreT
instance (S.Serialize v, MonadIO m) => RaftInitLog (RaftLogFileStoreT m) v where
type RaftInitLogError (RaftLogFileStoreT m) = RaftLogFileStoreError
initializeLog _ = do
RaftLogFile logFile <- ask
eRes <- liftIO $ try (BS.writeFile logFile (S.encode (mempty :: Entries v)))
case eRes of
Left (e :: SomeException) -> pure $ Left (RaftLogFileStoreError (show e))
Right _ -> pure $ Right ()
instance (MonadIO m, S.Serialize v) => RaftWriteLog (RaftLogFileStoreT m) v where
type RaftWriteLogError (RaftLogFileStoreT m) = RaftLogFileStoreError
writeLogEntries newEntries = do
RaftLogFile logFile <- ask
eLogEntries <- readLogEntries
case eLogEntries of
Left err -> panic ("writeLogEntries: " <> err)
Right currEntries -> liftIO $ Right <$> AW.atomicWriteFile logFile (S.encode (currEntries >< newEntries))
instance (MonadIO m, S.Serialize v) => RaftReadLog (RaftLogFileStoreT m) v where
type RaftReadLogError (RaftLogFileStoreT m) = RaftLogFileStoreError
readLogEntry (Index idx) = do
eLogEntries <- readLogEntries
case eLogEntries of
Left err -> panic ("readLogEntry: " <> err)
Right entries ->
case entries Seq.!? fromIntegral (if idx == 0 then 0 else idx - 1) of
Nothing -> pure (Right Nothing)
Just e -> pure (Right (Just e))
readLastLogEntry = do
eLogEntries <- readLogEntries
case eLogEntries of
Left err -> panic ("readLastLogEntry: " <> err)
Right entries -> case entries of
Seq.Empty -> pure (Right Nothing)
(_ Seq.:|> e) -> pure (Right (Just e))
instance (MonadIO m, S.Serialize v) => RaftDeleteLog (RaftLogFileStoreT m) v where
type RaftDeleteLogError (RaftLogFileStoreT m) = RaftLogFileStoreError
deleteLogEntriesFrom idx = do
eLogEntries <- readLogEntries
case eLogEntries of
Left err -> panic ("deleteLogEntriesFrom: " <> err)
Right (entries :: Entries v) -> do
let newLogEntries = Seq.dropWhileR ((>= idx) . entryIndex) entries
RaftLogFile logFile <- ask
liftIO $ AW.atomicWriteFile logFile (S.encode newLogEntries)
pure (Right DeleteSuccess)
readLogEntries :: (MonadIO m, S.Serialize v) => RaftLogFileStoreT m (Either Text (Entries v))
readLogEntries = liftIO . fmap (first toS . S.decode) . BS.readFile . unRaftLogFile =<< ask
instance RaftPersist m => RaftPersist (RaftLogFileStoreT m) where
type RaftPersistError (RaftLogFileStoreT m) = RaftPersistError m
initializePersistentState = lift initializePersistentState
readPersistentState = lift readPersistentState
writePersistentState = lift . writePersistentState
instance (Monad m, RaftSendRPC m v) => RaftSendRPC (RaftLogFileStoreT m) v where
sendRPC nid msg = lift (sendRPC nid msg)
instance (Monad m, RaftRecvRPC m v) => RaftRecvRPC (RaftLogFileStoreT m) v where
type RaftRecvRPCError (RaftLogFileStoreT m) v = RaftRecvRPCError m v
receiveRPC = lift receiveRPC
instance (Monad m, RaftSendClient m sm v) => RaftSendClient (RaftLogFileStoreT m) sm v where
sendClient clientId = lift . sendClient clientId
instance (Monad m, RaftRecvClient m v) => RaftRecvClient (RaftLogFileStoreT m) v where
type RaftRecvClientError (RaftLogFileStoreT m) v = RaftRecvClientError m v
receiveClient = lift receiveClient
instance RaftStateMachine m sm v => RaftStateMachine (RaftLogFileStoreT m) sm v where
validateCmd = lift . validateCmd
askRaftStateMachinePureCtx = lift askRaftStateMachinePureCtx
instance MonadRaftChan v m => MonadRaftChan v (RaftLogFileStoreT m) where
type RaftEventChan v (RaftLogFileStoreT m) = RaftEventChan v m
readRaftChan = lift . readRaftChan
writeRaftChan chan = lift . writeRaftChan chan
newRaftChan = lift (newRaftChan @v @m)
instance (MonadIO m, MonadRaftFork m) => MonadRaftFork (RaftLogFileStoreT m) where
type RaftThreadId (RaftLogFileStoreT m) = RaftThreadId m
raftFork r m = do
raftLogFile <- ask
lift $ raftFork r (runRaftLogFileStoreT raftLogFile m)