{-# LANGUAGE OverloadedStrings #-}
module Core
(
Core (..),
EnqueueResult (..),
Command (..),
ServerState,
Updated (..),
enqueueCommand,
tryEnqueueCommand,
getCurrentValue,
withCoreMetrics,
lookup,
newCore,
postQuit,
runCommandLoop,
runSyncTimer
)
where
import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar (MVar, newMVar, putMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, readTBQueue, writeTBQueue, isFullTBQueue)
import Control.Concurrent.STM.TVar (TVar, newTVarIO)
import Control.Monad (forever, unless)
import Control.Monad.IO.Class
import Data.Aeson (Value (..))
import Data.Foldable (forM_)
import Data.Traversable (for)
import Data.UUID (UUID)
import Prelude hiding (log, writeFile)
import qualified Network.WebSockets as WS
import Config (Config (..), periodicSyncingEnabled)
import Logger (Logger)
import Store (Path, Modification (..))
import Subscription (SubscriptionTree, empty)
import Persistence (PersistentValue, PersistenceConfig (..))
import qualified Store
import qualified Persistence
import qualified Metrics
data Command
= Sync
| Modify Modification (Maybe (MVar ()))
| Stop
deriving (Command -> Command -> Bool
(Command -> Command -> Bool)
-> (Command -> Command -> Bool) -> Eq Command
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Command -> Command -> Bool
$c/= :: Command -> Command -> Bool
== :: Command -> Command -> Bool
$c== :: Command -> Command -> Bool
Eq)
data Updated = Updated Path Value deriving (Updated -> Updated -> Bool
(Updated -> Updated -> Bool)
-> (Updated -> Updated -> Bool) -> Eq Updated
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Updated -> Updated -> Bool
$c/= :: Updated -> Updated -> Bool
== :: Updated -> Updated -> Bool
$c== :: Updated -> Updated -> Bool
Eq, Int -> Updated -> ShowS
[Updated] -> ShowS
Updated -> String
(Int -> Updated -> ShowS)
-> (Updated -> String) -> ([Updated] -> ShowS) -> Show Updated
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Updated] -> ShowS
$cshowList :: [Updated] -> ShowS
show :: Updated -> String
$cshow :: Updated -> String
showsPrec :: Int -> Updated -> ShowS
$cshowsPrec :: Int -> Updated -> ShowS
Show)
data EnqueueResult = Enqueued | Dropped
deriving (Int -> EnqueueResult -> ShowS
[EnqueueResult] -> ShowS
EnqueueResult -> String
(Int -> EnqueueResult -> ShowS)
-> (EnqueueResult -> String)
-> ([EnqueueResult] -> ShowS)
-> Show EnqueueResult
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [EnqueueResult] -> ShowS
$cshowList :: [EnqueueResult] -> ShowS
show :: EnqueueResult -> String
$cshow :: EnqueueResult -> String
showsPrec :: Int -> EnqueueResult -> ShowS
$cshowsPrec :: Int -> EnqueueResult -> ShowS
Show, EnqueueResult -> EnqueueResult -> Bool
(EnqueueResult -> EnqueueResult -> Bool)
-> (EnqueueResult -> EnqueueResult -> Bool) -> Eq EnqueueResult
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: EnqueueResult -> EnqueueResult -> Bool
$c/= :: EnqueueResult -> EnqueueResult -> Bool
== :: EnqueueResult -> EnqueueResult -> Bool
$c== :: EnqueueResult -> EnqueueResult -> Bool
Eq, Eq EnqueueResult
Eq EnqueueResult
-> (EnqueueResult -> EnqueueResult -> Ordering)
-> (EnqueueResult -> EnqueueResult -> Bool)
-> (EnqueueResult -> EnqueueResult -> Bool)
-> (EnqueueResult -> EnqueueResult -> Bool)
-> (EnqueueResult -> EnqueueResult -> Bool)
-> (EnqueueResult -> EnqueueResult -> EnqueueResult)
-> (EnqueueResult -> EnqueueResult -> EnqueueResult)
-> Ord EnqueueResult
EnqueueResult -> EnqueueResult -> Bool
EnqueueResult -> EnqueueResult -> Ordering
EnqueueResult -> EnqueueResult -> EnqueueResult
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: EnqueueResult -> EnqueueResult -> EnqueueResult
$cmin :: EnqueueResult -> EnqueueResult -> EnqueueResult
max :: EnqueueResult -> EnqueueResult -> EnqueueResult
$cmax :: EnqueueResult -> EnqueueResult -> EnqueueResult
>= :: EnqueueResult -> EnqueueResult -> Bool
$c>= :: EnqueueResult -> EnqueueResult -> Bool
> :: EnqueueResult -> EnqueueResult -> Bool
$c> :: EnqueueResult -> EnqueueResult -> Bool
<= :: EnqueueResult -> EnqueueResult -> Bool
$c<= :: EnqueueResult -> EnqueueResult -> Bool
< :: EnqueueResult -> EnqueueResult -> Bool
$c< :: EnqueueResult -> EnqueueResult -> Bool
compare :: EnqueueResult -> EnqueueResult -> Ordering
$ccompare :: EnqueueResult -> EnqueueResult -> Ordering
$cp1Ord :: Eq EnqueueResult
Ord, Int -> EnqueueResult
EnqueueResult -> Int
EnqueueResult -> [EnqueueResult]
EnqueueResult -> EnqueueResult
EnqueueResult -> EnqueueResult -> [EnqueueResult]
EnqueueResult -> EnqueueResult -> EnqueueResult -> [EnqueueResult]
(EnqueueResult -> EnqueueResult)
-> (EnqueueResult -> EnqueueResult)
-> (Int -> EnqueueResult)
-> (EnqueueResult -> Int)
-> (EnqueueResult -> [EnqueueResult])
-> (EnqueueResult -> EnqueueResult -> [EnqueueResult])
-> (EnqueueResult -> EnqueueResult -> [EnqueueResult])
-> (EnqueueResult
-> EnqueueResult -> EnqueueResult -> [EnqueueResult])
-> Enum EnqueueResult
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
enumFromThenTo :: EnqueueResult -> EnqueueResult -> EnqueueResult -> [EnqueueResult]
$cenumFromThenTo :: EnqueueResult -> EnqueueResult -> EnqueueResult -> [EnqueueResult]
enumFromTo :: EnqueueResult -> EnqueueResult -> [EnqueueResult]
$cenumFromTo :: EnqueueResult -> EnqueueResult -> [EnqueueResult]
enumFromThen :: EnqueueResult -> EnqueueResult -> [EnqueueResult]
$cenumFromThen :: EnqueueResult -> EnqueueResult -> [EnqueueResult]
enumFrom :: EnqueueResult -> [EnqueueResult]
$cenumFrom :: EnqueueResult -> [EnqueueResult]
fromEnum :: EnqueueResult -> Int
$cfromEnum :: EnqueueResult -> Int
toEnum :: Int -> EnqueueResult
$ctoEnum :: Int -> EnqueueResult
pred :: EnqueueResult -> EnqueueResult
$cpred :: EnqueueResult -> EnqueueResult
succ :: EnqueueResult -> EnqueueResult
$csucc :: EnqueueResult -> EnqueueResult
Enum, EnqueueResult
EnqueueResult -> EnqueueResult -> Bounded EnqueueResult
forall a. a -> a -> Bounded a
maxBound :: EnqueueResult
$cmaxBound :: EnqueueResult
minBound :: EnqueueResult
$cminBound :: EnqueueResult
Bounded)
data Core = Core
{ Core -> PersistentValue
coreCurrentValue :: PersistentValue
, Core -> TVar Bool
coreValueIsDirty :: TVar Bool
, Core -> TBQueue Command
coreQueue :: TBQueue Command
, Core -> TBQueue (Maybe Updated)
coreUpdates :: TBQueue (Maybe Updated)
, Core -> MVar ServerState
coreClients :: MVar ServerState
, Core -> Logger
coreLogger :: Logger
, Core -> Config
coreConfig :: Config
, Core -> Maybe IcepeakMetrics
coreMetrics :: Maybe Metrics.IcepeakMetrics
}
type ServerState = SubscriptionTree UUID WS.Connection
newServerState :: ServerState
newServerState :: ServerState
newServerState = ServerState
forall id conn. SubscriptionTree id conn
empty
newCore :: Config -> Logger -> Maybe Metrics.IcepeakMetrics -> IO (Either String Core)
newCore :: Config -> Logger -> Maybe IcepeakMetrics -> IO (Either String Core)
newCore Config
config Logger
logger Maybe IcepeakMetrics
metrics = do
let queueCapacity :: Natural
queueCapacity = Word -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word -> Natural) -> (Config -> Word) -> Config -> Natural
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Config -> Word
configQueueCapacity (Config -> Natural) -> Config -> Natural
forall a b. (a -> b) -> a -> b
$ Config
config
let filePath :: String
filePath = StorageBackend -> Maybe String -> String
Persistence.getDataFile (Config -> StorageBackend
configStorageBackend Config
config) (Config -> Maybe String
configDataFile Config
config)
journalFile :: Maybe String
journalFile
| Config -> Bool
configEnableJournaling Config
config
Bool -> Bool -> Bool
&& Config -> Bool
periodicSyncingEnabled Config
config = String -> Maybe String
forall a. a -> Maybe a
Just (String -> Maybe String) -> String -> Maybe String
forall a b. (a -> b) -> a -> b
$ String
filePath String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
".journal"
| Bool
otherwise = Maybe String
forall a. Maybe a
Nothing
Either String PersistentValue
eitherValue <- StorageBackend
-> PersistenceConfig -> IO (Either String PersistentValue)
Persistence.loadFromBackend (Config -> StorageBackend
configStorageBackend Config
config) PersistenceConfig :: String
-> Maybe String
-> Logger
-> Maybe IcepeakMetrics
-> PersistenceConfig
PersistenceConfig
{ pcDataFile :: String
pcDataFile = String
filePath
, pcJournalFile :: Maybe String
pcJournalFile = Maybe String
journalFile
, pcLogger :: Logger
pcLogger = Logger
logger
, pcMetrics :: Maybe IcepeakMetrics
pcMetrics = Maybe IcepeakMetrics
metrics
}
Either String PersistentValue
-> (PersistentValue -> IO Core) -> IO (Either String Core)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for Either String PersistentValue
eitherValue ((PersistentValue -> IO Core) -> IO (Either String Core))
-> (PersistentValue -> IO Core) -> IO (Either String Core)
forall a b. (a -> b) -> a -> b
$ \PersistentValue
value -> do
TVar Bool
tdirty <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
False
TBQueue Command
tqueue <- Natural -> IO (TBQueue Command)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
queueCapacity
TBQueue (Maybe Updated)
tupdates <- Natural -> IO (TBQueue (Maybe Updated))
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
queueCapacity
MVar ServerState
tclients <- ServerState -> IO (MVar ServerState)
forall a. a -> IO (MVar a)
newMVar ServerState
newServerState
Core -> IO Core
forall (f :: * -> *) a. Applicative f => a -> f a
pure (PersistentValue
-> TVar Bool
-> TBQueue Command
-> TBQueue (Maybe Updated)
-> MVar ServerState
-> Logger
-> Config
-> Maybe IcepeakMetrics
-> Core
Core PersistentValue
value TVar Bool
tdirty TBQueue Command
tqueue TBQueue (Maybe Updated)
tupdates MVar ServerState
tclients Logger
logger Config
config Maybe IcepeakMetrics
metrics)
postQuit :: Core -> IO ()
postQuit :: Core -> IO ()
postQuit Core
core = do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TBQueue Command -> Command -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (Core -> TBQueue Command
coreQueue Core
core) Command
Stop
TBQueue (Maybe Updated) -> Maybe Updated -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (Core -> TBQueue (Maybe Updated)
coreUpdates Core
core) Maybe Updated
forall a. Maybe a
Nothing
tryEnqueueCommand :: Command -> Core -> IO EnqueueResult
tryEnqueueCommand :: Command -> Core -> IO EnqueueResult
tryEnqueueCommand Command
cmd Core
core = STM EnqueueResult -> IO EnqueueResult
forall a. STM a -> IO a
atomically (STM EnqueueResult -> IO EnqueueResult)
-> STM EnqueueResult -> IO EnqueueResult
forall a b. (a -> b) -> a -> b
$ do
Bool
isFull <- TBQueue Command -> STM Bool
forall a. TBQueue a -> STM Bool
isFullTBQueue (Core -> TBQueue Command
coreQueue Core
core)
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
isFull (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ TBQueue Command -> Command -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (Core -> TBQueue Command
coreQueue Core
core) Command
cmd
EnqueueResult -> STM EnqueueResult
forall (f :: * -> *) a. Applicative f => a -> f a
pure (EnqueueResult -> STM EnqueueResult)
-> EnqueueResult -> STM EnqueueResult
forall a b. (a -> b) -> a -> b
$ if Bool
isFull then EnqueueResult
Dropped else EnqueueResult
Enqueued
enqueueCommand :: Command -> Core -> IO ()
enqueueCommand :: Command -> Core -> IO ()
enqueueCommand Command
cmd Core
core = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue Command -> Command -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (Core -> TBQueue Command
coreQueue Core
core) Command
cmd
getCurrentValue :: Core -> Path -> IO (Maybe Value)
getCurrentValue :: Core -> Path -> IO (Maybe Value)
getCurrentValue Core
core Path
path =
(Value -> Maybe Value) -> IO Value -> IO (Maybe Value)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Path -> Value -> Maybe Value
Store.lookup Path
path) (IO Value -> IO (Maybe Value)) -> IO Value -> IO (Maybe Value)
forall a b. (a -> b) -> a -> b
$ STM Value -> IO Value
forall a. STM a -> IO a
atomically (STM Value -> IO Value) -> STM Value -> IO Value
forall a b. (a -> b) -> a -> b
$ PersistentValue -> STM Value
Persistence.getValue (PersistentValue -> STM Value) -> PersistentValue -> STM Value
forall a b. (a -> b) -> a -> b
$ Core -> PersistentValue
coreCurrentValue Core
core
withCoreMetrics :: MonadIO m => Core -> (Metrics.IcepeakMetrics -> IO ()) -> m ()
withCoreMetrics :: Core -> (IcepeakMetrics -> IO ()) -> m ()
withCoreMetrics Core
core IcepeakMetrics -> IO ()
act = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe IcepeakMetrics -> (IcepeakMetrics -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Core -> Maybe IcepeakMetrics
coreMetrics Core
core) IcepeakMetrics -> IO ()
act
runCommandLoop :: Core -> IO ()
runCommandLoop :: Core -> IO ()
runCommandLoop Core
core = IO ()
go
where
config :: Config
config = Core -> Config
coreConfig Core
core
currentValue :: PersistentValue
currentValue = Core -> PersistentValue
coreCurrentValue Core
core
storageBackend :: StorageBackend
storageBackend = Config -> StorageBackend
configStorageBackend Config
config
go :: IO ()
go = do
Command
command <- STM Command -> IO Command
forall a. STM a -> IO a
atomically (STM Command -> IO Command) -> STM Command -> IO Command
forall a b. (a -> b) -> a -> b
$ TBQueue Command -> STM Command
forall a. TBQueue a -> STM a
readTBQueue (Core -> TBQueue Command
coreQueue Core
core)
case Command
command of
Modify Modification
op Maybe (MVar ())
maybeNotifyVar -> do
Modification -> PersistentValue -> IO ()
Persistence.apply Modification
op PersistentValue
currentValue
Path -> Core -> IO ()
postUpdate (Modification -> Path
Store.modificationPath Modification
op) Core
core
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Config -> Bool
periodicSyncingEnabled (Config -> Bool) -> Config -> Bool
forall a b. (a -> b) -> a -> b
$ Core -> Config
coreConfig Core
core) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
StorageBackend -> PersistentValue -> IO ()
Persistence.syncToBackend StorageBackend
storageBackend PersistentValue
currentValue
(MVar () -> IO ()) -> Maybe (MVar ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
`putMVar` ()) Maybe (MVar ())
maybeNotifyVar
IO ()
go
Command
Sync -> do
StorageBackend -> PersistentValue -> IO ()
Persistence.syncToBackend StorageBackend
storageBackend PersistentValue
currentValue
IO ()
go
Command
Stop -> StorageBackend -> PersistentValue -> IO ()
Persistence.syncToBackend StorageBackend
storageBackend PersistentValue
currentValue
postUpdate :: Path -> Core -> IO ()
postUpdate :: Path -> Core -> IO ()
postUpdate Path
path Core
core = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Value
value <- PersistentValue -> STM Value
Persistence.getValue (Core -> PersistentValue
coreCurrentValue Core
core)
TBQueue (Maybe Updated) -> Maybe Updated -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (Core -> TBQueue (Maybe Updated)
coreUpdates Core
core) (Updated -> Maybe Updated
forall a. a -> Maybe a
Just (Updated -> Maybe Updated) -> Updated -> Maybe Updated
forall a b. (a -> b) -> a -> b
$ Path -> Value -> Updated
Updated Path
path Value
value)
runSyncTimer :: Core -> IO ()
runSyncTimer :: Core -> IO ()
runSyncTimer Core
core = (Int -> IO Any) -> Maybe Int -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Int -> IO Any
forall b. Int -> IO b
go (Config -> Maybe Int
configSyncIntervalMicroSeconds (Config -> Maybe Int) -> Config -> Maybe Int
forall a b. (a -> b) -> a -> b
$ Core -> Config
coreConfig Core
core)
where
go :: Int -> IO b
go Int
interval = IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ do
Command -> Core -> IO ()
enqueueCommand Command
Sync Core
core
Int -> IO ()
threadDelay Int
interval