{-# LANGUAGE DeriveDataTypeable, BangPatterns, CPP #-}
-- | Local backend for 'AcidState'.
--
-- A state opened through this module is kept in the running process and is
-- backed by two file logs stored in one directory (prefixes @events@ and
-- @checkpoints@). An @open.lock@ file in the same directory is locked
-- exclusively for as long as the state is open.
module Data.Acid.Local
    ( openLocalState
    , openLocalStateFrom
    , prepareLocalState
    , prepareLocalStateFrom
    , scheduleLocalUpdate'
    , scheduleLocalColdUpdate'
    , createCheckpointAndClose
    , LocalState(..)
    , Checkpoint(..)
    ) where
import Data.Acid.Log as Log
import Data.Acid.Core
import Data.Acid.Common
import Data.Acid.Abstract
import Control.Concurrent ( newEmptyMVar, putMVar, takeMVar, MVar )
import Control.Exception ( onException, evaluate, Exception, throwIO )
import Control.Monad.State ( runState )
import Control.Monad ( join )
import Control.Applicative ( (<$>), (<*>) )
import Data.ByteString.Lazy ( ByteString )
import qualified Data.ByteString.Lazy as Lazy ( length )
import Data.Serialize ( runPutLazy, runGetLazy )
import Data.SafeCopy ( SafeCopy(..), safeGet, safePut
, primitive, contain )
import Data.Typeable ( Typeable, typeOf )
import Data.IORef
import System.FilePath ( (</>), takeDirectory )
import System.FileLock
import System.Directory ( createDirectoryIfMissing )
-- | Everything needed to operate on a state that lives in the current
-- process and is persisted through file logs.
data LocalState st
    = LocalState { localCore        :: Core st
                   -- ^ Core holding the state and the methods that can run on it;
                   --   updates go through 'modifyCoreState' / 'modifyCoreState_'.
                 , localCopy        :: IORef st
                   -- ^ Copy of the state that queries read from; update code
                   --   writes the new state here.
                 , localEvents      :: FileLog (Tagged ByteString)
                   -- ^ Log of serialised update events.
                 , localCheckpoints :: FileLog Checkpoint
                   -- ^ Log of serialised state snapshots.
                 , localLock        :: FileLock
                   -- ^ Lock taken when the state was opened; released on close.
                 } deriving (Typeable)
-- | Exception thrown while opening a state when the exclusive lock on the
-- lock file cannot be obtained. The payload is the path of that lock file.
newtype StateIsLocked = StateIsLocked FilePath deriving (Show, Typeable)

instance Exception StateIsLocked
-- | Run an update event against the state and append it to the events log.
--
-- Returns an 'MVar' that receives the event's result. The 'MVar' is filled
-- by the action handed to 'pushEntry', after that same action has stored the
-- new state in 'localCopy'.
-- NOTE(review): presumably 'pushEntry' runs that action once the entry has
-- been written -- confirm in "Data.Acid.Log".
scheduleLocalUpdate :: UpdateEvent event => LocalState (EventState event) -> event -> IO (MVar (EventResult event))
scheduleLocalUpdate acidState event = do
    resultVar <- newEmptyMVar
    -- Force the complete serialised event before touching the core, so that
    -- an exception raised while encoding surfaces here, in the caller.
    _ <- evaluate (Lazy.length payload)
    modifyCoreState_ core $ \oldState -> do
        let !(answer, !newState) = runState method oldState
        -- The entry is pushed while the core is being modified.
        pushEntry (localEvents acidState) (methodTag event, payload) $ do
            writeIORef (localCopy acidState) newState
            putMVar resultVar answer
        return newState
    return resultVar
  where
    core    = localCore acidState
    payload = runPutLazy (safePut event)
    method  = lookupHotMethod (coreMethods core) event
-- | Variant of 'scheduleLocalUpdate' that leaves publishing to the caller.
--
-- The event is applied and pushed to the events log, but the action given to
-- 'pushEntry' does nothing. Instead this function returns an 'IO' action
-- that, when run, writes the new state to 'localCopy' and puts the event's
-- result into the supplied @mvar@.
scheduleLocalUpdate' :: UpdateEvent event => LocalState (EventState event) -> event -> MVar (EventResult event) -> IO (IO ())
scheduleLocalUpdate' acidState event mvar = do
    -- Force the complete serialised event first so encoding errors are
    -- raised before the core is touched.
    _ <- evaluate (Lazy.length payload)
    modifyCoreState core $ \oldState -> do
        let !(answer, !newState) = runState method oldState
            publish = writeIORef (localCopy acidState) newState >> putMVar mvar answer
        pushEntry (localEvents acidState) (methodTag event, payload) (return ())
        return (newState, publish)
  where
    core    = localCore acidState
    payload = runPutLazy (safePut event)
    method  = lookupHotMethod (coreMethods core) event
-- | Run an already-serialised (\"cold\") update event and append it to the
-- events log.
--
-- Returns an 'MVar' that receives the serialised result. It is filled by the
-- action handed to 'pushEntry', which first stores the new state in
-- 'localCopy'.
scheduleLocalColdUpdate :: LocalState st -> Tagged ByteString -> IO (MVar ByteString)
scheduleLocalColdUpdate acidState event = do
    resultVar <- newEmptyMVar
    modifyCoreState_ core $ \oldState -> do
        let !(answer, !newState) = runState method oldState
        -- The entry is pushed while the core is being modified.
        pushEntry (localEvents acidState) event $ do
            writeIORef (localCopy acidState) newState
            putMVar resultVar answer
        return newState
    return resultVar
  where
    core   = localCore acidState
    method = lookupColdMethod core event
-- | Variant of 'scheduleLocalColdUpdate' that leaves publishing to the
-- caller.
--
-- The cold event is applied and pushed to the events log with a no-op
-- action. The returned 'IO' action, when run, writes the new state to
-- 'localCopy' and puts the serialised result into the supplied @mvar@.
scheduleLocalColdUpdate' :: LocalState st -> Tagged ByteString -> MVar ByteString -> IO (IO ())
scheduleLocalColdUpdate' acidState event mvar =
    modifyCoreState core $ \oldState -> do
        let !(answer, !newState) = runState method oldState
            publish = writeIORef (localCopy acidState) newState >> putMVar mvar answer
        pushEntry (localEvents acidState) event (return ())
        return (newState, publish)
  where
    core   = localCore acidState
    method = lookupColdMethod core event
-- | Answer a query event from the state currently stored in 'localCopy'.
--
-- The state produced by running the method is discarded; only the result is
-- returned.
localQuery :: QueryEvent event => LocalState (EventState event) -> event -> IO (EventResult event)
localQuery acidState event = do
    snapshot <- readIORef (localCopy acidState)
    return (fst (runState method snapshot))
  where
    method = lookupHotMethod (coreMethods (localCore acidState)) event
-- | Answer an already-serialised (\"cold\") query event from the state
-- currently stored in 'localCopy', returning the serialised result.
--
-- The state produced by running the method is discarded.
localQueryCold :: LocalState st -> Tagged ByteString -> IO ByteString
localQueryCold acidState event = do
    snapshot <- readIORef (localCopy acidState)
    return (fst (runState method snapshot))
  where
    method = lookupColdMethod (localCore acidState) event
-- | Serialise the current state into the checkpoints log and block until the
-- action attached to that log entry has signalled completion.
--
-- The events log is cut first. Then, with access to the core state, the
-- current event entry id is read and an action is queued on the events log
-- that pushes a 'Checkpoint' carrying that id and the encoded state.
createLocalCheckpoint :: SafeCopy st => LocalState st -> IO ()
createLocalCheckpoint acidState = do
    _ <- cutFileLog events
    done <- newEmptyMVar
    withCoreState (localCore acidState) $ \st -> do
        cutOff <- askCurrentEntryId events
        -- The checkpoint is pushed from an action queued on the events log.
        pushAction events $
            pushEntry (localCheckpoints acidState)
                      (Checkpoint cutOff (runPutLazy (safePut st)))
                      (putMVar done ())
    takeMVar done
  where
    events = localEvents acidState
-- | Write a final checkpoint and shut the state down in one step.
--
-- The abstract state is downcast to a 'LocalState'. The core is closed with
-- 'closeCore'', whose callback queues a checkpoint of the last state; once
-- that checkpoint has been signalled as done, both file logs are closed and
-- the file lock is released.
createCheckpointAndClose :: (SafeCopy st, Typeable st) => AcidState st -> IO ()
createCheckpointAndClose abstract_state
    = do mvar <- newEmptyMVar
         closeCore' (localCore acidState) $ \st ->
           do eventId <- askCurrentEntryId (localEvents acidState)
              -- Queue the checkpoint write on the events log; the MVar is
              -- filled by the action attached to the checkpoint entry.
              pushAction (localEvents acidState) $
                pushEntry (localCheckpoints acidState) (Checkpoint eventId (runPutLazy (safePut st))) (putMVar mvar ())
         -- Block until the checkpoint entry's action has run.
         takeMVar mvar
         closeFileLog (localEvents acidState)
         closeFileLog (localCheckpoints acidState)
         unlockFile (localLock acidState)
  where acidState = downcast abstract_state
-- | A serialised snapshot of the state, paired with an event entry id.
-- When a state is opened, events are replayed starting from that id.
data Checkpoint = Checkpoint EntryId ByteString

-- | The entry id is written first, followed by the serialised state.
instance SafeCopy Checkpoint where
    kind = primitive
    putCopy (Checkpoint cutOff payload) =
        contain (safePut cutOff >> safePut payload)
    getCopy = contain $ do
        cutOff  <- safeGet
        payload <- safeGet
        return (Checkpoint cutOff payload)
-- | Open a local state in the default location: a directory named after the
-- type of the state, inside @state@. See 'openLocalStateFrom'.
openLocalState :: (Typeable st, IsAcidic st)
               => st                  -- ^ Initial state, used when no checkpoint is found.
               -> IO (AcidState st)
openLocalState initialState = openLocalStateFrom defaultDirectory initialState
  where
    defaultDirectory = "state" </> show (typeOf initialState)
-- | Like 'openLocalState', but in two steps: the default directory (named
-- after the type of the state, inside @state@) is handed to
-- 'prepareLocalStateFrom', and the returned action finishes opening.
prepareLocalState :: (Typeable st, IsAcidic st)
                  => st                  -- ^ Initial state, used when no checkpoint is found.
                  -> IO (IO (AcidState st))
prepareLocalState initialState = prepareLocalStateFrom defaultDirectory initialState
  where
    defaultDirectory = "state" </> show (typeOf initialState)
-- | Open a local state stored in the given directory.
--
-- The lock is taken before the checkpoint is loaded, and the action that
-- replays the events is run immediately.
openLocalStateFrom :: (IsAcidic st)
                   => FilePath            -- ^ Directory holding the logs and the lock file.
                   -> st                  -- ^ Initial state, used when no checkpoint is found.
                   -> IO (AcidState st)
openLocalStateFrom directory initialState = do
    finishOpening <- resumeLocalStateFrom directory initialState False
    finishOpening
-- | Load the newest checkpoint from the given directory without taking the
-- lock, and return an action that takes the lock and replays the events
-- when it is run.
prepareLocalStateFrom :: (IsAcidic st)
                      => FilePath            -- ^ Directory holding the logs and the lock file.
                      -> st                  -- ^ Initial state, used when no checkpoint is found.
                      -> IO (IO (AcidState st))
prepareLocalStateFrom directory initialState =
    resumeLocalStateFrom directory initialState delayLocking
  where
    delayLocking = True
-- | Shared implementation behind 'openLocalStateFrom' and
-- 'prepareLocalStateFrom'.
--
-- Opening happens in two phases: loading the newest checkpoint, and
-- replaying the events recorded from that checkpoint on. The second phase is
-- returned as an 'IO' action.
--
-- * When @delayLocking@ is 'True' the checkpoint is loaded without holding
--   the lock; the lock is taken at the start of the returned action.
--
-- * When @delayLocking@ is 'False' the lock is taken before the checkpoint
--   is loaded.
--
-- In both modes the lock is released again if a phase that runs while it is
-- held throws, so a failed open does not leave the directory locked.
--
-- Throws 'StateIsLocked' if the lock cannot be obtained, and calls 'error'
-- (via 'checkpointRestoreError') if the newest checkpoint cannot be decoded.
resumeLocalStateFrom :: (IsAcidic st)
                     => FilePath            -- ^ Directory holding the logs and the lock file.
                     -> st                  -- ^ Initial state, used when no checkpoint is found.
                     -> Bool                -- ^ Delay taking the lock until the returned action runs?
                     -> IO (IO (AcidState st))
resumeLocalStateFrom directory initialState delayLocking =
  case delayLocking of
    True -> do
      (n, st) <- loadCheckpoint
      return $ do
        lock <- maybeLockFile lockFile
        replayEvents lock n st `onException` unlockFile lock
    False -> do
      lock    <- maybeLockFile lockFile
      (n, st) <- loadCheckpoint `onException` unlockFile lock
      return $
        replayEvents lock n st `onException` unlockFile lock
  where
    lockFile = directory </> "open.lock"
    eventsLogKey = LogKey { logDirectory = directory
                          , logPrefix = "events" }
    checkpointsLogKey = LogKey { logDirectory = directory
                               , logPrefix = "checkpoints" }

    -- Read the newest checkpoint, yielding the entry id to replay events
    -- from together with the decoded state. Without any checkpoint, replay
    -- starts at entry 0 from the caller's initial state.
    loadCheckpoint = do
      mbLastCheckpoint <- Log.newestEntry checkpointsLogKey
      case mbLastCheckpoint of
        Nothing ->
          return (0, initialState)
        Just (Checkpoint eventCutOff content) ->
          case runGetLazy safeGet content of
            Left msg  -> checkpointRestoreError msg
            Right val -> return (eventCutOff, val)

    -- Build the core from the checkpointed state, apply every logged event
    -- from entry n on, and assemble the resulting 'AcidState'.
    -- NOTE(review): if this fails part-way the caller releases the lock, but
    -- file logs that were already opened here are not closed.
    replayEvents lock n st = do
      core <- mkCore (eventsToMethods acidEvents) st
      eventsLog <- openFileLog eventsLogKey
      events <- readEntriesFrom eventsLog n
      mapM_ (runColdMethod core) events
      ensureLeastEntryId eventsLog n
      checkpointsLog <- openFileLog checkpointsLogKey
      -- The placeholder is overwritten with the replayed state right away.
      stateCopy <- newIORef undefined
      withCoreState core (writeIORef stateCopy)
      return $ toAcidState LocalState { localCore        = core
                                      , localCopy        = stateCopy
                                      , localEvents      = eventsLog
                                      , localCheckpoints = checkpointsLog
                                      , localLock        = lock
                                      }

    -- Create the lock file's directory if needed, then try to take an
    -- exclusive lock without blocking; failure is reported as StateIsLocked.
    maybeLockFile path = do
      createDirectoryIfMissing True (takeDirectory path)
      maybe (throwIO (StateIsLocked path))
            return =<< tryLockFile path Exclusive
-- | Abort via 'error' with a message explaining that the saved checkpoint
-- could not be parsed; @msg@ is the parser's own error text.
checkpointRestoreError msg =
    error ("Could not parse saved checkpoint due to the following error: " ++ msg)
-- | Shut a local state down: close the core, then the events log, then the
-- checkpoints log, and finally release the file lock.
closeLocalState :: LocalState st -> IO ()
closeLocalState acidState =
    closeCore (localCore acidState)
        >> closeFileLog (localEvents acidState)
        >> closeFileLog (localCheckpoints acidState)
        >> unlockFile (localLock acidState)
-- | Archive log data that precedes a checkpoint.
--
-- The checkpoints log is cut, and the id it reports, minus one, is used to
-- look up checkpoint entries. If at least one entry is found, the events log
-- is archived up to the event entry id stored in the first one, and the
-- checkpoints log is archived up to the id used for the lookup. If none is
-- found nothing is archived.
-- NOTE(review): presumably 'cutFileLog' returns the id of the next entry to
-- be written, making @id - 1@ the newest checkpoint -- confirm in
-- "Data.Acid.Log".
createLocalArchive :: LocalState st -> IO ()
createLocalArchive state = do
    cutId <- cutFileLog checkpointLog
    let lookupId = cutId - 1
    found <- readEntriesFrom checkpointLog lookupId
    case found of
      Checkpoint eventCutOff _ : _ -> do
          archiveFileLog (localEvents state) eventCutOff
          archiveFileLog checkpointLog lookupId
      [] -> return ()
  where
    checkpointLog = localCheckpoints state
-- | Wrap a 'LocalState' in the backend-independent 'AcidState' record by
-- plugging this module's local implementations into each of its fields.
toAcidState :: IsAcidic st => LocalState st -> AcidState st
toAcidState local
  = AcidState { _scheduleUpdate    = scheduleLocalUpdate local
              , scheduleColdUpdate = scheduleLocalColdUpdate local
              , _query             = localQuery local
              , queryCold          = localQueryCold local
              , createCheckpoint   = createLocalCheckpoint local
              , createArchive      = createLocalArchive local
              , closeAcidState     = closeLocalState local
                -- Keeps the concrete LocalState reachable; createCheckpointAndClose
                -- gets it back from an AcidState with 'downcast'.
              , acidSubState       = mkAnyState local
              }