{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE CPP #-}

-- | This is a log scribe that writes logs to logz.io's bulk
-- <https://app.logz.io/#/dashboard/data-sources/Bulk-HTTPS HTTPS
-- API>.
module Katip.Scribes.LogzIO.HTTPS
  ( -- * Scribe construction
    mkLogzIOScribe,

    -- * Types
    BulkAPIError (..),
    LogzIOScribeConfiguration (..),
    Scheme (..),
    APIToken (..),
    LoggingError (..),

    -- ** Presets for configuration
    usRegionHost,
    euRegionHost,
    httpsPort,
    httpPort,
    defaultRetryPolicy,
    defaultLogzIOScribeConfiguration,

    -- * Internal API exported for testing
    renderLineTruncated,
    renderLineTruncated',
    maxPayloadBytes,
    maxLogLineLength,
    BulkBuffer (..),
    LogAction (..),
    bufferItem,
    bufferItem',
    forceFlush,
    Bytes (..),
  )
where

-------------------------------------------------------------------------------
import Control.Applicative
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TBMQueue as STM
import qualified Control.Error as E
import qualified Control.Exception.Safe as EX
import Control.Monad
import qualified Control.Retry as Retry
import qualified Data.Aeson as A
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Lazy.Char8 as LBS8
#if MIN_VERSION_aeson (2, 0, 0)
import qualified Data.Aeson.Key as A
import qualified Data.Aeson.KeyMap as A
#else
import qualified Data.HashMap.Strict as HM
#endif
import Data.Int
import qualified Data.Scientific as Scientific
import Data.Semigroup as Semigroup
import Data.String (IsString)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Builder as TB
import qualified Data.Time as Time
import qualified Katip as K
import Katip.Core (LocJs (..))
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTPS
import qualified Network.HTTP.Types as HTypes
import qualified System.Posix.Types as POSIX
import qualified URI.ByteString as URIBS

-------------------------------------------------------------------------------

-- | This is returned when the bulk import was a partial success
data BulkAPIError = BulkAPIError
  { -- | The number of log lines which are not well-formed JSON. This
    -- indicates a __library bug__. Please file an issue on GitHub.
    BulkAPIError -> Int
bulkAPIError_malformedLines :: Int,
    -- | The number of log lines received successfully.
    BulkAPIError -> Int
bulkAPIError_successfulLines :: Int,
    -- | The number of log lines which exceed the line length
    -- limit. katip-logzio makes a best effort to truncate logs that
    -- exceed the size limit. This probably indicates a __library bug__
    -- and should be reported as an issue on GitHub.
    BulkAPIError -> Int
bulkAPIError_oversizedLines :: Int,
    -- | The number of log lines which were empty. There isnt' really a
    -- concept of a truly empty log line in katip, so this most likely
    -- indicates a __library bug__ and should be reported as an issue on
    -- GitHub.
    BulkAPIError -> Int
bulkAPIError_emptyLogLines :: Int
  }
  deriving (Int -> BulkAPIError -> ShowS
[BulkAPIError] -> ShowS
BulkAPIError -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [BulkAPIError] -> ShowS
$cshowList :: [BulkAPIError] -> ShowS
show :: BulkAPIError -> String
$cshow :: BulkAPIError -> String
showsPrec :: Int -> BulkAPIError -> ShowS
$cshowsPrec :: Int -> BulkAPIError -> ShowS
Show, BulkAPIError -> BulkAPIError -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: BulkAPIError -> BulkAPIError -> Bool
$c/= :: BulkAPIError -> BulkAPIError -> Bool
== :: BulkAPIError -> BulkAPIError -> Bool
$c== :: BulkAPIError -> BulkAPIError -> Bool
Eq)

instance A.FromJSON BulkAPIError where
  parseJSON :: Value -> Parser BulkAPIError
parseJSON = forall a. String -> (Object -> Parser a) -> Value -> Parser a
A.withObject String
"BulkAPIError" forall a b. (a -> b) -> a -> b
$ \Object
o -> do
    Int
malformedLines <- Object
o forall a. FromJSON a => Object -> Key -> Parser a
A..: Key
"malformedLines"
    Int
successfulLines <- Object
o forall a. FromJSON a => Object -> Key -> Parser a
A..: Key
"successfulLines"
    Int
oversizedLines <- Object
o forall a. FromJSON a => Object -> Key -> Parser a
A..: Key
"oversizedLines"
    Int
emptyLogLines <- Object
o forall a. FromJSON a => Object -> Key -> Parser a
A..: Key
"emptyLogLines"
    forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$
      BulkAPIError
        { bulkAPIError_malformedLines :: Int
bulkAPIError_malformedLines = Int
malformedLines,
          bulkAPIError_successfulLines :: Int
bulkAPIError_successfulLines = Int
successfulLines,
          bulkAPIError_oversizedLines :: Int
bulkAPIError_oversizedLines = Int
oversizedLines,
          bulkAPIError_emptyLogLines :: Int
bulkAPIError_emptyLogLines = Int
emptyLogLines
        }

-------------------------------------------------------------------------------
data LogzIOScribeConfiguration = LogzIOScribeConfiguration
  { -- | Will flush the log buffer if this many items is in the buffer
    -- __or__ 'logzIOScribeConfiguration_bufferTimeout' is reached,
    -- whichever is first
    LogzIOScribeConfiguration -> Int
logzIOScribeConfiguration_bufferItems :: Int,
    -- | Will flush the buffer if it has been this long since the last
    -- flush __or__ 'logzIOScribeConfiguration_bufferItems' items are
    -- accumulated, whichever is first. NominalDiffTime has a Num
    -- instance, so you can use a literal to specify seconds, e.g. 30 =
    -- 30s.
    LogzIOScribeConfiguration -> NominalDiffTime
logzIOScribeConfiguration_bufferTimeout :: Time.NominalDiffTime,
    LogzIOScribeConfiguration -> Scheme
logzIOScribeConfiguration_scheme :: Scheme,
    LogzIOScribeConfiguration -> Host
logzIOScribeConfiguration_host :: URIBS.Host,
    LogzIOScribeConfiguration -> Port
logzIOScribeConfiguration_port :: URIBS.Port,
    LogzIOScribeConfiguration -> APIToken
logzIOScribeConfiguration_token :: APIToken,
    -- | How should exceptions during writes be retried?
    LogzIOScribeConfiguration -> RetryPolicyM IO
logzIOScribeConfiguration_retry :: Retry.RetryPolicyM IO,
    LogzIOScribeConfiguration -> LoggingError -> IO ()
logzIOScribeConfiguration_onError :: LoggingError -> IO ()
  }

-- | A default configuration:
--
--    * 100 item buffering
--
--    * 30 second buffer timeout
--
--    * US, HTTPS logging endpoint (listener.logz.io:8071)
--
--    * 'defaultRetryPolicy' of 25ms exponential backoff, 5 retries
--
--    * Ignore logging errors
defaultLogzIOScribeConfiguration :: APIToken -> LogzIOScribeConfiguration
defaultLogzIOScribeConfiguration :: APIToken -> LogzIOScribeConfiguration
defaultLogzIOScribeConfiguration APIToken
token =
  LogzIOScribeConfiguration
    { logzIOScribeConfiguration_bufferItems :: Int
logzIOScribeConfiguration_bufferItems = Int
100,
      logzIOScribeConfiguration_bufferTimeout :: NominalDiffTime
logzIOScribeConfiguration_bufferTimeout = NominalDiffTime
30,
      logzIOScribeConfiguration_scheme :: Scheme
logzIOScribeConfiguration_scheme = Scheme
HTTPS,
      logzIOScribeConfiguration_host :: Host
logzIOScribeConfiguration_host = Host
usRegionHost,
      logzIOScribeConfiguration_port :: Port
logzIOScribeConfiguration_port = Port
httpsPort,
      logzIOScribeConfiguration_token :: APIToken
logzIOScribeConfiguration_token = APIToken
token,
      logzIOScribeConfiguration_retry :: RetryPolicyM IO
logzIOScribeConfiguration_retry = forall (m :: * -> *). Monad m => RetryPolicyM m
defaultRetryPolicy,
      logzIOScribeConfiguration_onError :: LoggingError -> IO ()
logzIOScribeConfiguration_onError = forall a b. a -> b -> a
const (forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
    }

-------------------------------------------------------------------------------

-- | You can retrieve your account or sub-account's API token on the
-- <https://app.logz.io/#/dashboard/settings/manage-accounts manage
-- accounts page>. Note that APIToken has an IsString instance,
-- meaning that you can use a string literal with OverloadedStrings
-- enabled.
newtype APIToken = APIToken
  { APIToken -> Text
apiToken :: T.Text
  }
  deriving (Int -> APIToken -> ShowS
[APIToken] -> ShowS
APIToken -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [APIToken] -> ShowS
$cshowList :: [APIToken] -> ShowS
show :: APIToken -> String
$cshow :: APIToken -> String
showsPrec :: Int -> APIToken -> ShowS
$cshowsPrec :: Int -> APIToken -> ShowS
Show, APIToken -> APIToken -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: APIToken -> APIToken -> Bool
$c/= :: APIToken -> APIToken -> Bool
== :: APIToken -> APIToken -> Bool
$c== :: APIToken -> APIToken -> Bool
Eq, String -> APIToken
forall a. (String -> a) -> IsString a
fromString :: String -> APIToken
$cfromString :: String -> APIToken
IsString)

-- | This particular bulk API only supports HTTP or HTTPS. HTTPS is
-- strongly recommended for security reasons.
data Scheme
  = -- | HTTPs should always be used except for local testing.
    HTTPS
  | HTTP
  deriving (Int -> Scheme -> ShowS
[Scheme] -> ShowS
Scheme -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Scheme] -> ShowS
$cshowList :: [Scheme] -> ShowS
show :: Scheme -> String
$cshow :: Scheme -> String
showsPrec :: Int -> Scheme -> ShowS
$cshowsPrec :: Int -> Scheme -> ShowS
Show, Scheme -> Scheme -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Scheme -> Scheme -> Bool
$c/= :: Scheme -> Scheme -> Bool
== :: Scheme -> Scheme -> Bool
$c== :: Scheme -> Scheme -> Bool
Eq)

-- | See
-- <https://support.logz.io/hc/en-us/articles/210206365-What-IP-addresses-should-I-open-in-my-firewall-to-ship-logs-to-Logz-io-
-- this> for a list of listeners. This is the US region host,
-- listener.logz.io
usRegionHost :: URIBS.Host
usRegionHost :: Host
usRegionHost = ByteString -> Host
URIBS.Host ByteString
"listener.logz.io"

-- | See
-- <https://support.logz.io/hc/en-us/articles/210206365-What-IP-addresses-should-I-open-in-my-firewall-to-ship-logs-to-Logz-io-
-- this> for a list of listeners. This is the EU region host,
-- listener.logz.io
euRegionHost :: URIBS.Host
euRegionHost :: Host
euRegionHost = ByteString -> Host
URIBS.Host ByteString
"listener-eu.logz.io"

-- | Logz.io uses port 8071 for HTTPS
httpsPort :: URIBS.Port
httpsPort :: Port
httpsPort = Int -> Port
URIBS.Port Int
8071

-- | Logz.io uses port 8070 for HTTP
httpPort :: URIBS.Port
httpPort :: Port
httpPort = Int -> Port
URIBS.Port Int
8070

-- | A reasonable retry policy: exponential backoff with 25ms base
-- delay up to 5 retries, for a total cumulative delay of 775ms.
defaultRetryPolicy :: (Monad m) => Retry.RetryPolicyM m
defaultRetryPolicy :: forall (m :: * -> *). Monad m => RetryPolicyM m
defaultRetryPolicy = forall (m :: * -> *). Monad m => Int -> RetryPolicyM m
Retry.exponentialBackoff Int
25000 forall a. Monoid a => a -> a -> a
`mappend` Int -> forall (m :: * -> *). Monad m => RetryPolicyM m
Retry.limitRetries Int
5

-------------------------------------------------------------------------------
mkLogzIOScribe ::
  LogzIOScribeConfiguration ->
  K.PermitFunc ->
  K.Verbosity ->
  IO K.Scribe
mkLogzIOScribe :: LogzIOScribeConfiguration -> PermitFunc -> Verbosity -> IO Scribe
mkLogzIOScribe LogzIOScribeConfiguration
config PermitFunc
permitItem Verbosity
verbosity = do
  -- This size is actually somewhat arbitrary. We're just making sure
  -- it isn't infinite so we don't consume the whole backlog right
  -- away and take up lots of memory.. Katip above us does bounded
  -- buffering and load-shedding when there's too much of a
  -- backlog. Our writes are blocking to apply backpressure up to
  -- katip. This value just holds onto values temporarily while we
  -- concatenate them into a buffer.
  TBMQueue AnyLogItem
ingestionQueue <- forall a. Int -> IO (TBMQueue a)
STM.newTBMQueueIO (Int
itemBufferSize forall a. Num a => a -> a -> a
* Int
10)

  let newTimer :: IO (TVar Bool)
newTimer = Int -> IO (TVar Bool)
STM.registerDelay Int
itemBufferTimeoutMicros
  TVar (TVar Bool)
timerRef <- forall a. a -> IO (TVar a)
STM.newTVarIO forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (TVar Bool)
newTimer

  -- Set up a connection manager for requests
  Manager
mgr <- case Scheme
scheme of
    Scheme
HTTPS -> forall (m :: * -> *). MonadIO m => m Manager
HTTPS.newTlsManager
    Scheme
HTTP -> ManagerSettings -> IO Manager
HTTP.newManager ManagerSettings
HTTP.defaultManagerSettings

  -- An STM transaction that will return true when writes are stopped
  -- and backlog is emptied.
  let workExhausted :: STM.STM Bool
      workExhausted :: STM Bool
workExhausted =
        Bool -> Bool -> Bool
(&&)
          forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TBMQueue a -> STM Bool
STM.isClosedTBMQueue TBMQueue AnyLogItem
ingestionQueue
          forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall a. TBMQueue a -> STM Bool
STM.isEmptyTBMQueue TBMQueue AnyLogItem
ingestionQueue
  -- Block until there's no more work
  let waitWorkExhausted :: STM.STM ()
      waitWorkExhausted :: STM ()
waitWorkExhausted = Bool -> STM ()
STM.check forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< STM Bool
workExhausted
  let pop :: STM.STM (Tick AnyLogItem)
      pop :: STM (Tick AnyLogItem)
pop = forall b a. b -> (a -> b) -> Maybe a -> b
maybe forall a. Tick a
WorkExhausted forall a. a -> Tick a
NewItem forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TBMQueue a -> STM (Maybe a)
STM.readTBMQueue TBMQueue AnyLogItem
ingestionQueue

  let timeExpired :: STM.STM (Tick a)
      timeExpired :: forall a. STM (Tick a)
timeExpired = do
        Bool
isExpired <- forall a. TVar a -> STM a
STM.readTVar forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> STM a
STM.readTVar TVar (TVar Bool)
timerRef
        Bool -> STM ()
STM.check Bool
isExpired
        forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Tick a
TimeExpired

  -- Circular transaction that checks for completion of work, time
  -- expiration, or new events.
  let nextTick :: STM.STM (Tick AnyLogItem)
      nextTick :: STM (Tick AnyLogItem)
nextTick =
        forall a. STM (Tick a)
timeExpired
          forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> STM (Tick AnyLogItem)
pop
          forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (forall a. Tick a
WorkExhausted forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ STM ()
waitWorkExhausted)

  -- Blocking push. This applies backpressure upstream to katip, which does its own buffering
  let push :: AnyLogItem -> STM.STM ()
      push :: AnyLogItem -> STM ()
push = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TBMQueue a -> a -> STM ()
STM.writeTBMQueue TBMQueue AnyLogItem
ingestionQueue

  let sealQueue :: IO ()
sealQueue = forall a. STM a -> IO a
STM.atomically (forall a. TBMQueue a -> STM ()
STM.closeTBMQueue TBMQueue AnyLogItem
ingestionQueue)

  -- Replace the timer with a new one
  let resetTimer :: IO ()
resetTimer = forall a. STM a -> IO a
STM.atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TVar a -> a -> STM ()
STM.writeTVar TVar (TVar Bool)
timerRef forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (TVar Bool)
newTimer

  -- Send the buffer and then reset the timer
  let flush :: BulkBuffer -> IO ()
flush BulkBuffer
curBuffer = do
        Either LoggingError ()
res <- LogzIOScribeConfiguration
-> Manager -> BulkBuffer -> IO (Either LoggingError ())
flushBuffer LogzIOScribeConfiguration
config Manager
mgr BulkBuffer
curBuffer
        case Either LoggingError ()
res of
          Left LoggingError
e -> LoggingError -> IO ()
onErrorSafe LoggingError
e
          Right () -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        IO ()
resetTimer

  let flushLoop :: BulkBuffer -> IO ()
      flushLoop :: BulkBuffer -> IO ()
flushLoop BulkBuffer
curBuffer = do
        Tick AnyLogItem
tick <- forall a. STM a -> IO a
STM.atomically STM (Tick AnyLogItem)
nextTick
        case Tick AnyLogItem
tick of
          Tick AnyLogItem
WorkExhausted -> do
            BulkBuffer -> IO ()
flush BulkBuffer
curBuffer -- flush what you've got
            forall (f :: * -> *) a. Applicative f => a -> f a
pure () -- stop looping
          Tick AnyLogItem
TimeExpired -> do
            BulkBuffer -> IO ()
flush BulkBuffer
curBuffer
            BulkBuffer -> IO ()
flushLoop forall a. Monoid a => a
mempty
          NewItem (AnyLogItem Item a
item) -> do
            case forall a.
LogItem a =>
Int -> Verbosity -> Item a -> BulkBuffer -> LogAction BulkBuffer
bufferItem (LogzIOScribeConfiguration -> Int
logzIOScribeConfiguration_bufferItems LogzIOScribeConfiguration
config) Verbosity
verbosity Item a
item BulkBuffer
curBuffer of
              Buffered BulkBuffer
newBuffer -> BulkBuffer -> IO ()
flushLoop BulkBuffer
newBuffer
              FlushNow BulkBuffer
flushThis BulkBuffer
newBuffer -> do
                BulkBuffer -> IO ()
flush BulkBuffer
flushThis
                BulkBuffer -> IO ()
flushLoop BulkBuffer
newBuffer

  Async ()
flushThread <- forall a. IO a -> IO (Async a)
Async.async (BulkBuffer -> IO ()
flushLoop forall a. Monoid a => a
mempty)

  let close :: IO ()
close = do
        IO ()
sealQueue
        Either SomeException ()
_ <- forall a. Async a -> IO (Either SomeException a)
Async.waitCatch Async ()
flushThread
        forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

  forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$
    K.Scribe
      { liPush :: forall a. LogItem a => Item a -> IO ()
K.liPush = forall a. STM a -> IO a
STM.atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. AnyLogItem -> STM ()
push forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. LogItem a => Item a -> AnyLogItem
AnyLogItem,
        scribeFinalizer :: IO ()
K.scribeFinalizer = IO ()
close,
        scribePermitItem :: PermitFunc
K.scribePermitItem = PermitFunc
permitItem
      }
  where
    itemBufferSize :: Int
itemBufferSize = LogzIOScribeConfiguration -> Int
logzIOScribeConfiguration_bufferItems LogzIOScribeConfiguration
config
    itemBufferTimeoutMicros :: Int
itemBufferTimeoutMicros = NominalDiffTime -> Int
ndtToMicros (LogzIOScribeConfiguration -> NominalDiffTime
logzIOScribeConfiguration_bufferTimeout LogzIOScribeConfiguration
config)
    scheme :: Scheme
scheme = LogzIOScribeConfiguration -> Scheme
logzIOScribeConfiguration_scheme LogzIOScribeConfiguration
config
    onErrorSafe :: LoggingError -> IO ()
onErrorSafe LoggingError
ex = do
      Either SomeException ()
_ <- forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
EX.tryAny (LogzIOScribeConfiguration -> LoggingError -> IO ()
logzIOScribeConfiguration_onError LogzIOScribeConfiguration
config LoggingError
ex)
      forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

data AnyLogItem where
  AnyLogItem :: K.LogItem a => K.Item a -> AnyLogItem

-------------------------------------------------------------------------------
data Tick a
  = TimeExpired
  | NewItem !a
  | WorkExhausted

-------------------------------------------------------------------------------
-- Match the native resolution of NominalDiffTime, which is excessive
-- for our needs
ndtPicos :: Time.NominalDiffTime -> Int64
ndtPicos :: NominalDiffTime -> Int64
ndtPicos = forall a b. (RealFrac a, Integral b) => a -> b
round forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall a. Num a => a -> a -> a
* NominalDiffTime
picos)
  where
    picos :: Time.NominalDiffTime
    picos :: NominalDiffTime
picos = NominalDiffTime
10 forall a b. (Num a, Integral b) => a -> b -> a
^ (Int
9 :: Int)

-------------------------------------------------------------------------------
ndtToMicros :: Time.NominalDiffTime -> Int
ndtToMicros :: NominalDiffTime -> Int
ndtToMicros NominalDiffTime
t = forall a b. (RealFrac a, Integral b) => a -> b
round ((forall a b. (Integral a, Num b) => a -> b
fromIntegral (NominalDiffTime -> Int64
ndtPicos NominalDiffTime
t) :: Double) forall a. Fractional a => a -> a -> a
/ Double
picosInMicro)
  where
    picosInMicro :: Double
picosInMicro = Double
10 forall a b. (Num a, Integral b) => a -> b -> a
^ (Int
3 :: Int)

-------------------------------------------------------------------------------
data LoggingError
  = -- | The URI generated was invalid. Check your configuration
    URIError HTTP.HttpException
  | -- | We encountered an exception while sending the batch request
    RequestError HTTP.HttpException
  | -- | Some or all of the request was rejected. Check the logz.io UI
    -- for indexing errors.
    PartialFailure BulkAPIError
  | -- | Your API token was rejected.
    BadToken
  | -- | An error returned, but it could not be decoded into a
    -- 'BulkAPIError'. This may indicate a __library bug__, which should
    -- be reported to the issue tracker.
    UnknownFailureResponse HTypes.Status LBS.ByteString
  deriving (Int -> LoggingError -> ShowS
[LoggingError] -> ShowS
LoggingError -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [LoggingError] -> ShowS
$cshowList :: [LoggingError] -> ShowS
show :: LoggingError -> String
$cshow :: LoggingError -> String
showsPrec :: Int -> LoggingError -> ShowS
$cshowsPrec :: Int -> LoggingError -> ShowS
Show)

-------------------------------------------------------------------------------
flushBuffer ::
  LogzIOScribeConfiguration ->
  HTTP.Manager ->
  BulkBuffer ->
  IO (Either LoggingError ())
flushBuffer :: LogzIOScribeConfiguration
-> Manager -> BulkBuffer -> IO (Either LoggingError ())
flushBuffer LogzIOScribeConfiguration
config Manager
mgr BulkBuffer
bulkBuffer
  | BulkBuffer -> Int
bulkBuffer_itemCount BulkBuffer
bulkBuffer forall a. Ord a => a -> a -> Bool
<= Int
0 = forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right ())
  | Bool
otherwise = do
    forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
E.runExceptT forall a b. (a -> b) -> a -> b
$ do
      Request
req <- forall (m :: * -> *) a b r.
Functor m =>
(a -> b) -> ExceptT a m r -> ExceptT b m r
E.fmapLT HttpException -> LoggingError
URIError (forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
E.ExceptT (forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
EX.try (Request -> Request
configureRequest forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *). MonadThrow m => String -> m Request
HTTP.parseRequest String
uriStr)))
      Response ByteString
resp <- forall (m :: * -> *) a b r.
Functor m =>
(a -> b) -> ExceptT a m r -> ExceptT b m r
E.fmapLT HttpException -> LoggingError
RequestError forall a b. (a -> b) -> a -> b
$
        forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
E.ExceptT forall a b. (a -> b) -> a -> b
$
          forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
EX.try forall a b. (a -> b) -> a -> b
$
            forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RetryPolicyM m
-> [RetryStatus -> Handler m Bool] -> (RetryStatus -> m a) -> m a
Retry.recovering RetryPolicyM IO
retryPolicy [\RetryStatus
_stat -> forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
EX.Handler forall (m :: * -> *). Applicative m => HttpException -> m Bool
handleHttpException] forall a b. (a -> b) -> a -> b
$ \RetryStatus
_stat ->
              Request -> Manager -> IO (Response ByteString)
HTTP.httpLbs Request
req Manager
mgr
      let respLBS :: ByteString
respLBS = forall body. Response body -> body
HTTP.responseBody Response ByteString
resp
      let respStatus :: Status
respStatus = forall body. Response body -> Status
HTTP.responseStatus Response ByteString
resp
      if Status -> Bool
HTypes.statusIsSuccessful Status
respStatus
        then forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        else case forall a. FromJSON a => ByteString -> Maybe a
A.decode @BulkAPIError ByteString
respLBS of
          Maybe BulkAPIError
Nothing
            | Status -> Int
HTypes.statusCode Status
respStatus forall a. Eq a => a -> a -> Bool
== Int
401 -> forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
E.throwE LoggingError
BadToken
            | Bool
otherwise -> forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
E.throwE (Status -> ByteString -> LoggingError
UnknownFailureResponse Status
respStatus ByteString
respLBS)
          Just BulkAPIError
bulkError -> forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
E.throwE (BulkAPIError -> LoggingError
PartialFailure BulkAPIError
bulkError)
  where
    configureRequest :: Request -> Request
configureRequest Request
req =
      Request
req
        { method :: ByteString
HTTP.method = ByteString
HTypes.methodPost,
          requestBody :: RequestBody
HTTP.requestBody = ByteString -> RequestBody
HTTP.RequestBodyLBS (Builder -> ByteString
BB.toLazyByteString (BulkBuffer -> Builder
bulkBuffer_payload BulkBuffer
bulkBuffer))
        }

    retryPolicy :: RetryPolicyM IO
retryPolicy = LogzIOScribeConfiguration -> RetryPolicyM IO
logzIOScribeConfiguration_retry LogzIOScribeConfiguration
config
    apiTokenBS :: ByteString
apiTokenBS = Text -> ByteString
TE.encodeUtf8 (APIToken -> Text
apiToken (LogzIOScribeConfiguration -> APIToken
logzIOScribeConfiguration_token LogzIOScribeConfiguration
config))

    handleHttpException :: (Applicative m) => HTTP.HttpException -> m Bool
    handleHttpException :: forall (m :: * -> *). Applicative m => HttpException -> m Bool
handleHttpException HttpException
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True

    uriStr :: String
uriStr = ByteString -> String
LBS8.unpack (Builder -> ByteString
BB.toLazyByteString (forall a. URIRef a -> Builder
URIBS.serializeURIRef URIRef Absolute
uri))

    authority :: Authority
authority =
      URIBS.Authority
        { authorityUserInfo :: Maybe UserInfo
URIBS.authorityUserInfo = forall a. Maybe a
Nothing,
          authorityHost :: Host
URIBS.authorityHost = LogzIOScribeConfiguration -> Host
logzIOScribeConfiguration_host LogzIOScribeConfiguration
config,
          authorityPort :: Maybe Port
URIBS.authorityPort = forall a. a -> Maybe a
Just (LogzIOScribeConfiguration -> Port
logzIOScribeConfiguration_port LogzIOScribeConfiguration
config)
        }

    uri :: URIRef Absolute
uri =
      URIBS.URI
        { uriScheme :: Scheme
URIBS.uriScheme = case LogzIOScribeConfiguration -> Scheme
logzIOScribeConfiguration_scheme LogzIOScribeConfiguration
config of
            Scheme
HTTPS -> ByteString -> Scheme
URIBS.Scheme ByteString
"https"
            Scheme
HTTP -> ByteString -> Scheme
URIBS.Scheme ByteString
"http",
          uriAuthority :: Maybe Authority
URIBS.uriAuthority = forall a. a -> Maybe a
Just Authority
authority,
          uriPath :: ByteString
URIBS.uriPath = ByteString
"/",
          uriQuery :: Query
URIBS.uriQuery =
            [(ByteString, ByteString)] -> Query
URIBS.Query
              [ (ByteString
"token", ByteString
apiTokenBS)
              ],
          uriFragment :: Maybe ByteString
URIBS.uriFragment = forall a. Maybe a
Nothing
        }

-------------------------------------------------------------------------------
newtype Bytes = Bytes
  { Bytes -> Int64
bytes :: Int64
  }
  deriving (Int -> Bytes -> ShowS
[Bytes] -> ShowS
Bytes -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Bytes] -> ShowS
$cshowList :: [Bytes] -> ShowS
show :: Bytes -> String
$cshow :: Bytes -> String
showsPrec :: Int -> Bytes -> ShowS
$cshowsPrec :: Int -> Bytes -> ShowS
Show, Bytes -> Bytes -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Bytes -> Bytes -> Bool
$c/= :: Bytes -> Bytes -> Bool
== :: Bytes -> Bytes -> Bool
$c== :: Bytes -> Bytes -> Bool
Eq, Integer -> Bytes
Bytes -> Bytes
Bytes -> Bytes -> Bytes
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
fromInteger :: Integer -> Bytes
$cfromInteger :: Integer -> Bytes
signum :: Bytes -> Bytes
$csignum :: Bytes -> Bytes
abs :: Bytes -> Bytes
$cabs :: Bytes -> Bytes
negate :: Bytes -> Bytes
$cnegate :: Bytes -> Bytes
* :: Bytes -> Bytes -> Bytes
$c* :: Bytes -> Bytes -> Bytes
- :: Bytes -> Bytes -> Bytes
$c- :: Bytes -> Bytes -> Bytes
+ :: Bytes -> Bytes -> Bytes
$c+ :: Bytes -> Bytes -> Bytes
Num, Eq Bytes
Bytes -> Bytes -> Bool
Bytes -> Bytes -> Ordering
Bytes -> Bytes -> Bytes
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: Bytes -> Bytes -> Bytes
$cmin :: Bytes -> Bytes -> Bytes
max :: Bytes -> Bytes -> Bytes
$cmax :: Bytes -> Bytes -> Bytes
>= :: Bytes -> Bytes -> Bool
$c>= :: Bytes -> Bytes -> Bool
> :: Bytes -> Bytes -> Bool
$c> :: Bytes -> Bytes -> Bool
<= :: Bytes -> Bytes -> Bool
$c<= :: Bytes -> Bytes -> Bool
< :: Bytes -> Bytes -> Bool
$c< :: Bytes -> Bytes -> Bool
compare :: Bytes -> Bytes -> Ordering
$ccompare :: Bytes -> Bytes -> Ordering
Ord, Bytes
forall a. a -> a -> Bounded a
maxBound :: Bytes
$cmaxBound :: Bytes
minBound :: Bytes
$cminBound :: Bytes
Bounded)

-- | How big of a body can we send? The limit is defined
-- <https://app.logz.io/#/dashboard/data-sources/Bulk-HTTPS here>.
maxPayloadBytes :: Bytes
maxPayloadBytes :: Bytes
maxPayloadBytes = Bytes
10485760

-- | How long can each serialized payload be (let's assume including
-- the trailing newline). The limit is defined
-- <https://app.logz.io/#/dashboard/data-sources/Bulk-HTTPS here>.
maxLogLineLength :: Bytes
maxLogLineLength :: Bytes
maxLogLineLength = Bytes
500000

measureJSONLine :: A.ToJSON a => a -> (BB.Builder, Bytes)
measureJSONLine :: forall a. ToJSON a => a -> (Builder, Bytes)
measureJSONLine a
a = (ByteString -> Builder
BB.lazyByteString ByteString
lbs, Int64 -> Bytes
Bytes (ByteString -> Int64
LBS.length ByteString
lbs))
  where
    lbs :: ByteString
lbs = forall a. ToJSON a => a -> ByteString
A.encode a
a forall a. Semigroup a => a -> a -> a
<> ByteString
"\n"

-- | Fully-rendered JSON object for an item
fullItemObject :: K.LogItem a => K.Verbosity -> K.Item a -> A.Object
fullItemObject :: forall a. LogItem a => Verbosity -> Item a -> Object
fullItemObject Verbosity
verbosity Item a
item =
#if MIN_VERSION_aeson (2, 0, 0)
  forall v. [(Key, v)] -> KeyMap v
A.fromList
#else
  HM.fromList
#endif
    [ Key
"app" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= forall a. Item a -> Namespace
K._itemApp Item a
item,
      Key
"env" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= forall a. Item a -> Environment
K._itemEnv Item a
item,
      Key
"sev" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= forall a. Item a -> Severity
K._itemSeverity Item a
item,
      Key
"thread" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= ThreadIdText -> Text
K.getThreadIdText (forall a. Item a -> ThreadIdText
K._itemThread Item a
item),
      Key
"host" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= forall a. Item a -> String
K._itemHost Item a
item,
      Key
"pid" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= Int32
pidInt,
      Key
"data" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= Object -> Object
annotateKeys (forall a. LogItem a => Verbosity -> a -> Object
K.payloadObject Verbosity
verbosity (forall a. Item a -> a
K._itemPayload Item a
item)),
      -- Slight deviation, logz.io uses "message" instead of "msg"
      Key
"message" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= Builder -> Text
TB.toLazyText (LogStr -> Builder
K.unLogStr (forall a. Item a -> LogStr
K._itemMessage Item a
item)),
      -- Another slight deviation, logz.io uses "@timestamp" instead of
      -- "at". Note, your logs should be sent roughly close to when they
      -- are created. They are assigned to indexes based on index date, so
      -- time searches act strange if you backfill too far from the
      -- past. They seem to support 3 decimal places of
      -- precision. Formatting like this requires time 1.8.0.2, which is
      -- reflected in the cabal file.
      Key
"@timestamp" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= Text -> Value
A.String (String -> Text
T.pack (forall t. FormatTime t => TimeLocale -> String -> t -> String
Time.formatTime TimeLocale
Time.defaultTimeLocale String
"%Y-%m-%dT%H:%M:%S%03QZ" (forall a. Item a -> UTCTime
K._itemTime Item a
item))),
      Key
"ns" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= forall a. Item a -> Namespace
K._itemNamespace Item a
item,
      Key
"loc" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= (Loc -> LocJs
LocJs forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Item a -> Maybe Loc
K._itemLoc Item a
item)
    ]
  where
    POSIX.CPid Int32
pidInt = forall a. Item a -> CPid
K._itemProcess Item a
item

-- | A version of 'renderLine' which renders a line and stays under
-- the maximum line size of 500,000 bytes. If the default rendering is
-- too large, the log will be reduced to a timestamp and a potentially
-- truncated message.
renderLineTruncated :: K.LogItem a => K.Verbosity -> K.Item a -> (BB.Builder, Bytes)
renderLineTruncated :: forall a. LogItem a => Verbosity -> Item a -> (Builder, Bytes)
renderLineTruncated = forall a.
LogItem a =>
Bytes -> Verbosity -> Item a -> (Builder, Bytes)
renderLineTruncated' Bytes
maxLogLineLength

-- | A generalized renderLineTruncated that takes a custom line length
-- limit. This is exclusively for testing.
renderLineTruncated' ::
  K.LogItem a =>
  -- | Custom max log line length. Be careful, too low and no amount
  -- of shrinking can get under the size, breaking the invariant. For
  -- the production limit, we are guraanteed always able to come in
  -- under the limit.
  Bytes ->
  K.Verbosity ->
  K.Item a ->
  (BB.Builder, Bytes)
renderLineTruncated' :: forall a.
LogItem a =>
Bytes -> Verbosity -> Item a -> (Builder, Bytes)
renderLineTruncated' Bytes
customMaxLogLineLength Verbosity
verbosity Item a
item =
  if Bytes
fullSize forall a. Ord a => a -> a -> Bool
<= Bytes
customMaxLogLineLength
    then (Builder
fullLine, Bytes
fullSize)
    else (Builder
fallbackLine, Bytes
fallbackSize)
  where
    fullObject :: Object
fullObject = forall a. LogItem a => Verbosity -> Item a -> Object
fullItemObject Verbosity
verbosity Item a
item
    (Builder
fullLine, Bytes
fullSize) = forall a. ToJSON a => a -> (Builder, Bytes)
measureJSONLine Object
fullObject
    -- only the absolutely necessary keys, with message stripped out
    -- to help us calculate how much of the message we can keep. these
    -- are lazily evaluated and as such won't be computed unless the
    -- item is too big
    blankObject :: A.Object
    blankObject :: Object
blankObject =
#if MIN_VERSION_aeson (2, 0, 0)
      forall v. [(Key, v)] -> KeyMap v
A.fromList
#else
      HM.fromList
#endif
        [ Key
"message" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= Text -> Value
A.String Text
"", -- we'll start with a blank message
          Key
"@timestamp" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= forall a. Item a -> UTCTime
K._itemTime Item a
item
        ]
    (Builder
_, Bytes
blankObjectSize) = forall a. ToJSON a => a -> (Builder, Bytes)
measureJSONLine Object
blankObject
    messageBytesAllowed :: Bytes
messageBytesAllowed = Bytes
maxLogLineLength forall a. Num a => a -> a -> a
- Bytes
blankObjectSize
    (Builder
fallbackLine, Bytes
fallbackSize) = forall a. ToJSON a => a -> (Builder, Bytes)
measureJSONLine Object
fallbackObject
    fallbackObject :: A.Object
    fallbackObject :: Object
fallbackObject =
#if MIN_VERSION_aeson (2, 0, 0)
      forall v. [(Key, v)] -> KeyMap v
A.fromList
#else
      HM.fromList
#endif
        [ Key
"message" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= forall a. ToJSON a => a -> Value
A.toJSON (Int64 -> Text -> Text
TL.take (Bytes -> Int64
bytes Bytes
messageBytesAllowed) (Builder -> Text
TB.toLazyText (LogStr -> Builder
K.unLogStr (forall a. Item a -> LogStr
K._itemMessage Item a
item)))),
          Key
"@timestamp" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
A..= forall a. ToJSON a => a -> Value
A.toJSON (forall a. Item a -> UTCTime
K._itemTime Item a
item)
        ]

data BulkBuffer = BulkBuffer
  { BulkBuffer -> Bytes
bulkBuffer_bytesUsed :: !Bytes,
    BulkBuffer -> Builder
bulkBuffer_payload :: !BB.Builder,
    BulkBuffer -> Int
bulkBuffer_itemCount :: !Int
  }

instance Semigroup.Semigroup BulkBuffer where
  (BulkBuffer Bytes
bytesUsedA Builder
bufferA Int
itemCountA)
    <> :: BulkBuffer -> BulkBuffer -> BulkBuffer
<> (BulkBuffer Bytes
bytesUsedB Builder
bufferB Int
itemCountB) =
      Bytes -> Builder -> Int -> BulkBuffer
BulkBuffer
        (Bytes
bytesUsedA forall a. Num a => a -> a -> a
+ Bytes
bytesUsedB)
        (Builder
bufferA forall a. Semigroup a => a -> a -> a
<> Builder
bufferB)
        (Int
itemCountA forall a. Num a => a -> a -> a
+ Int
itemCountB)

instance Monoid BulkBuffer where
  mempty :: BulkBuffer
mempty = Bytes -> Builder -> Int -> BulkBuffer
BulkBuffer Bytes
0 forall a. Monoid a => a
mempty Int
0
  mappend :: BulkBuffer -> BulkBuffer -> BulkBuffer
mappend = forall a. Semigroup a => a -> a -> a
(<>)

data LogAction buf
  = -- | New buffer
    Buffered
      buf
  | FlushNow
      buf
      -- ^ Buffer to flush
      buf
      -- ^ New buffer

bufferItem ::
  K.LogItem a =>
  -- | Maximum items before flushing
  Int ->
  K.Verbosity ->
  K.Item a ->
  BulkBuffer ->
  LogAction BulkBuffer
bufferItem :: forall a.
LogItem a =>
Int -> Verbosity -> Item a -> BulkBuffer -> LogAction BulkBuffer
bufferItem = forall a.
LogItem a =>
Bytes
-> Bytes
-> Int
-> Verbosity
-> Item a
-> BulkBuffer
-> LogAction BulkBuffer
bufferItem' Bytes
maxPayloadBytes Bytes
maxLogLineLength

-- | An internal version with a configurable max item size and max
-- message size for testing. Be careful with this: if you set the
-- values too low, some of the guarantees about reducability of
-- messages break down.
bufferItem' ::
  (K.LogItem a) =>
  -- | Max payload size
  Bytes ->
  -- | Max item size
  Bytes ->
  -- | Maximum items before flushing
  Int ->
  K.Verbosity ->
  K.Item a ->
  BulkBuffer ->
  LogAction BulkBuffer
bufferItem' :: forall a.
LogItem a =>
Bytes
-> Bytes
-> Int
-> Verbosity
-> Item a
-> BulkBuffer
-> LogAction BulkBuffer
bufferItem' Bytes
customMaxPayload Bytes
customMaxItem Int
maxItems Verbosity
verb Item a
item BulkBuffer
bulkBuffer =
  let (Builder
encodedLine, Bytes
spaceNeeded) = forall a.
LogItem a =>
Bytes -> Verbosity -> Item a -> (Builder, Bytes)
renderLineTruncated' Bytes
customMaxItem Verbosity
verb Item a
item
      newBytesUsed :: Bytes
newBytesUsed = BulkBuffer -> Bytes
bulkBuffer_bytesUsed BulkBuffer
bulkBuffer forall a. Num a => a -> a -> a
+ Bytes
spaceNeeded
      newItemCount :: Int
newItemCount = BulkBuffer -> Int
bulkBuffer_itemCount BulkBuffer
bulkBuffer forall a. Num a => a -> a -> a
+ Int
1
   in if Int
newItemCount forall a. Ord a => a -> a -> Bool
>= Int
maxItems Bool -> Bool -> Bool
|| Bytes
newBytesUsed forall a. Ord a => a -> a -> Bool
>= Bytes
customMaxPayload
        then
          forall buf. buf -> buf -> LogAction buf
FlushNow
            BulkBuffer
bulkBuffer
            BulkBuffer
              { bulkBuffer_bytesUsed :: Bytes
bulkBuffer_bytesUsed = Bytes
spaceNeeded,
                bulkBuffer_payload :: Builder
bulkBuffer_payload = Builder
encodedLine,
                bulkBuffer_itemCount :: Int
bulkBuffer_itemCount = Int
1
              }
        else
          forall buf. buf -> LogAction buf
Buffered forall a b. (a -> b) -> a -> b
$
            BulkBuffer
              { bulkBuffer_bytesUsed :: Bytes
bulkBuffer_bytesUsed = Bytes
newBytesUsed,
                bulkBuffer_payload :: Builder
bulkBuffer_payload = BulkBuffer -> Builder
bulkBuffer_payload BulkBuffer
bulkBuffer forall a. Semigroup a => a -> a -> a
<> Builder
encodedLine,
                bulkBuffer_itemCount :: Int
bulkBuffer_itemCount = Int
newItemCount
              }

-- | When time has run out on a flush, splits the buffer. n.b. this
-- will always return 'FlushNow' with an empty new buffer.
forceFlush :: (Monoid buf) => buf -> LogAction buf
forceFlush :: forall buf. Monoid buf => buf -> LogAction buf
forceFlush buf
buf = forall buf. buf -> buf -> LogAction buf
FlushNow buf
buf forall a. Monoid a => a
mempty

-------------------------------------------------------------------------------
-- Annotation borrowed from katip-elasticsearch. There are fixed
-- fields in katip logs which should stay the same type forever and
-- thus won't need annotation. However, any code in userland may
-- choose to add akey to their log's metadata with a type that's
-- incompatible with the mapping. If the first value that's picked up
-- restrictive (e.g. a long), subsequent values will be rejected. By
-- differentiating types by their name, this is guaranteed not to
-- happen.

annotateValue :: A.Value -> A.Value
annotateValue :: Value -> Value
annotateValue (A.Object Object
o) = Object -> Value
A.Object (Object -> Object
annotateKeys Object
o)
annotateValue (A.Array Array
a) = Array -> Value
A.Array (Value -> Value
annotateValue forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Array
a)
annotateValue Value
x = Value
x

annotateKeys :: A.Object -> A.Object
#if MIN_VERSION_aeson (2, 0, 0)
annotateKeys :: Object -> Object
annotateKeys = forall v. [(Key, v)] -> KeyMap v
A.fromList forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a -> b) -> [a] -> [b]
map (Key, Value) -> (Key, Value)
go forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall v. KeyMap v -> [(Key, v)]
A.toList
  where
    go :: (Key, Value) -> (Key, Value)
go (Key
k, A.Object Object
o) = (Key
k, Object -> Value
A.Object (Object -> Object
annotateKeys Object
o))
    go (Key
k, A.Array Array
a) = (Key
k, Array -> Value
A.Array (Value -> Value
annotateValue forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Array
a))
    go (Key
k, s :: Value
s@(A.String Text
_)) = (Text -> Key
A.fromText (Key -> Text
A.toText Key
k forall a. Semigroup a => a -> a -> a
<> Text
stringAnn), Value
s)
    go (Key
k, n :: Value
n@(A.Number Scientific
sci)) =
      if Scientific -> Bool
Scientific.isFloating Scientific
sci
        then (Text -> Key
A.fromText (Key -> Text
A.toText Key
k forall a. Semigroup a => a -> a -> a
<> Text
doubleAnn), Value
n)
        else (Text -> Key
A.fromText (Key -> Text
A.toText Key
k forall a. Semigroup a => a -> a -> a
<> Text
longAnn), Value
n)
    go (Key
k, b :: Value
b@(A.Bool Bool
_)) = (Text -> Key
A.fromText (Key -> Text
A.toText Key
k forall a. Semigroup a => a -> a -> a
<> Text
booleanAnn), Value
b)
    go (Key
k, Value
A.Null) = (Text -> Key
A.fromText (Key -> Text
A.toText Key
k forall a. Semigroup a => a -> a -> a
<> Text
nullAnn), Value
A.Null)
#else
annotateKeys = HM.fromList . map go . HM.toList
  where
    go (k, A.Object o) = (k, A.Object (annotateKeys o))
    go (k, A.Array a) = (k, A.Array (annotateValue <$> a))
    go (k, s@(A.String _)) = (k <> stringAnn, s)
    go (k, n@(A.Number sci)) =
      if Scientific.isFloating sci
        then (k <> doubleAnn, n)
        else (k <> longAnn, n)
    go (k, b@(A.Bool _)) = (k <> booleanAnn, b)
    go (k, A.Null) = (k <> nullAnn, A.Null)
#endif

-------------------------------------------------------------------------------
-- Annotation Constants
-------------------------------------------------------------------------------

stringAnn :: T.Text
stringAnn :: Text
stringAnn = Text
"::s"

doubleAnn :: T.Text
doubleAnn :: Text
doubleAnn = Text
"::d"

longAnn :: T.Text
longAnn :: Text
longAnn = Text
"::l"

booleanAnn :: T.Text
booleanAnn :: Text
booleanAnn = Text
"::b"

nullAnn :: T.Text
nullAnn :: Text
nullAnn = Text
"::n"