module Log.Logger
( LoggerEnv(..)
, Logger
, mkLogger
, mkBulkLogger
, mkBulkLogger'
, execLogger
, waitForLogger
, shutdownLogger
) where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Semigroup
import Prelude
import qualified Data.Aeson.Types as A
import qualified Data.Text as T
import qualified Data.Text.IO as T
import Log.Data
import Log.Internal.Logger
data LoggerEnv = LoggerEnv
{ leLogger :: !Logger
, leComponent :: !T.Text
, leDomain :: ![T.Text]
, leData :: ![A.Pair]
}
mkLogger :: T.Text -> (LogMessage -> IO ()) -> IO Logger
mkLogger name exec = mkLoggerImpl
newTQueueIO isEmptyTQueue readTQueue writeTQueue (return ())
name exec (return ())
mkBulkLogger :: T.Text -> ([LogMessage] -> IO ()) -> IO () -> IO Logger
mkBulkLogger = mkBulkLogger' sbDefaultCapacity 1000000
mkBulkLogger'
:: Int
-> Int
-> T.Text
-> ([LogMessage] -> IO ())
-> IO ()
-> IO Logger
mkBulkLogger' cap dur = mkLoggerImpl
(newSBQueueIO cap) isEmptySBQueue readSBQueue writeSBQueue
(threadDelay dur)
data SBQueue a = SBQueue !(TVar [a]) !(TVar Int) !Int
sbDefaultCapacity :: Int
sbDefaultCapacity = 1000000
newSBQueueIO :: Int -> IO (SBQueue a)
newSBQueueIO capacity = SBQueue <$> newTVarIO [] <*> newTVarIO 0 <*> pure capacity
isEmptySBQueue :: SBQueue a -> STM Bool
isEmptySBQueue (SBQueue queue count _capacity) = do
isEmpty <- null <$> readTVar queue
numElems <- readTVar count
assert (if isEmpty then numElems == 0 else numElems > 0) $
return isEmpty
readSBQueue :: SBQueue a -> STM [a]
readSBQueue (SBQueue queue count _capacity) = do
elems <- readTVar queue
when (null elems) retry
writeTVar queue []
writeTVar count 0
return $ reverse elems
writeSBQueue :: SBQueue a -> a -> STM ()
writeSBQueue (SBQueue queue count capacity) a = do
numElems <- readTVar count
if numElems < capacity
then do modifyTVar queue (a :)
modifyTVar count (+1)
else return ()
mkLoggerImpl :: IO queue
-> (queue -> STM Bool)
-> (queue -> STM msgs)
-> (queue -> LogMessage -> STM ())
-> IO ()
-> T.Text
-> (msgs -> IO ())
-> IO ()
-> IO Logger
mkLoggerImpl newQueue isQueueEmpty readQueue writeQueue afterExecDo
name exec sync = do
queue <- newQueue
inProgress <- newTVarIO False
isRunning <- newTVarIO True
tid <- forkFinally (forever $ loop queue inProgress)
(\_ -> cleanup queue inProgress)
return Logger {
loggerWriteMessage = \msg -> atomically $ do
checkIsRunning isRunning
writeQueue queue msg,
loggerWaitForWrite = do
atomically $ waitForWrite queue inProgress
sync,
loggerShutdown = do
killThread tid
atomically $ writeTVar isRunning False
}
where
checkIsRunning isRunning' = do
isRunning <- readTVar isRunning'
when (not isRunning) $
throwSTM (AssertionFailed $ "Log.Logger.mkLoggerImpl: "
++ "attempt to write to a shut down logger")
loop queue inProgress = do
step queue inProgress
afterExecDo
step queue inProgress = do
msgs <- atomically $ do
writeTVar inProgress True
readQueue queue
exec msgs
atomically $ writeTVar inProgress False
cleanup queue inProgress = do
step queue inProgress
sync
printLoggerTerminated
waitForWrite queue inProgress = do
isEmpty <- isQueueEmpty queue
isInProgress <- readTVar inProgress
when (not isEmpty || isInProgress) retry
printLoggerTerminated = T.putStrLn $ name <> ": logger thread terminated"