{-# LANGUAGE OverloadedStrings #-}
module Core
(
Core (..),
EnqueueResult (..),
Command (..),
ServerState,
Updated (..),
enqueueCommand,
tryEnqueueCommand,
getCurrentValue,
withCoreMetrics,
lookup,
newCore,
postQuit,
runCommandLoop,
runSyncTimer
)
where
import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar (MVar, newMVar, putMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, readTBQueue, writeTBQueue, isFullTBQueue)
import Control.Concurrent.STM.TVar (TVar, newTVarIO)
import Control.Monad (forever, unless)
import Control.Monad.IO.Class
import Data.Aeson (Value (..))
import Data.Foldable (forM_)
import Data.Traversable (for)
import Data.UUID (UUID)
import Prelude hiding (log, writeFile)
import qualified Network.WebSockets as WS
import Config (Config (..), periodicSyncingEnabled)
import Logger (Logger)
import Store (Path, Modification (..))
import Subscription (SubscriptionTree, empty)
import Persistence (PersistentValue, PersistenceConfig (..))
import qualified Store
import qualified Persistence
import qualified Metrics
data Command
= Sync
| Modify Modification (Maybe (MVar ()))
| Stop
deriving (Eq)
data Updated = Updated Path Value deriving (Eq, Show)
data EnqueueResult = Enqueued | Dropped
deriving (Show, Eq, Ord, Enum, Bounded)
data Core = Core
{ coreCurrentValue :: PersistentValue
, coreValueIsDirty :: TVar Bool
, coreQueue :: TBQueue Command
, coreUpdates :: TBQueue (Maybe Updated)
, coreClients :: MVar ServerState
, coreLogger :: Logger
, coreConfig :: Config
, coreMetrics :: Maybe Metrics.IcepeakMetrics
}
type ServerState = SubscriptionTree UUID WS.Connection
newServerState :: ServerState
newServerState = empty
newCore :: Config -> Logger -> Maybe Metrics.IcepeakMetrics -> IO (Either String Core)
newCore config logger metrics = do
let queueCapacity = fromIntegral . configQueueCapacity $ config
let filePath = Persistence.getDataFile (configStorageBackend config) (configDataFile config)
journalFile
| configEnableJournaling config
&& periodicSyncingEnabled config = Just $ filePath ++ ".journal"
| otherwise = Nothing
eitherValue <- Persistence.loadFromBackend (configStorageBackend config) PersistenceConfig
{ pcDataFile = filePath
, pcJournalFile = journalFile
, pcLogger = logger
, pcMetrics = metrics
}
for eitherValue $ \value -> do
tdirty <- newTVarIO False
tqueue <- newTBQueueIO queueCapacity
tupdates <- newTBQueueIO queueCapacity
tclients <- newMVar newServerState
pure (Core value tdirty tqueue tupdates tclients logger config metrics)
postQuit :: Core -> IO ()
postQuit core = do
atomically $ do
writeTBQueue (coreQueue core) Stop
writeTBQueue (coreUpdates core) Nothing
tryEnqueueCommand :: Command -> Core -> IO EnqueueResult
tryEnqueueCommand cmd core = atomically $ do
isFull <- isFullTBQueue (coreQueue core)
unless isFull $ writeTBQueue (coreQueue core) cmd
pure $ if isFull then Dropped else Enqueued
enqueueCommand :: Command -> Core -> IO ()
enqueueCommand cmd core = atomically $ writeTBQueue (coreQueue core) cmd
getCurrentValue :: Core -> Path -> IO (Maybe Value)
getCurrentValue core path =
fmap (Store.lookup path) $ atomically $ Persistence.getValue $ coreCurrentValue core
withCoreMetrics :: MonadIO m => Core -> (Metrics.IcepeakMetrics -> IO ()) -> m ()
withCoreMetrics core act = liftIO $ forM_ (coreMetrics core) act
runCommandLoop :: Core -> IO ()
runCommandLoop core = go
where
config = coreConfig core
currentValue = coreCurrentValue core
storageBackend = configStorageBackend config
go = do
command <- atomically $ readTBQueue (coreQueue core)
case command of
Modify op maybeNotifyVar -> do
Persistence.apply op currentValue
postUpdate (Store.modificationPath op) core
unless (periodicSyncingEnabled $ coreConfig core) $
Persistence.syncToBackend storageBackend currentValue
mapM_ (`putMVar` ()) maybeNotifyVar
go
Sync -> do
Persistence.syncToBackend storageBackend currentValue
go
Stop -> Persistence.syncToBackend storageBackend currentValue
postUpdate :: Path -> Core -> IO ()
postUpdate path core = atomically $ do
value <- Persistence.getValue (coreCurrentValue core)
writeTBQueue (coreUpdates core) (Just $ Updated path value)
runSyncTimer :: Core -> IO ()
runSyncTimer core = mapM_ go (configSyncIntervalMicroSeconds $ coreConfig core)
where
go interval = forever $ do
enqueueCommand Sync core
threadDelay interval