{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
-- | Zipkin support for traced computations: collects span samples from a tracer, encodes them
-- as JSON and POSTs them to a Zipkin server, and converts span identifiers to and from the B3
-- propagation formats (multi-header and single-header).
module Monitor.Tracing.Zipkin (
-- * Configuration
Settings(..), defaultSettings,
Endpoint(..), defaultEndpoint,
-- * Publishing traces
Zipkin,
new, run, publish, with,
-- * B3 propagation
B3(..), b3ToHeaders, b3FromHeaders, b3ToHeaderValue, b3FromHeaderValue,
-- * Span creation
clientSpan, clientSpanWith, serverSpan, serverSpanWith, producerSpanWith, consumerSpanWith,
-- * Tags and annotations
tag, addTag, addInheritedTag,
annotate, annotateAt,
-- * Remote endpoints
addEndpoint
) where
import Control.Monad.Trace
import Control.Monad.Trace.Class
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (atomically, tryReadTChan)
import Control.Exception.Lifted (finally)
import Control.Monad (forever, guard, void, when)
import Control.Monad.Fix (fix)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl)
import qualified Data.Aeson as JSON
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS
import Data.CaseInsensitive (CI)
import Data.Time.Clock (NominalDiffTime)
import Data.Foldable (toList)
import Data.Int (Int64)
import Data.IORef (modifyIORef, newIORef, readIORef)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes, fromMaybe, listToMaybe, maybeToList)
import Data.Monoid (Endo(..))
#if !MIN_VERSION_base(4, 11, 0)
import Data.Semigroup ((<>))
#endif
import Data.Set (Set)
import Data.String (IsString(..))
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Time.Clock.POSIX (POSIXTime)
import Network.HTTP.Client (Manager, Request)
import qualified Network.HTTP.Client as HTTP
import Network.Socket (HostName, PortNumber)
-- | Configuration consumed by 'new' when building a 'Zipkin' publisher. Every field is
-- optional; 'defaultSettings' leaves all of them unset.
data Settings = Settings
  { settingsHostname :: !(Maybe HostName)
    -- ^ Host of the Zipkin server. 'new' uses @"localhost"@ when unset.
  , settingsPort :: !(Maybe PortNumber)
    -- ^ Port of the Zipkin server. 'new' uses @9411@ when unset.
  , settingsEndpoint :: !(Maybe Endpoint)
    -- ^ Endpoint emitted as @localEndpoint@ on every published span, if any.
  , settingsManager :: !(Maybe Manager)
    -- ^ HTTP manager used to send spans. 'new' creates one from
    -- 'HTTP.defaultManagerSettings' when unset.
  , settingsPublishPeriod :: !(Maybe NominalDiffTime)
    -- ^ Delay between background flushes. When unset or not strictly positive, 'new' starts
    -- no background thread and spans are only sent by 'publish'.
  }
-- | Settings with every field unset, so that 'new' applies all of its fallbacks.
defaultSettings :: Settings
defaultSettings = Settings
  { settingsHostname = Nothing
  , settingsPort = Nothing
  , settingsEndpoint = Nothing
  , settingsManager = Nothing
  , settingsPublishPeriod = Nothing
  }
-- | A string literal is interpreted as the Zipkin server's hostname; every other field keeps
-- its value from 'defaultSettings'.
instance IsString Settings where
  fromString hostname = defaultSettings { settingsHostname = Just hostname }
-- | Handle to a Zipkin publisher, as created by 'new'. It bundles everything 'run' and
-- 'publish' need.
data Zipkin = Zipkin
  { zipkinManager :: !Manager
    -- ^ HTTP manager used to send spans.
  , zipkinRequest :: !Request
    -- ^ Request template; the encoded spans are set as its body on each flush.
  , zipkinTracer :: !Tracer
    -- ^ Tracer whose sample channel is drained on each flush.
  , zipkinEndpoint :: !(Maybe Endpoint)
    -- ^ Local endpoint attached to every published span, if any.
  }
-- | Drains every sample currently queued on the tracer's sample channel and, when at least one
-- was found, sends them all to Zipkin in a single request whose body is their JSON encoding.
-- No request is made when the channel is empty. The HTTP response is discarded.
flushSpans :: Maybe Endpoint -> Tracer -> Request -> Manager -> IO ()
flushSpans ept tracer req mgr = do
  pending <- newIORef []
  -- Non-blocking drain: stop as soon as the channel reports that it is empty.
  fix $ \drain -> do
    next <- atomically . tryReadTChan $ spanSamples tracer
    case next of
      Nothing -> pure ()
      Just sample -> do
        -- Samples are prepended, so the most recently read one ends up first.
        modifyIORef pending (ZipkinSpan ept sample :)
        drain
  spans <- readIORef pending
  when (not (null spans)) $ do
    let body = HTTP.RequestBodyLBS (JSON.encode spans)
    void $ HTTP.httpLbs (req { HTTP.requestBody = body }) mgr
-- | Creates a 'Zipkin' publisher from the given settings.
--
-- Spans are sent with a @POST@ to @/api/v2/spans@ on the configured host and port
-- (defaulting to @localhost:9411@). When the publish period is strictly positive, a background
-- thread is forked which sleeps for that period and then flushes pending spans, forever.
-- Otherwise no thread is started and spans are only sent by explicit calls to 'publish'.
new :: MonadIO m => Settings -> m Zipkin
new (Settings mbHostname mbPort mbEpt mbMgr mbPrd) = liftIO $ do
  mgr <- case mbMgr of
    Just existing -> pure existing
    Nothing -> HTTP.newManager HTTP.defaultManagerSettings
  tracer <- newTracer
  let
    hostname = fromMaybe "localhost" mbHostname
    req = HTTP.defaultRequest
      { HTTP.method = "POST"
      , HTTP.host = BS.pack hostname
      , HTTP.port = maybe 9411 fromIntegral mbPort
      , HTTP.path = "/api/v2/spans"
      , HTTP.requestHeaders = [("Content-Type", "application/json")] }
    period = fromMaybe 0 mbPrd
  -- The forked thread's identifier is not kept.
  when (period > 0) $
    void $ forkIO $ forever $ do
      threadDelay (microSeconds period)
      flushSpans mbEpt tracer req mgr
  pure $ Zipkin mgr req tracer mbEpt
-- | Runs a traced action using the publisher's tracer, so that its span samples become
-- available to 'publish'.
run :: TraceT m a -> Zipkin -> m a
run actn = runTraceT actn . zipkinTracer
-- | Sends all span samples currently queued on the publisher's tracer to Zipkin. See
-- 'flushSpans' for details; nothing is sent when no samples are pending.
publish :: MonadIO m => Zipkin -> m ()
publish zipkin = liftIO (flushSpans ept tracer req mgr)
  where
    ept = zipkinEndpoint zipkin
    tracer = zipkinTracer zipkin
    req = zipkinRequest zipkin
    mgr = zipkinManager zipkin
-- | Creates a publisher with 'new', passes it to the given function, and calls 'publish' once
-- that function finishes, whether it returned normally or threw.
with :: (MonadIO m, MonadBaseControl IO m)
  => Settings -> (Zipkin -> m a) -> m a
with settings f =
  new settings >>= \zipkin -> f zipkin `finally` publish zipkin
-- | Adds a text tag to the active span. The key is stored with 'publicKeyPrefix' prepended,
-- which is what makes the tag survive 'publicTags' filtering when the span is published.
tag :: MonadTrace m => Text -> Text -> m ()
tag key = addSpanEntry (publicKeyPrefix <> key) . tagTextValue
-- | Adds a text tag to a span builder, for use with the @...With@ span constructors
-- (e.g. 'clientSpanWith').
--
-- The key is stored with 'publicKeyPrefix' prepended, exactly as 'tag' does. The prefix is
-- required: 'publicTags' only publishes tags whose key carries it, so a tag stored under the
-- bare key would be silently dropped from the span sent to Zipkin.
addTag :: Text -> Text -> Builder -> Builder
addTag key val bldr =
  bldr { builderTags = Map.insert (publicKeyPrefix <> key) (JSON.toJSON val) (builderTags bldr) }
-- | Records a key/value pair in a span builder's baggages, storing the value UTF-8 encoded.
-- Context baggages are merged into the span's tags when it is encoded for Zipkin.
-- NOTE(review): the name suggests baggages propagate to descendant spans; that propagation
-- happens outside this module, so confirm against the tracer implementation.
addInheritedTag :: Text -> Text -> Builder -> Builder
addInheritedTag key val bldr = bldr { builderBaggages = baggages' }
  where
    baggages' = Map.insert key (T.encodeUtf8 val) (builderBaggages bldr)
-- | Adds a log entry with the given text to the active span, under the empty key. Log entries
-- are published as Zipkin annotations.
annotate :: MonadTrace m => Text -> m ()
annotate = addSpanEntry "" . logValue
-- | Like 'annotate', but the log entry is created with the explicitly supplied time.
annotateAt :: MonadTrace m => POSIXTime -> Text -> m ()
annotateAt time = addSpanEntry "" . logValueAt time
-- | Span identification and sampling information exchanged between services, in the shape
-- used by the B3 propagation headers.
data B3 = B3
  { b3TraceID :: !TraceID
    -- ^ Identifier of the trace the span belongs to.
  , b3SpanID :: !SpanID
    -- ^ Identifier of the span itself.
  , b3IsSampled :: !Bool
    -- ^ Whether the span is sampled.
  , b3IsDebug :: !Bool
    -- ^ Whether the debug flag is set. When encoding, it takes precedence over 'b3IsSampled'.
  , b3ParentSpanID :: !(Maybe SpanID)
    -- ^ Identifier of the parent span, when there is one.
  } deriving (Eq, Ord, Show)
-- Names of the individual B3 headers read by 'b3FromHeaders' and written by 'b3ToHeaders'.
-- Being case-insensitive strings, lookups match regardless of the header's casing.

-- | Header holding the trace ID.
traceIDHeader :: CI ByteString
traceIDHeader = "X-B3-TraceId"

-- | Header holding the span ID.
spanIDHeader :: CI ByteString
spanIDHeader = "X-B3-SpanId"

-- | Header holding the parent span ID.
parentSpanIDHeader :: CI ByteString
parentSpanIDHeader = "X-B3-ParentSpanId"

-- | Header holding the sampling decision (@"1"@ or @"0"@).
sampledHeader :: CI ByteString
sampledHeader = "X-B3-Sampled"

-- | Header holding the debug flag (@"1"@ when set).
debugHeader :: CI ByteString
debugHeader = "X-B3-Flags"
-- | Serializes a 'B3' into the multi-header representation.
--
-- The trace and span ID headers are always present, and the parent span ID header only when
-- there is a parent. Exactly one of the flag headers is emitted: the debug header set to
-- @"1"@ when the debug flag is set, otherwise the sampled header set to @"1"@ or @"0"@.
b3ToHeaders :: B3 -> Map (CI ByteString) ByteString
b3ToHeaders b3 = T.encodeUtf8 <$> Map.fromList (idKVs ++ parentKVs ++ [flagKV])
  where
    idKVs =
      [ (traceIDHeader, encodeTraceID (b3TraceID b3))
      , (spanIDHeader, encodeSpanID (b3SpanID b3)) ]
    parentKVs =
      [ (parentSpanIDHeader, encodeSpanID pid) | pid <- maybeToList (b3ParentSpanID b3) ]
    -- Debug wins over the sampling decision.
    flagKV
      | b3IsDebug b3 = (debugHeader, "1")
      | b3IsSampled b3 = (sampledHeader, "1")
      | otherwise = (sampledHeader, "0")
-- | Parses a 'B3' from the multi-header representation, returning 'Nothing' when the headers
-- are missing or malformed.
--
-- * The trace ID and span ID headers are required and must decode successfully.
-- * The debug header defaults to unset; the sampled header defaults to the debug flag's
--   value. Either header, when present, must be exactly @"1"@ or @"0"@.
-- * An explicit "not sampled" combined with the debug flag is rejected.
-- * The parent span ID is optional. A parent header that is present but cannot be decoded is
--   ignored (the result has no parent) rather than causing the whole parse to fail.
--
-- Header values come from the network, so they are decoded with the total 'T.decodeUtf8''
-- instead of the partial @decodeUtf8@: bytes that are not valid UTF-8 are treated like any
-- other malformed value instead of raising an exception from pure code.
b3FromHeaders :: Map (CI ByteString) ByteString -> Maybe B3
b3FromHeaders hdrs = do
  let
    findRaw key = Map.lookup key hdrs
    -- 'Nothing' when the header is absent or is not valid UTF-8.
    find key = findRaw key >>= either (const Nothing) Just . T.decodeUtf8'
    -- Flags are compared on the raw bytes, so no decoding is needed for them.
    findBool def key = case findRaw key of
      Nothing -> Just def
      Just "1" -> Just True
      Just "0" -> Just False
      _ -> Nothing
  dbg <- findBool False debugHeader
  sampled <- findBool dbg sampledHeader
  guard (sampled || not dbg)
  B3
    <$> (find traceIDHeader >>= decodeTraceID)
    <*> (find spanIDHeader >>= decodeSpanID)
    <*> pure sampled
    <*> pure dbg
    <*> pure (find parentSpanIDHeader >>= decodeSpanID)
-- | Serializes a 'B3' into the single-header representation: the trace ID, span ID and
-- sampling state joined with @-@, followed by the parent span ID when there is one. The
-- sampling state is @d@ when the debug flag is set, otherwise @1@ (sampled) or @0@.
b3ToHeaderValue :: B3 -> ByteString
b3ToHeaderValue b3 = BS.intercalate "-" (T.encodeUtf8 <$> fields)
  where
    fields = [encodeTraceID (b3TraceID b3), encodeSpanID (b3SpanID b3), state] ++ parent
    parent = encodeSpanID <$> maybeToList (b3ParentSpanID b3)
    -- Debug wins over the sampling decision.
    state
      | b3IsDebug b3 = "d"
      | b3IsSampled b3 = "1"
      | otherwise = "0"
-- | Parses a 'B3' from the single-header representation, returning 'Nothing' on malformed
-- input.
--
-- The value is split on @-@ into two to four fields: trace ID, span ID, an optional sampling
-- state and an optional parent span ID. A missing state yields a span that is neither sampled
-- nor debug; otherwise the state must be @0@, @1@ or @d@ (sampled with the debug flag set).
-- Any field that fails to decode, or any extra field, makes the parse fail.
--
-- The value comes from the network, so it is decoded with the total 'T.decodeUtf8'' instead
-- of the partial @decodeUtf8@: bytes that are not valid UTF-8 yield 'Nothing' instead of
-- raising an exception from pure code.
b3FromHeaderValue :: ByteString -> Maybe B3
b3FromHeaderValue bs = do
  txt <- either (const Nothing) Just (T.decodeUtf8' bs)
  case T.splitOn "-" txt of
    (traceIDstr:spanIDstr:strs) -> do
      traceID <- decodeTraceID traceIDstr
      spanID <- decodeSpanID spanIDstr
      let buildB3 = B3 traceID spanID
      case strs of
        [] -> pure $ buildB3 False False Nothing
        (state:strs') -> do
          buildB3' <- case state of
            "0" -> pure $ buildB3 False False
            "1" -> pure $ buildB3 True False
            "d" -> pure $ buildB3 True True
            _ -> Nothing
          case strs' of
            [] -> pure $ buildB3' Nothing
            [str] -> buildB3' . Just <$> decodeSpanID str
            _ -> Nothing
    _ -> Nothing
-- | Extracts the 'B3' describing a span: its trace and span IDs, its sampling and debug flags,
-- and the parent span ID found among its references (see 'parentID').
b3FromSpan :: Span -> B3
b3FromSpan spn = B3
  (contextTraceID ctx)
  (contextSpanID ctx)
  (spanIsSampled spn)
  (spanIsDebug spn)
  (parentID $ spanReferences spn)
  where
    ctx = spanContext spn
-- | Builder transformation which stores the JSON encoding of a value under the given key in
-- the builder's tags. The key is used verbatim: no prefix is added.
insertTag :: JSON.ToJSON a => Key -> a -> Endo Builder
insertTag key val = Endo withTag
  where
    withTag bldr = bldr { builderTags = Map.insert key (JSON.toJSON val) (builderTags bldr) }
-- | Builder transformation which makes the built span adopt the trace ID and span ID of a
-- 'B3', with a sampling policy derived from its flags: debug when the debug flag is set,
-- otherwise sampled exactly when the 'B3' is. The 'B3''s parent span ID is not used.
importB3 :: B3 -> Endo Builder
importB3 b3 = Endo $ \bldr -> bldr
  { builderTraceID = Just (b3TraceID b3)
  , builderSpanID = Just (b3SpanID b3)
  , builderSamplingPolicy = Just policy }
  where
    policy
      | b3IsDebug b3 = debugEnabled
      | otherwise = sampledWhen (b3IsSampled b3)
-- | Prefix carried by the keys of user-facing tags. 'publicTags' publishes only the tags whose
-- key starts with it (and strips it); the match is case-sensitive, so the internal keys below
-- are never published as tags.
publicKeyPrefix :: Text
publicKeyPrefix = "Z."
-- | Internal tag key under which 'addEndpoint' stores an endpoint. The span encoder reads it
-- back to fill the @remoteEndpoint@ field.
endpointKey :: Key
endpointKey = "z.e"
-- | Internal tag key under which the span kind is stored. The span encoder reads it back to
-- fill the @kind@ field.
kindKey :: Key
kindKey = "z.k"
-- | Runs an action inside a new child span of the given kind. The action receives the 'B3' of
-- the span active while it runs, or 'Nothing' when there is no active span.
--
-- The kind tag is applied after the caller's builder modifications.
outgoingSpan :: MonadTrace m => Text -> Endo Builder -> Name -> (Maybe B3 -> m a) -> m a
outgoingSpan kind endo name f = childSpanWith (appEndo taggedEndo) name body
  where
    taggedEndo = insertTag kindKey kind <> endo
    body = activeSpan >>= f . fmap b3FromSpan
-- | 'clientSpanWith' without any extra builder modification.
clientSpan :: MonadTrace m => Name -> (Maybe B3 -> m a) -> m a
clientSpan name f = clientSpanWith id name f
-- | Runs an action inside a new child span of kind @CLIENT@, after applying the given
-- modification to the span's builder. The action receives the span's 'B3' (see
-- 'outgoingSpan').
clientSpanWith :: MonadTrace m => (Builder -> Builder) -> Name -> (Maybe B3 -> m a) -> m a
clientSpanWith = outgoingSpan "CLIENT" . Endo
-- | Runs an action inside a new child span of kind @PRODUCER@, after applying the given
-- modification to the span's builder. The action receives the span's 'B3' (see
-- 'outgoingSpan').
producerSpanWith :: MonadTrace m => (Builder -> Builder) -> Name -> (Maybe B3 -> m a) -> m a
producerSpanWith = outgoingSpan "PRODUCER" . Endo
-- | Runs an action inside a span of the given kind which takes its trace ID, span ID and
-- sampling policy from a received 'B3'. The span is built with an empty name.
--
-- The caller's builder modifications are applied first, then the kind tag, then the 'B3'
-- import, so the imported identifiers cannot be overridden.
incomingSpan :: MonadTrace m => Text -> Endo Builder -> B3 -> m a -> m a
incomingSpan kind endo b3 = trace bldr
  where
    bldr = appEndo (importB3 b3 <> insertTag kindKey kind <> endo) (builder "")
-- | 'serverSpanWith' without any extra builder modification.
serverSpan :: MonadTrace m => B3 -> m a -> m a
serverSpan b3 actn = serverSpanWith id b3 actn
-- | Runs an action inside a span of kind @SERVER@ built from a received 'B3', after applying
-- the given modification to the span's builder (see 'incomingSpan').
serverSpanWith :: MonadTrace m => (Builder -> Builder) -> B3 -> m a -> m a
serverSpanWith = incomingSpan "SERVER" . Endo
-- | Runs an action inside a span of kind @CONSUMER@ built from a received 'B3', after applying
-- the given modification to the span's builder (see 'incomingSpan').
consumerSpanWith :: MonadTrace m => (Builder -> Builder) -> B3 -> m a -> m a
consumerSpanWith = incomingSpan "CONSUMER" . Endo
-- | Description of a network endpoint attached to spans. All fields are optional; unset
-- fields are omitted from the JSON encoding.
data Endpoint = Endpoint
  { endpointService :: !(Maybe Text)
    -- ^ Service name, encoded as @serviceName@.
  , endpointPort :: !(Maybe Int)
    -- ^ Port, encoded as @port@.
  , endpointIPv4 :: !(Maybe Text)
    -- ^ IPv4 address, encoded as @ipv4@.
  , endpointIPv6 :: !(Maybe Text)
    -- ^ IPv6 address, encoded as @ipv6@.
  } deriving (Eq, Ord, Show)
-- | Endpoint with every field unset.
defaultEndpoint :: Endpoint
defaultEndpoint = Endpoint
  { endpointService = Nothing
  , endpointPort = Nothing
  , endpointIPv4 = Nothing
  , endpointIPv6 = Nothing
  }
-- | Attaches an endpoint to a span builder by storing it under 'endpointKey'. The span encoder
-- publishes it as the span's @remoteEndpoint@.
addEndpoint :: Endpoint -> Builder -> Builder
addEndpoint ept = appEndo (insertTag endpointKey ept)
-- | A string literal is interpreted as the endpoint's service name; every other field is
-- left unset.
instance IsString Endpoint where
  fromString svc = defaultEndpoint { endpointService = Just (T.pack svc) }
-- | Encodes an endpoint as a JSON object containing only the fields which are set.
instance JSON.ToJSON Endpoint where
  toJSON ept = JSON.object (catMaybes fields)
    where
      fields =
        [ ("serviceName" JSON..=) <$> endpointService ept
        , ("port" JSON..=) <$> endpointPort ept
        , ("ipv4" JSON..=) <$> endpointIPv4 ept
        , ("ipv6" JSON..=) <$> endpointIPv6 ept ]
-- | Span ID carried by the first 'ChildOf' reference in the set (in the set's iteration
-- order), or 'Nothing' when the set contains no such reference.
parentID :: Set Reference -> Maybe SpanID
parentID refs = listToMaybe [ d | ChildOf d <- toList refs ]
-- | A timestamped value, published as one entry of a span's @annotations@.
data ZipkinAnnotation = ZipkinAnnotation !POSIXTime !JSON.Value

-- | Encodes the annotation as an object holding its @timestamp@, in whole microseconds, and
-- its @value@.
instance JSON.ToJSON ZipkinAnnotation where
  toJSON (ZipkinAnnotation time val) =
    let micros = microSeconds time :: Int64
    in JSON.object ["timestamp" JSON..= micros, "value" JSON..= val]
-- | A span sample paired with the optional local endpoint to report alongside it. Its 'ToJSON'
-- instance produces the span object sent to Zipkin.
data ZipkinSpan = ZipkinSpan !(Maybe Endpoint) !Sample
-- | Selects the tags meant to be published: those whose key starts with 'publicKeyPrefix'.
-- The prefix is removed from the keys of the result; all other tags are dropped.
publicTags :: Tags -> Map Text JSON.Value
publicTags tags = Map.fromList
  [ (key', val)
  | (key, val) <- Map.assocs tags
  , Just key' <- [T.stripPrefix publicKeyPrefix key] ]
-- | Encodes a span sample as the JSON object sent to Zipkin.
--
-- Always present: @traceId@, @name@, @id@, @timestamp@ and @duration@ (both in whole
-- microseconds), @debug@, @tags@ and @annotations@. Present only when available: @parentId@,
-- @localEndpoint@, @remoteEndpoint@ and @kind@.
instance JSON.ToJSON ZipkinSpan where
  toJSON (ZipkinSpan mbEpt (Sample spn tags logs start duration)) =
    JSON.object (mandatory ++ catMaybes optional)
    where
      ctx = spanContext spn
      -- Baggages are published as tags; on a key clash the public tag wins, since map union
      -- is left-biased.
      -- NOTE(review): decodeUtf8 throws on invalid UTF-8; confirm that baggage values are
      -- always valid UTF-8 (they are when written via 'addInheritedTag').
      baggageTags = JSON.toJSON . T.decodeUtf8 <$> contextBaggages ctx
      -- The middle component of each log entry is not published.
      toAnnotation (time, _, val) = ZipkinAnnotation time val
      mandatory =
        [ "traceId" JSON..= contextTraceID ctx
        , "name" JSON..= spanName spn
        , "id" JSON..= contextSpanID ctx
        , "timestamp" JSON..= microSeconds @Int64 start
        , "duration" JSON..= microSeconds @Int64 duration
        , "debug" JSON..= spanIsDebug spn
        , "tags" JSON..= (publicTags tags <> baggageTags)
        , "annotations" JSON..= fmap toAnnotation logs ]
      optional =
        [ ("parentId" JSON..=) <$> parentID (spanReferences spn)
        , ("localEndpoint" JSON..=) <$> mbEpt
        , ("remoteEndpoint" JSON..=) <$> Map.lookup endpointKey tags
        , ("kind" JSON..=) <$> Map.lookup kindKey tags ]
-- | Converts a duration in seconds to a whole number of microseconds, rounding to the nearest
-- integer.
microSeconds :: Integral a => NominalDiffTime -> a
microSeconds seconds = round (seconds * 1000000)