{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

-- Periodically samples a metrics store on a background thread and
-- publishes the changes since the previous sample on a channel that any
-- number of subscribers can read from.
--
-- The pragmas above cover what this module relies on: CPP for the
-- @#if __GLASGOW_HASKELL__@ sections, BangPatterns for the strict @let@ in
-- the sampling loop, RecordWildCards for the @Push{..}@ / @PushChan{..}@
-- patterns, and OverloadedStrings for the 'T.Text' literals in
-- 'defaultPushOptions'.
module System.Remote.Monitoring.Push
    ( -- Types
      Push
    , PushChan
    , PushOptions(..)
      -- Starting and inspecting the push thread
    , pushThreadId
    , forkPush
    , defaultPushOptions
      -- Receiving samples
    , subscribe
    , consume
    ) where
import Control.Concurrent (ThreadId, myThreadId, threadDelay, throwTo)
import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan, dupChan)
import qualified Data.HashMap.Strict as M
import Data.Int (Int64)
import qualified Data.Text as T
import Data.Time.Clock.POSIX (getPOSIXTime)
import qualified System.Metrics as Metrics
#if __GLASGOW_HASKELL__ >= 706
import Control.Concurrent (forkFinally)
#else
import Control.Concurrent (forkIO)
import Control.Exception (SomeException, mask, try)
import Prelude hiding (catch)
#endif
-- | Handle returned by 'forkPush'.  It bundles the identity of the
-- sampling thread with the channel that thread publishes to.
data Push = Push
    { threadId :: !ThreadId
      -- ^ The thread running the sampling loop.
    , mainCh   :: PushChanType
      -- ^ The channel the sampling loop writes each sample diff to.
      -- 'subscribe' duplicates this channel for every subscriber.
    }
-- | The kind of channel used throughout this module: every element is one
-- 'Metrics.Sample'.
type PushChanType = Chan Metrics.Sample

-- | A subscriber's private view of the push channel, obtained from
-- 'subscribe' and read with 'consume'.
data PushChan = PushChan
    { ch :: PushChanType
      -- ^ The duplicated channel this subscriber reads from.
    }
-- | The 'ThreadId' of the sampling thread behind a 'Push' handle.
pushThreadId :: Push -> ThreadId
pushThreadId (Push tid _) = tid
-- | Settings for the push thread started by 'forkPush'.
data PushOptions = PushOptions
    { flushInterval :: !Int
      -- ^ Time between two samples, in milliseconds (the sampling loop
      -- multiplies it by 1000 before handing it to 'threadDelay', which
      -- takes microseconds).
    , debug         :: !Bool
      -- ^ NOTE(review): not consulted anywhere in the visible part of this
      -- module -- presumably a verbosity switch; confirm before relying
      -- on it.
    , prefix        :: !T.Text
      -- ^ Text prepended to every metric name in a published sample.
    , suffix        :: !T.Text
      -- ^ Text appended to every metric name in a published sample.
    }
-- | Default settings: sample every 1000 ms, 'debug' off, and metric names
-- published unchanged (empty 'prefix' and 'suffix').
defaultPushOptions :: PushOptions
defaultPushOptions =
    PushOptions
        { flushInterval = 1000
        , debug         = False
        , prefix        = T.empty
        , suffix        = T.empty
        }
-- | Start a background thread that repeatedly samples the given store and
-- writes what changed since the previous sample to a freshly created
-- channel.  The first iteration is diffed against an empty sample.
--
-- If the background thread ends with an exception, that exception is
-- thrown to the thread that called 'forkPush'; a normal exit is ignored.
--
-- The returned 'Push' handle carries the thread's id and the channel.
forkPush :: PushOptions   -- ^ Sampling interval and metric-name decoration.
         -> Metrics.Store -- ^ The store to sample.
         -> IO Push
forkPush opts store = do
    parent <- myThreadId
    chan   <- newChan
    worker <- forkFinally
        (loop chan store M.empty opts)
        (either (throwTo parent) (const (return ())))
    return (Push worker chan)
-- | Body of the push thread.  Each iteration:
--
-- 1. samples the whole store,
-- 2. computes the difference to the previous sample with 'diffSamples',
-- 3. publishes that difference on the channel, and
-- 4. sleeps for whatever is left of the flush interval.
--
-- It never returns normally; it only ends through an exception.
loop :: PushChanType   -- ^ Channel the diffs are published on.
     -> Metrics.Store  -- ^ Store to sample.
     -> Metrics.Sample -- ^ The sample taken by the previous iteration.
     -> PushOptions    -- ^ Flush interval and metric-name decoration.
     -> IO ()
loop ch store lastSample opts = do
    start <- time
    sample <- Metrics.sampleAll store
    -- Force the diff here so the work happens on this thread rather than
    -- being shipped to subscribers as an unevaluated thunk.
    let !diff = diffSamples opts lastSample sample
    writeChan ch diff
    -- Subscribers read from 'dupChan' copies, each with its own read end.
    -- Nothing else in this module reads the main channel, so without this
    -- read its read end would stay at the very first element and every
    -- sample ever written would remain reachable.  The item just written
    -- is available, so this returns immediately.
    _ <- readChan ch
    end <- time
    -- 'flushInterval' is in milliseconds while 'time' and 'threadDelay'
    -- work in microseconds.  Subtract the time spent sampling so the period
    -- stays close to the configured interval, and never pass a negative
    -- delay when sampling took longer than the interval.
    threadDelay (max 0 (flushInterval opts * 1000 - fromIntegral (end - start)))
    loop ch store sample opts
-- | Create a new subscription to a push thread by duplicating its main
-- channel.  Per 'dupChan', the subscription starts out empty and receives
-- only what is written after this call.
subscribe :: Push -> IO PushChan
subscribe (Push _ source) = fmap PushChan (dupChan source)
-- | Take the next sample from a subscription, blocking until one is
-- available.
consume :: PushChan -> IO Metrics.Sample
consume (PushChan source) = readChan source
-- | The current POSIX time in microseconds, rounded to a whole number.
time :: IO Int64
time = do
    now <- getPOSIXTime
    let micros = realToFrac now * 1000000.0 :: Double
    return (round micros)
-- | Work out what to publish given the previous and the current sample.
--
-- For every metric in the current sample:
--
-- * if it was absent from the previous sample, it is included as is;
-- * otherwise it is included only when 'diffMetric' reports a change.
--
-- Metrics that exist only in the previous sample are ignored.  Every key
-- of the result is the original metric name wrapped in the configured
-- 'prefix' and 'suffix'.
diffSamples :: PushOptions -> Metrics.Sample -> Metrics.Sample -> Metrics.Sample
diffSamples opts prev curr = M.foldlWithKey' step M.empty curr
  where
    -- Metric name as it appears in the published sample.
    decorate name = T.concat [prefix opts, name, suffix opts]

    step acc name new =
        case M.lookup name prev of
            Nothing  -> M.insert (decorate name) new acc
            Just old ->
                maybe acc
                      (\val -> M.insert (decorate name) val acc)
                      (diffMetric old new)
-- | Compare the previous and the current value of one metric.
--
-- Returns 'Nothing' when the two values are equal.  When they differ:
--
-- * a counter yields the increase since the previous value
--   (current minus previous);
-- * a gauge or a label yields the current value.
--
-- Any other combination of constructors (including a metric whose kind
-- changed between the two samples) yields 'Nothing'.
diffMetric :: Metrics.Value -> Metrics.Value -> Maybe Metrics.Value
diffMetric (Metrics.Counter n1) (Metrics.Counter n2)
    | n1 == n2  = Nothing
    -- '$!' forces the 'Counter' constructor before it is wrapped in 'Just'.
    | otherwise = Just $! Metrics.Counter $ n2 - n1
diffMetric (Metrics.Gauge n1) (Metrics.Gauge n2)
    | n1 == n2  = Nothing
    | otherwise = Just $ Metrics.Gauge n2
diffMetric (Metrics.Label n1) (Metrics.Label n2)
    | n1 == n2  = Nothing
    | otherwise = Just $ Metrics.Label n2
diffMetric _ _ = Nothing
#if __GLASGOW_HASKELL__ < 706
-- | Local stand-in for 'Control.Concurrent.forkFinally', compiled only for
-- GHC versions older than 7.6.
--
-- Forks a thread that runs @action@ and then always passes its outcome
-- (the exception or the result) to @and_then@.  The fork happens with
-- asynchronous exceptions masked; only @action@ itself runs with the
-- caller's masking state restored.
forkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally action and_then =
    mask $ \restore -> forkIO $ do
        outcome <- try (restore action)
        and_then outcome
#endif