Module: OpenTracing.Reporting.Batch

This module provides a trace reporter that groups recorded spans into batches
before sending them to their destination in bulk.

{-# LANGUAGE LambdaCase        #-}
{-# LANGUAGE NamedFieldPuns    #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards   #-}
{-# LANGUAGE StrictData        #-}
{-# LANGUAGE TemplateHaskell   #-}
{-# LANGUAGE TypeApplications  #-}

module OpenTracing.Reporting.Batch
    ( BatchOptions
    , batchOptions
    , boptAtCapacity
    , boptBatchSize
    , boptErrorLog
    , boptQueueSize
    , boptReporter
    , boptTimeoutSec

    , AtCapacity (..)

    , defaultErrorLog

    , BatchEnv
    , newBatchEnv
    , closeBatchEnv

    , batchReporter

import           Control.Concurrent.Async
import           Control.Concurrent.STM
import           Control.Exception        (AsyncException (ThreadKilled))
import           Control.Exception.Safe
import           Control.Lens
import           Control.Monad
import           Control.Monad.IO.Class
import           Data.ByteString.Builder
import           Data.Time                (NominalDiffTime)
import           Data.Word
import           Numeric.Natural          (Natural)
import           OpenTracing.Span
import           OpenTracing.Time
import           System.IO                (stderr)
import           System.Timeout

-- | Options available to construct a batch reporter. Default options are
-- available with `batchOptions`
data BatchOptions = BatchOptions
    { BatchOptions -> Word16
_boptBatchSize  :: Word16
    -- ^ The maximum number of elements to report in a batch. Default 100
    , BatchOptions -> Word
_boptTimeoutSec :: Word
    -- ^ The maximum time (in seconds) to wait while reporting a batch before erroring.
    -- Default 5 seconds.
    , BatchOptions -> [FinishedSpan] -> IO ()
_boptReporter   :: [FinishedSpan] -> IO ()
    -- ^ The function to call with the batch of spans. Has an upper bound on size equal
    -- to _boptBatchSize. No default.
    , BatchOptions -> Builder -> IO ()
_boptErrorLog   :: Builder        -> IO ()
    -- ^ What to do with errors. Default print to stderr.
    , BatchOptions -> Natural
_boptQueueSize  :: Natural
    -- ^ Size of the queue holding batched spans. Default 1000
    , BatchOptions -> AtCapacity
_boptAtCapacity :: AtCapacity
    -- ^ What to do when the queue is at capacity. Default: Drop

-- | Policy to apply to new spans when the internal queue is at capacity.
data AtCapacity = Drop | Block

-- | Default batch options which can be overridden via lenses.
batchOptions :: ([FinishedSpan] -> IO ()) -> BatchOptions
batchOptions :: ([FinishedSpan] -> IO ()) -> BatchOptions
batchOptions [FinishedSpan] -> IO ()
f = BatchOptions
    { _boptBatchSize :: Word16
_boptBatchSize  = Word16
    , _boptTimeoutSec :: Word
_boptTimeoutSec = Word
    , _boptReporter :: [FinishedSpan] -> IO ()
_boptReporter   = [FinishedSpan] -> IO ()
    , _boptErrorLog :: Builder -> IO ()
_boptErrorLog   = Builder -> IO ()
    , _boptQueueSize :: Natural
_boptQueueSize  = Natural
    , _boptAtCapacity :: AtCapacity
_boptAtCapacity = AtCapacity

-- | An error logging function which prints to stderr.
defaultErrorLog :: Builder -> IO ()
defaultErrorLog :: Builder -> IO ()
defaultErrorLog = Handle -> Builder -> IO ()
hPutBuilder Handle

makeLenses ''BatchOptions

-- | The environment of a batch reporter.
data BatchEnv = BatchEnv
    { BatchEnv -> TBQueue FinishedSpan
envQ   :: TBQueue FinishedSpan
    -- ^ The queue of spans to be reported
    , BatchEnv -> Async ()
envRep :: Async ()
    -- ^ Asynchronous consumer of the queue
    , BatchEnv -> AtCapacity
envCap :: AtCapacity
    -- ^ Policy to apply when the queue is at capacity
    , BatchEnv -> Builder -> IO ()
envLog :: Builder -> IO ()
    -- ^ Where to report errors

-- | Create a new batch environment
newBatchEnv :: BatchOptions -> IO BatchEnv
newBatchEnv :: BatchOptions -> IO BatchEnv
newBatchEnv BatchOptions
opt = do
    TBQueue FinishedSpan
q <- Natural -> IO (TBQueue FinishedSpan)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO (BatchOptions -> Natural
_boptQueueSize BatchOptions
    Async ()
c <- BatchOptions -> TBQueue FinishedSpan -> IO (Async ())
consumer BatchOptions
opt TBQueue FinishedSpan
    BatchEnv -> IO BatchEnv
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure BatchEnv
        { envQ :: TBQueue FinishedSpan
envQ = TBQueue FinishedSpan
        , envRep :: Async ()
envRep = Async ()
        , envCap :: AtCapacity
envCap = BatchOptions -> AtCapacity
_boptAtCapacity BatchOptions
        , envLog :: Builder -> IO ()
envLog = BatchOptions -> Builder -> IO ()
_boptErrorLog BatchOptions

-- | Close a batch reporter, stop consuming any new spans. Any
-- spans in the queue will be drained.
closeBatchEnv :: BatchEnv -> IO ()
closeBatchEnv :: BatchEnv -> IO ()
closeBatchEnv = Async () -> IO ()
forall a. Async a -> IO ()
cancel (Async () -> IO ()) -> (BatchEnv -> Async ()) -> BatchEnv -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BatchEnv -> Async ()

-- | An implementation of `OpenTracing.Tracer.tracerReport` that batches the
-- finished spans for transimission to their destination.
-- If the underlying queue is currently at capacity, the behaviour depends on
-- the setting of `boptAtCapacity`: if the value is `Drop`, `fspan` is dropped,
-- otherwise, if the value is `Block`, the reporter will block until the queue
-- has enough space to accept the span.
--  In either case, a log record is emitted.
batchReporter :: MonadIO m => BatchEnv -> FinishedSpan -> m ()
batchReporter :: forall (m :: * -> *). MonadIO m => BatchEnv -> FinishedSpan -> m ()
batchReporter BatchEnv{envCap :: BatchEnv -> AtCapacity
envCap = AtCapacity
Block, TBQueue FinishedSpan
envQ :: BatchEnv -> TBQueue FinishedSpan
envQ :: TBQueue FinishedSpan
envQ, Builder -> IO ()
envLog :: BatchEnv -> Builder -> IO ()
envLog :: Builder -> IO ()
envLog} FinishedSpan
fspan = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
full <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TBQueue FinishedSpan -> STM Bool
forall a. TBQueue a -> STM Bool
isFullTBQueue TBQueue FinishedSpan
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
full (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
        Builder -> IO ()
envLog Builder
"Queue at capacity, enqueueing span may block\n"
    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue FinishedSpan -> FinishedSpan -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue FinishedSpan
envQ FinishedSpan

batchReporter BatchEnv{envCap :: BatchEnv -> AtCapacity
envCap = AtCapacity
Drop, TBQueue FinishedSpan
envQ :: BatchEnv -> TBQueue FinishedSpan
envQ :: TBQueue FinishedSpan
envQ, Builder -> IO ()
envLog :: BatchEnv -> Builder -> IO ()
envLog :: Builder -> IO ()
envLog} FinishedSpan
fspan = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
full <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ do
full <- TBQueue FinishedSpan -> STM Bool
forall a. TBQueue a -> STM Bool
isFullTBQueue TBQueue FinishedSpan
        Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
full (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
            TBQueue FinishedSpan -> FinishedSpan -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue FinishedSpan
envQ FinishedSpan
        Bool -> STM Bool
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
full (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
        Builder -> IO ()
envLog Builder
"Queue at capacity, span was dropped\n"

consumer :: BatchOptions -> TBQueue FinishedSpan -> IO (Async ())
consumer :: BatchOptions -> TBQueue FinishedSpan -> IO (Async ())
consumer opt :: BatchOptions
[FinishedSpan] -> IO ()
Builder -> IO ()
_boptBatchSize :: BatchOptions -> Word16
_boptTimeoutSec :: BatchOptions -> Word
_boptReporter :: BatchOptions -> [FinishedSpan] -> IO ()
_boptErrorLog :: BatchOptions -> Builder -> IO ()
_boptQueueSize :: BatchOptions -> Natural
_boptAtCapacity :: BatchOptions -> AtCapacity
_boptBatchSize :: Word16
_boptTimeoutSec :: Word
_boptReporter :: [FinishedSpan] -> IO ()
_boptErrorLog :: Builder -> IO ()
_boptQueueSize :: Natural
_boptAtCapacity :: AtCapacity
..} TBQueue FinishedSpan
q = IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ()))
-> (IO () -> IO ()) -> IO () -> IO (Async ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
xs <- IO [FinishedSpan]
    Bool -> [FinishedSpan] -> IO ()
go Bool
False [FinishedSpan]
    popBlocking :: IO [FinishedSpan]
popBlocking = STM [FinishedSpan] -> IO [FinishedSpan]
forall a. STM a -> IO a
atomically (STM [FinishedSpan] -> IO [FinishedSpan])
-> STM [FinishedSpan] -> IO [FinishedSpan]
forall a b. (a -> b) -> a -> b
$ do
x <- TBQueue FinishedSpan -> STM FinishedSpan
forall a. TBQueue a -> STM a
readTBQueue TBQueue FinishedSpan
xFinishedSpan -> [FinishedSpan] -> [FinishedSpan]
forall a. a -> [a] -> [a]
:) ([FinishedSpan] -> [FinishedSpan])
-> STM [FinishedSpan] -> STM [FinishedSpan]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Word16 -> TBQueue FinishedSpan -> STM [FinishedSpan]
forall a. Word16 -> TBQueue a -> STM [a]
pop (Word16
_boptBatchSize Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
- Word16
1) TBQueue FinishedSpan

    popNonblock :: IO [FinishedSpan]
popNonblock = STM [FinishedSpan] -> IO [FinishedSpan]
forall a. STM a -> IO a
atomically (STM [FinishedSpan] -> IO [FinishedSpan])
-> STM [FinishedSpan] -> IO [FinishedSpan]
forall a b. (a -> b) -> a -> b
$ Word16 -> TBQueue FinishedSpan -> STM [FinishedSpan]
forall a. Word16 -> TBQueue a -> STM [a]
pop Word16
_boptBatchSize TBQueue FinishedSpan

    go :: Bool -> [FinishedSpan] -> IO ()
go Bool
_     []    = () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    go Bool
True  [FinishedSpan]
batch = [FinishedSpan] -> IO ()
report [FinishedSpan]
batch IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> IO ()
    go Bool
False [FinishedSpan]
batch = IO () -> (Async () -> IO ()) -> IO ()
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync ([FinishedSpan] -> IO ()
report [FinishedSpan]
batch) ((Async () -> IO ()) -> IO ()) -> (Async () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async ()
a ->
        Async () -> IO ()
forall a. Async a -> IO ()
timedWait Async ()
a IO () -> (AsyncException -> IO ()) -> IO ()
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catchAsync` \case
ThreadKilled -> do
                BatchOptions -> Err -> IO ()
logErr BatchOptions
opt Err
                Async () -> IO ()
forall a. Async a -> IO ()
timedWait Async ()
a IO () -> IO () -> IO ()
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
`finally` Async () -> IO ()
forall a. Async a -> IO ()
uninterruptibleCancel Async ()
                IO ()
                AsyncException -> IO ()
forall (m :: * -> *) e a.
(HasCallStack, MonadThrow m, Exception e) =>
e -> m a
throwM AsyncException

e -> BatchOptions -> Err -> IO ()
logErr BatchOptions
opt (AsyncException -> Err
ErrReporterAsyncException AsyncException
e) IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> AsyncException -> IO ()
forall (m :: * -> *) e a.
(HasCallStack, MonadThrow m, Exception e) =>
e -> m a
throwM AsyncException

    report :: [FinishedSpan] -> IO ()
report [FinishedSpan]
batch = [FinishedSpan] -> IO ()
_boptReporter [FinishedSpan]
batch IO () -> (SomeException -> IO ()) -> IO ()
forall (m :: * -> *) a.
(HasCallStack, MonadCatch m) =>
m a -> (SomeException -> m a) -> m a
        (BatchOptions -> Err -> IO ()
logErr BatchOptions
opt (Err -> IO ()) -> (SomeException -> Err) -> SomeException -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> Err

    timedWait :: Async a -> IO ()
timedWait Async a
a = Int -> IO a -> IO (Maybe a)
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
timeoutMicros (Async a -> IO a
forall a. Async a -> IO a
wait Async a
a) IO (Maybe a) -> (Maybe a -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Maybe a
Nothing -> BatchOptions -> Err -> IO ()
logErr BatchOptions
opt Err
        Maybe a
_       -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

    drain :: IO ()
drain = do
        BatchOptions -> Err -> IO ()
logErr BatchOptions
opt Err
        IO [FinishedSpan]
popNonblock IO [FinishedSpan] -> ([FinishedSpan] -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> [FinishedSpan] -> IO ()
go Bool

    timeoutMicros :: Int
timeoutMicros = forall a b. (AsMicros a, Integral b) => a -> b
micros @NominalDiffTime (NominalDiffTime -> Int) -> NominalDiffTime -> Int
forall a b. (a -> b) -> a -> b
$ Word -> NominalDiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word

pop :: Word16 -> TBQueue a -> STM [a]
pop :: forall a. Word16 -> TBQueue a -> STM [a]
pop Word16
0 TBQueue a
_ = [a] -> STM [a]
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
pop Word16
n TBQueue a
q = do
    Maybe a
v <- TBQueue a -> STM (Maybe a)
forall a. TBQueue a -> STM (Maybe a)
tryReadTBQueue TBQueue a
    case Maybe a
v of
        Maybe a
Nothing -> [a] -> STM [a]
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
        Just a
v' -> (a
v' a -> [a] -> [a]
forall a. a -> [a] -> [a]
:) ([a] -> [a]) -> STM [a] -> STM [a]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Word16 -> TBQueue a -> STM [a]
forall a. Word16 -> TBQueue a -> STM [a]
pop (Word16
nWord16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
1) TBQueue a

data Err
    = ErrReporterException      SomeException
    | ErrReporterTimeout
    | ErrReporterCancelled
    | ErrReporterAsyncException AsyncException
    | ErrReporterDraining

logErr :: BatchOptions -> Err -> IO ()
logErr :: BatchOptions -> Err -> IO ()
logErr BatchOptions{_boptErrorLog :: BatchOptions -> Builder -> IO ()
_boptErrorLog=Builder -> IO ()
errlog} Err
e = Builder -> IO ()
errlog (Builder -> IO ()) -> Builder -> IO ()
forall a b. (a -> b) -> a -> b
$ Err -> Builder
msg Err
e Builder -> Builder -> Builder
forall a. Semigroup a => a -> a -> a
<> Builder
    sbs :: ShortByteString -> Builder
sbs = ShortByteString -> Builder

    ebs :: Exception e => e -> Builder
    ebs :: forall e. Exception e => e -> Builder
ebs = String -> Builder
string8 (String -> Builder) -> (e -> String) -> e -> Builder
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> String
forall a. Show a => a -> String

    msg :: Err -> Builder
msg = \case
        ErrReporterException      SomeException
ex -> ShortByteString -> Builder
sbs ShortByteString
"Reporter Error: " Builder -> Builder -> Builder
forall a. Semigroup a => a -> a -> a
<> SomeException -> Builder
forall e. Exception e => e -> Builder
ebs SomeException
ErrReporterTimeout           -> ShortByteString -> Builder
sbs ShortByteString
"Reporter timed out!"
ErrReporterCancelled         -> ShortByteString -> Builder
sbs ShortByteString
"Batch reporter cancelled, shutting down gracefully"
        ErrReporterAsyncException AsyncException
ex -> ShortByteString -> Builder
sbs ShortByteString
"Batch reporter received async exception: " Builder -> Builder -> Builder
forall a. Semigroup a => a -> a -> a
<> AsyncException -> Builder
forall e. Exception e => e -> Builder
ebs AsyncException
ErrReporterDraining          -> ShortByteString -> Builder
sbs ShortByteString
"Draining batch reporter queue"

    nl :: Builder
nl = Char -> Builder
char8 Char