{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE ForeignFunctionInterface #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -fobject-code #-}
{-# OPTIONS_HADDOCK hide #-}
module Data.Array.Accelerate.Debug.Monitoring (
beginMonitoring,
initAccMetrics,
Processor(..),
withProcessor, addProcessorTime,
didAllocateBytesLocal, didAllocateBytesRemote,
didCopyBytesToRemote, didCopyBytesFromRemote,
increaseCurrentBytesRemote, decreaseCurrentBytesRemote,
increaseCurrentBytesNursery, decreaseCurrentBytesNursery, getCurrentBytesNursery, setCurrentBytesNursery,
didRemoteGC,
didEvictBytes,
) where
#ifdef ACCELERATE_MONITORING
import Data.Array.Accelerate.Debug.Clock
import System.Metrics
import System.Remote.Monitoring
import Control.Concurrent
import Control.Concurrent.Async
import Data.IORef
import Data.Text ( Text )
import Text.Printf
import qualified Data.HashMap.Strict as Map
#endif
#if defined(ACCELERATE_MONITORING) || defined(ACCELERATE_DEBUG)
import Control.Monad
#endif
import Data.Atomic ( Atomic )
import qualified Data.Atomic as Atomic
import Data.Int
import Language.Haskell.TH.Syntax
-- | Start an EKG monitoring server on @localhost:8000@ that serves the
-- Accelerate metrics registered by 'initAccMetrics' together with the GHC
-- runtime's GC metrics.
--
-- The server is forked inside an 'Async' that then waits 10ms
-- ('threadDelay' 10000). If that action throws, 'waitCatch' yields 'Left'
-- and a failure message is printed instead of the success message.
-- NOTE(review): catching a failed bind this way relies on 'forkServerWith'
-- re-throwing startup errors to the calling thread; confirm for the ekg
-- version in use.
--
-- When the package is built without monitoring support this is a no-op.
beginMonitoring :: IO ()
#ifdef ACCELERATE_MONITORING
beginMonitoring = do
  store <- initAccMetrics
  registerGcMetrics store
  r     <- withAsync (forkServerWith store "localhost" 8000 >> threadDelay 10000) waitCatch
  case r of
    Right _ -> printf "EKG monitor started at: http://localhost:8000\n"
    Left _  -> printf "Failed to start EKG monitor\n"
#else
beginMonitoring = return ()
#endif
#ifndef ACCELERATE_MONITORING
-- | Monitoring support was not compiled in: running this action raises an
-- error explaining how to enable it.
initAccMetrics :: IO a
initAccMetrics = error $ unlines
  [ "Data.Array.Accelerate: Monitoring is disabled."
  , "Reinstall package 'accelerate' with '-fekg' to enable it." ]
#else
-- | Create a fresh EKG 'Store' and register the Accelerate metrics in it:
--
--   * a smoothed load gauge for each LLVM backend (native and PTX),
--   * gauges for the bytes currently held remotely and in the nursery,
--   * counters for cumulative allocation, copy, eviction and GC activity.
initAccMetrics :: IO Store
initAccMetrics = do
  store <- newStore
  registerRate    "acc.load.llvm_native"            (estimateProcessorLoad __active_ns_llvm_native) store
  registerRate    "acc.load.llvm_ptx"               (estimateProcessorLoad __active_ns_llvm_ptx)    store
  registerGauge   "acc.gc.current_bytes_remote"     (Atomic.read __current_bytes_remote)            store
  registerGauge   "acc.gc.current_bytes_nursery"    (Atomic.read __current_bytes_nursery)           store
  registerCounter "acc.gc.bytes_allocated_local"    (Atomic.read __total_bytes_allocated_local)     store
  registerCounter "acc.gc.bytes_allocated_remote"   (Atomic.read __total_bytes_allocated_remote)    store
  registerCounter "acc.gc.bytes_copied_to_remote"   (Atomic.read __total_bytes_copied_to_remote)    store
  registerCounter "acc.gc.bytes_copied_from_remote" (Atomic.read __total_bytes_copied_from_remote)  store
  registerCounter "acc.gc.bytes_evicted_from_remote" (Atomic.read __total_bytes_evicted_from_remote) store
  registerCounter "acc.gc.num_gcs"                  (Atomic.read __num_remote_gcs)                  store
  registerCounter "acc.gc.num_lru_evict"            (Atomic.read __num_evictions)                   store
  return store

-- | Register a single gauge called @name@ whose value is produced by running
-- @sample@ against a private 'EMAState'. That state is seeded with the current
-- monotonic time and zero load, so consecutive samples can be smoothed.
registerRate :: Text -> (IORef EMAState -> IO Int64) -> Store -> IO ()
registerRate name sample store = do
  now <- getMonotonicTime
  st  <- newIORef (ES now 0 0)
  registerGroup (Map.singleton name Gauge) (sample st) store
#endif
-- | The execution backends whose busy time is accounted separately.
data Processor
  = Native  -- ^ the LLVM native backend
  | PTX     -- ^ the LLVM PTX backend
-- | Run an action and credit its wall-clock duration to the busy-time
-- counter of the given backend. The result is forced to WHNF before the
-- closing timestamp is taken, so (shallow) evaluation is included in the
-- measurement.
--
-- Without monitoring support this is simply 'id'.
{-# INLINE withProcessor #-}
withProcessor :: Processor -> IO a -> IO a
#ifndef ACCELERATE_MONITORING
withProcessor _ = id
#else
withProcessor Native = withProcessor' __active_ns_llvm_native
withProcessor PTX    = withProcessor' __active_ns_llvm_ptx

-- | Time @action@ with 'getMonotonicTime' and add the elapsed seconds to
-- @var@ via 'addProcessorTime''. If @action@ throws, nothing is recorded.
withProcessor' :: Atomic -> IO a -> IO a
withProcessor' var action = do
  wall0 <- getMonotonicTime
  !r    <- action
  wall1 <- getMonotonicTime
  addProcessorTime' var (wall1 - wall0)
  return r
#endif
-- | Add a duration, given in seconds, to the busy-time counter of the given
-- backend. Without monitoring support this does nothing.
{-# INLINE addProcessorTime #-}
addProcessorTime :: Processor -> Double -> IO ()
#ifndef ACCELERATE_MONITORING
addProcessorTime _ _ = return ()
#else
addProcessorTime Native = addProcessorTime' __active_ns_llvm_native
addProcessorTime PTX    = addProcessorTime' __active_ns_llvm_ptx

-- | Convert @secs@ to whole nanoseconds (rounded) and add them atomically to
-- @var@, discarding the value returned by 'Atomic.add'.
addProcessorTime' :: Atomic -> Double -> IO ()
addProcessorTime' var secs =
  let ns = round (secs * 1.0E9)
  in  void $ Atomic.add var ns
#endif
-- | Add @n@ to the running total of bytes allocated locally.
-- Only recorded when built with @ACCELERATE_DEBUG@; otherwise a no-op.
{-# INLINE didAllocateBytesLocal #-}
didAllocateBytesLocal :: Int64 -> IO ()
#ifndef ACCELERATE_DEBUG
didAllocateBytesLocal _ = return ()
#else
didAllocateBytesLocal n =
  void $ Atomic.add __total_bytes_allocated_local n
#endif
-- The following are only recorded when built with @ACCELERATE_DEBUG@;
-- otherwise each of them is a no-op.
{-# INLINE didAllocateBytesRemote #-}
{-# INLINE increaseCurrentBytesRemote #-}
{-# INLINE decreaseCurrentBytesRemote #-}

-- | Add @n@ to the running total of bytes allocated remotely.
didAllocateBytesRemote :: Int64 -> IO ()

-- | Subtract @n@ from the gauge of bytes currently held remotely.
decreaseCurrentBytesRemote :: Int64 -> IO ()

-- | Add @n@ to the gauge of bytes currently held remotely.
increaseCurrentBytesRemote :: Int64 -> IO ()

#ifndef ACCELERATE_DEBUG
didAllocateBytesRemote _     = return ()
increaseCurrentBytesRemote _ = return ()
decreaseCurrentBytesRemote _ = return ()
#else
didAllocateBytesRemote n     = void $ Atomic.add      __total_bytes_allocated_remote n
increaseCurrentBytesRemote n = void $ Atomic.add      __current_bytes_remote n
decreaseCurrentBytesRemote n = void $ Atomic.subtract __current_bytes_remote n
#endif
-- Transfer counters. Only recorded when built with @ACCELERATE_DEBUG@;
-- otherwise each of them is a no-op.
{-# INLINE didCopyBytesToRemote #-}
{-# INLINE didCopyBytesFromRemote #-}

-- | Add @n@ to the running total of bytes copied from the remote.
didCopyBytesFromRemote :: Int64 -> IO ()

-- | Add @n@ to the running total of bytes copied to the remote.
didCopyBytesToRemote :: Int64 -> IO ()

#ifndef ACCELERATE_DEBUG
didCopyBytesToRemote _   = return ()
didCopyBytesFromRemote _ = return ()
#else
didCopyBytesToRemote n   = void $ Atomic.add __total_bytes_copied_to_remote n
didCopyBytesFromRemote n = void $ Atomic.add __total_bytes_copied_from_remote n
#endif
-- Nursery gauge accessors. Only active when built with @ACCELERATE_DEBUG@;
-- otherwise updates do nothing and the getter always returns 0.
{-# INLINE increaseCurrentBytesNursery #-}
{-# INLINE decreaseCurrentBytesNursery #-}
{-# INLINE setCurrentBytesNursery #-}
{-# INLINE getCurrentBytesNursery #-}

-- | Add @n@ to the gauge of bytes currently held in the nursery.
increaseCurrentBytesNursery :: Int64 -> IO ()

-- | Subtract @n@ from the gauge of bytes currently held in the nursery.
decreaseCurrentBytesNursery :: Int64 -> IO ()

-- | Overwrite the nursery gauge with @n@.
setCurrentBytesNursery :: Int64 -> IO ()

-- | Read the nursery gauge.
getCurrentBytesNursery :: IO Int64

#ifndef ACCELERATE_DEBUG
increaseCurrentBytesNursery _ = return ()
decreaseCurrentBytesNursery _ = return ()
setCurrentBytesNursery _      = return ()
getCurrentBytesNursery        = return 0
#else
increaseCurrentBytesNursery n = void $ Atomic.add      __current_bytes_nursery n
decreaseCurrentBytesNursery n = void $ Atomic.subtract __current_bytes_nursery n
setCurrentBytesNursery n      = Atomic.write __current_bytes_nursery n
getCurrentBytesNursery        = Atomic.read  __current_bytes_nursery
#endif
-- | Increment the count of remote garbage collections by one.
-- Only recorded when built with @ACCELERATE_DEBUG@; otherwise a no-op.
{-# INLINE didRemoteGC #-}
didRemoteGC :: IO ()
#ifndef ACCELERATE_DEBUG
didRemoteGC = return ()
#else
didRemoteGC = void $ Atomic.add __num_remote_gcs 1
#endif
-- | Record one eviction of @n@ bytes from the remote. This bumps the
-- eviction count by one and adds @n@ to the total of bytes evicted.
-- Only recorded when built with @ACCELERATE_DEBUG@; otherwise a no-op.
{-# INLINE didEvictBytes #-}
didEvictBytes :: Int64 -> IO ()
#ifndef ACCELERATE_DEBUG
didEvictBytes _ = return ()
#else
didEvictBytes n = do
  void $ Atomic.add __num_evictions 1
  void $ Atomic.add __total_bytes_evicted_from_remote n
#endif
#ifdef ACCELERATE_MONITORING
-- | Smoothing state for one backend's load gauge.
data EMAState = ES
  { old_time :: {-# UNPACK #-} !Double  -- ^ monotonic timestamp (seconds) of the previous sample
  , old_inst :: {-# UNPACK #-} !Double  -- ^ instantaneous load (percent) at the previous sample
  , old_avg  :: {-# UNPACK #-} !Double  -- ^ smoothed load (percent) at the previous sample
  }

-- | Sample the busy-nanoseconds counter @var@ and return the smoothed
-- percentage of wall-clock time it represents since the previous sample.
--
-- The counter is read and cleared with @Atomic.and var 0@. This assumes
-- 'Atomic.and' returns the value held before the AND; confirm against
-- "Data.Atomic".
--
-- If no time has elapsed since the previous sample, the division below would
-- produce NaN or Infinity. In that case the counter and state are left
-- untouched and the previous average is returned.
estimateProcessorLoad :: Atomic -> IORef EMAState -> IO Int64
estimateProcessorLoad !var !ref = do
  ES{..} <- readIORef ref
  time   <- getMonotonicTime
  -- 'time' is read after 'old_time' was stored, so this is non-negative for a
  -- monotonic clock. (The previous 'old_time - time' was always <= 0 and
  -- produced negative loads.)
  let elapsed_s = time - old_time
  if elapsed_s <= 0
    then return (round old_avg)
    else do
      sample <- Atomic.and var 0
      let active_ns  = fromIntegral sample
          elapsed_ns = 1.0E9 * elapsed_s
          new_inst   = 100 * (active_ns / elapsed_ns)
          new_avg    = ema 0.2 elapsed_s old_avg old_inst new_inst
      writeIORef ref (ES time new_inst new_avg)
      return (round new_avg)

-- | One update step of an exponential moving average for samples taken at
-- irregular intervals, interpolating linearly between the previous and the
-- new sample.
--
-- @alpha@ acts as a time constant in the same units as @dt@, which is the
-- time since the previous sample and must be positive. The weights are
-- @u = exp (-dt/alpha)@ and @v = (1 - u) / (dt/alpha)@.
ema :: Double -> Double -> Double -> Double -> Double -> Double
ema !alpha !dt !old_ema !old_sample !new_sample =
  let a = dt / alpha
      u = exp (-a)
      v = (1 - u) / a
  in  (u * old_ema) + ((v - u) * old_sample) + ((1 - v) * new_sample)
#endif
-- Process-wide counters living in C globals. Each binding is the address
-- (@&@) of a C symbol, presumably defined in cbits/monitoring.c, used as an
-- 'Atomic'.

-- Busy time, in nanoseconds, accumulated per backend.
foreign import ccall "&__active_ns_llvm_native"
  __active_ns_llvm_native :: Atomic
foreign import ccall "&__active_ns_llvm_ptx"
  __active_ns_llvm_ptx :: Atomic

-- Bytes currently held (gauges).
foreign import ccall "&__current_bytes_remote"
  __current_bytes_remote :: Atomic
foreign import ccall "&__current_bytes_nursery"
  __current_bytes_nursery :: Atomic

-- Cumulative byte totals (counters).
foreign import ccall "&__total_bytes_allocated_local"
  __total_bytes_allocated_local :: Atomic
foreign import ccall "&__total_bytes_allocated_remote"
  __total_bytes_allocated_remote :: Atomic
foreign import ccall "&__total_bytes_copied_to_remote"
  __total_bytes_copied_to_remote :: Atomic
foreign import ccall "&__total_bytes_copied_from_remote"
  __total_bytes_copied_from_remote :: Atomic
foreign import ccall "&__total_bytes_evicted_from_remote"
  __total_bytes_evicted_from_remote :: Atomic

-- Event counts (counters).
foreign import ccall "&__num_remote_gcs"
  __num_remote_gcs :: Atomic
foreign import ccall "&__num_evictions"
  __num_evictions :: Atomic
-- Ask GHC to compile this C source and link it into the package. The splice
-- itself generates no Haskell declarations.
runQ $ [] <$ addForeignFilePath LangC "cbits/monitoring.c"