{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE CPP #-}
module Katip.Scribes.LogzIO.HTTPS
(
mkLogzIOScribe,
BulkAPIError (..),
LogzIOScribeConfiguration (..),
Scheme (..),
APIToken (..),
LoggingError (..),
usRegionHost,
euRegionHost,
httpsPort,
httpPort,
defaultRetryPolicy,
defaultLogzIOScribeConfiguration,
renderLineTruncated,
renderLineTruncated',
maxPayloadBytes,
maxLogLineLength,
BulkBuffer (..),
LogAction (..),
bufferItem,
bufferItem',
forceFlush,
Bytes (..),
)
where
import Control.Applicative
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TBMQueue as STM
import qualified Control.Error as E
import qualified Control.Exception.Safe as EX
import Control.Monad
import qualified Control.Retry as Retry
import qualified Data.Aeson as A
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Lazy.Char8 as LBS8
#if MIN_VERSION_aeson (2, 0, 0)
import qualified Data.Aeson.Key as A
import qualified Data.Aeson.KeyMap as A
#else
import qualified Data.HashMap.Strict as HM
#endif
import Data.Int
import qualified Data.Scientific as Scientific
import Data.Semigroup as Semigroup
import Data.String (IsString)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Builder as TB
import qualified Data.Time as Time
import qualified Katip as K
import Katip.Core (LocJs (..))
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTPS
import qualified Network.HTTP.Types as HTypes
import qualified System.Posix.Types as POSIX
import qualified URI.ByteString as URIBS
-- | Per-line outcome counters carried by a bulk API response body. The
-- 'A.FromJSON' instance in this module reads each counter from the JSON key
-- named after the field suffix.
data BulkAPIError = BulkAPIError
  { -- | Value of the @malformedLines@ key.
    bulkAPIError_malformedLines :: Int
  , -- | Value of the @successfulLines@ key.
    bulkAPIError_successfulLines :: Int
  , -- | Value of the @oversizedLines@ key.
    bulkAPIError_oversizedLines :: Int
  , -- | Value of the @emptyLogLines@ key.
    bulkAPIError_emptyLogLines :: Int
  }
  deriving (Eq, Show)
-- | Decodes a JSON object with the required integer keys @malformedLines@,
-- @successfulLines@, @oversizedLines@ and @emptyLogLines@.
instance A.FromJSON BulkAPIError where
  parseJSON = A.withObject "BulkAPIError" $ \obj ->
    -- Constructor arguments follow the field declaration order.
    BulkAPIError
      <$> obj A..: "malformedLines"
      <*> obj A..: "successfulLines"
      <*> obj A..: "oversizedLines"
      <*> obj A..: "emptyLogLines"
-- | Settings consumed by 'mkLogzIOScribe' and the buffer flushing code.
data LogzIOScribeConfiguration = LogzIOScribeConfiguration
  { -- | Item-count threshold handed to 'bufferItem'. The ingestion queue is
    -- created with ten times this many slots.
    logzIOScribeConfiguration_bufferItems :: Int
  , -- | Delay after which the flush loop flushes whatever is buffered. The
    -- timer is re-armed after every flush.
    logzIOScribeConfiguration_bufferTimeout :: Time.NominalDiffTime
  , -- | Selects the HTTP manager (TLS or plain) and the URI scheme.
    logzIOScribeConfiguration_scheme :: Scheme
  , -- | Host part of the request URI.
    logzIOScribeConfiguration_host :: URIBS.Host
  , -- | Port part of the request URI.
    logzIOScribeConfiguration_port :: URIBS.Port
  , -- | Sent as the @token@ query parameter of every request.
    logzIOScribeConfiguration_token :: APIToken
  , -- | Retry policy applied around the HTTP request when it throws an
    -- 'HTTP.HttpException'.
    logzIOScribeConfiguration_retry :: Retry.RetryPolicyM IO
  , -- | Invoked with the error whenever a flush fails. Synchronous exceptions
    -- thrown by this callback are caught and discarded by the scribe.
    logzIOScribeConfiguration_onError :: LoggingError -> IO ()
  }
-- | Baseline configuration for the given token: 100 buffered items, a
-- 30 second flush timeout, HTTPS against 'usRegionHost' on 'httpsPort',
-- 'defaultRetryPolicy', and an error callback that does nothing.
defaultLogzIOScribeConfiguration :: APIToken -> LogzIOScribeConfiguration
defaultLogzIOScribeConfiguration token =
  LogzIOScribeConfiguration
    { logzIOScribeConfiguration_token = token
    , logzIOScribeConfiguration_scheme = HTTPS
    , logzIOScribeConfiguration_host = usRegionHost
    , logzIOScribeConfiguration_port = httpsPort
    , logzIOScribeConfiguration_bufferItems = 100
    , logzIOScribeConfiguration_bufferTimeout = 30
    , logzIOScribeConfiguration_retry = defaultRetryPolicy
    , logzIOScribeConfiguration_onError = \_ -> pure ()
    }
-- | Account token wrapper. It is UTF-8 encoded and sent as the @token@ query
-- parameter when a buffer is flushed. The 'IsString' instance is derived
-- from the underlying 'T.Text', so string literals can be used with
-- @OverloadedStrings@.
newtype APIToken = APIToken
  { -- | Unwrap the raw token text.
    apiToken :: T.Text
  }
  deriving (Eq, Show, IsString)
-- | Transport used to reach the listener.
data Scheme
  = -- | Requests go to an @https@ URI through a TLS-enabled manager.
    HTTPS
  | -- | Requests go to an @http@ URI through a plain manager.
    HTTP
  deriving (Eq, Show)
-- | Listener host @listener.logz.io@. This is the host used by
-- 'defaultLogzIOScribeConfiguration'.
usRegionHost :: URIBS.Host
usRegionHost = URIBS.Host "listener.logz.io"
-- | Listener host @listener-eu.logz.io@ (the EU counterpart of
-- 'usRegionHost', going by the name).
euRegionHost :: URIBS.Host
euRegionHost = URIBS.Host "listener-eu.logz.io"
-- | Port 8071. This is the port used by 'defaultLogzIOScribeConfiguration',
-- which pairs it with the 'HTTPS' scheme.
httpsPort :: URIBS.Port
httpsPort = URIBS.Port 8071
-- | Port 8070, intended for use with the plain 'HTTP' scheme (per the name).
httpPort :: URIBS.Port
httpPort = URIBS.Port 8070
-- | Exponential backoff starting from a base delay of 25000 (microseconds,
-- per the @retry@ package), combined with a limit of 5 retries.
defaultRetryPolicy :: (Monad m) => Retry.RetryPolicyM m
defaultRetryPolicy =
  mappend
    (Retry.exponentialBackoff 25000)
    (Retry.limitRetries 5)
-- | Build a Katip scribe that batches items and ships them over HTTP(S)
-- from a background thread.
--
-- Pushing an item writes it to a bounded, closable queue. A single flush
-- thread drains that queue into a 'BulkBuffer' and flushes it when
-- 'bufferItem' asks for it, when the buffer timeout elapses, or when the
-- queue has been closed and emptied. The scribe finalizer closes the queue
-- and waits for the flush thread to finish.
--
-- Flush failures are reported through the configured @onError@ callback and
-- never stop the loop.
mkLogzIOScribe ::
  LogzIOScribeConfiguration ->
  K.PermitFunc ->
  K.Verbosity ->
  IO K.Scribe
mkLogzIOScribe config permitItem verbosity = do
  -- Queue between logging threads and the flush thread.
  ingestionQueue <- STM.newTBMQueueIO (itemBufferSize * 10)

  -- The current timeout TVar lives behind another TVar so that it can be
  -- swapped for a fresh one after each flush.
  let newTimer = STM.registerDelay itemBufferTimeoutMicros
  timerRef <- newTimer >>= STM.newTVarIO

  mgr <- case scheme of
    HTTPS -> HTTPS.newTlsManager
    HTTP -> HTTP.newManager HTTP.defaultManagerSettings

  let -- True once the queue is both closed and drained.
      workExhausted :: STM.STM Bool
      workExhausted =
        liftA2
          (&&)
          (STM.isClosedTBMQueue ingestionQueue)
          (STM.isEmptyTBMQueue ingestionQueue)

      -- Blocks (retries) until 'workExhausted' holds.
      waitWorkExhausted :: STM.STM ()
      waitWorkExhausted = workExhausted >>= STM.check

      -- Next queued item, or 'WorkExhausted' when the read yields Nothing.
      pop :: STM.STM (Tick AnyLogItem)
      pop = maybe WorkExhausted NewItem <$> STM.readTBMQueue ingestionQueue

      -- Succeeds only once the current timer has fired.
      timeExpired :: STM.STM (Tick a)
      timeExpired = do
        currentTimer <- STM.readTVar timerRef
        fired <- STM.readTVar currentTimer
        STM.check fired
        pure TimeExpired

      -- Alternatives are tried left to right, so an expired timer wins
      -- over a pending item.
      nextTick :: STM.STM (Tick AnyLogItem)
      nextTick =
        timeExpired
          <|> pop
          <|> (WorkExhausted <$ waitWorkExhausted)

      push :: AnyLogItem -> STM.STM ()
      push = STM.writeTBMQueue ingestionQueue

      sealQueue = STM.atomically (STM.closeTBMQueue ingestionQueue)

      -- Arm a fresh timer and make it the current one.
      resetTimer = newTimer >>= STM.atomically . STM.writeTVar timerRef

      -- Send the buffer, report any failure, then restart the timeout.
      flush curBuffer = do
        flushBuffer config mgr curBuffer >>= either onErrorSafe pure
        resetTimer

      flushLoop :: BulkBuffer -> IO ()
      flushLoop curBuffer =
        STM.atomically nextTick >>= \tick -> case tick of
          -- Final flush; the loop ends here.
          WorkExhausted -> flush curBuffer
          TimeExpired -> flush curBuffer >> flushLoop mempty
          NewItem (AnyLogItem item) ->
            case bufferItem itemBufferSize verbosity item curBuffer of
              Buffered newBuffer -> flushLoop newBuffer
              FlushNow flushThis newBuffer ->
                flush flushThis >> flushLoop newBuffer

  flushThread <- Async.async (flushLoop mempty)

  let -- Stop accepting items, then wait for the flush thread; any exception
      -- it died with is discarded.
      close = sealQueue >> void (Async.waitCatch flushThread)

  pure $
    K.Scribe
      { K.liPush = STM.atomically . push . AnyLogItem
      , K.scribeFinalizer = close
      , K.scribePermitItem = permitItem
      }
  where
    itemBufferSize = logzIOScribeConfiguration_bufferItems config
    itemBufferTimeoutMicros =
      ndtToMicros (logzIOScribeConfiguration_bufferTimeout config)
    scheme = logzIOScribeConfiguration_scheme config
    -- Run the user callback, swallowing any synchronous exception it throws
    -- so that a faulty callback cannot kill the flush thread.
    onErrorSafe ex =
      void (EX.tryAny (logzIOScribeConfiguration_onError config ex))
-- | Existential wrapper that hides the payload type of a Katip item while
-- keeping its 'K.LogItem' dictionary, so that items with different payloads
-- can share one queue.
data AnyLogItem where
  AnyLogItem :: (K.LogItem a) => K.Item a -> AnyLogItem
-- | Events that drive the flush loop in 'mkLogzIOScribe'.
data Tick a
  = -- | The buffer timeout elapsed.
    TimeExpired
  | -- | An item was read from the queue (strict field).
    NewItem !a
  | -- | The queue is closed and empty; no more items will arrive.
    WorkExhausted
-- | Scale a duration by @10^9@ and round to the nearest integer.
--
-- NOTE(review): despite the name, a factor of @10^9@ on a value measured in
-- seconds yields nanoseconds, not picoseconds. 'ndtToMicros' relies on this
-- exact factor.
ndtPicos :: Time.NominalDiffTime -> Int64
ndtPicos t = round (t * unitsPerSecond)
  where
    unitsPerSecond :: Time.NominalDiffTime
    unitsPerSecond = 10 ^ (9 :: Int)
-- | Convert a duration to whole microseconds, the unit expected by the
-- delay timer in 'mkLogzIOScribe'.
--
-- The value is first scaled by 'ndtPicos' (a factor of @10^9@), then
-- divided by @10^3@ in 'Double' arithmetic and rounded.
ndtToMicros :: Time.NominalDiffTime -> Int
ndtToMicros t = round (scaled / 1000)
  where
    scaled :: Double
    scaled = fromIntegral (ndtPicos t)
-- | Ways in which flushing a buffer can fail; passed to the configured
-- error callback.
data LoggingError
  = -- | Building the request from the configured URI threw.
    URIError HTTP.HttpException
  | -- | Performing the HTTP request threw, even after applying the retry
    -- policy.
    RequestError HTTP.HttpException
  | -- | Non-successful status whose body decoded as a 'BulkAPIError'.
    PartialFailure BulkAPIError
  | -- | Status 401 with a body that did not decode as a 'BulkAPIError'.
    BadToken
  | -- | Any other non-successful status, with the raw response body.
    UnknownFailureResponse HTypes.Status LBS.ByteString
  deriving (Show)
-- | POST the buffered payload to the configured listener.
--
-- An empty buffer (item count of zero or less) is a no-op. Otherwise the
-- request is sent under the configured retry policy, retrying on every
-- 'HTTP.HttpException'. A successful status yields @Right ()@; anything
-- else is classified into a 'LoggingError'.
flushBuffer ::
  LogzIOScribeConfiguration ->
  HTTP.Manager ->
  BulkBuffer ->
  IO (Either LoggingError ())
flushBuffer config mgr bulkBuffer
  | bulkBuffer_itemCount bulkBuffer <= 0 = pure (Right ())
  | otherwise = E.runExceptT $ do
      req <-
        E.fmapLT URIError $
          E.ExceptT (EX.try (configureRequest <$> HTTP.parseRequest uriStr))
      resp <-
        E.fmapLT RequestError . E.ExceptT . EX.try $
          Retry.recovering
            retryPolicy
            [const (EX.Handler handleHttpException)]
            (const (HTTP.httpLbs req mgr))
      let status = HTTP.responseStatus resp
          body = HTTP.responseBody resp
      unless (HTypes.statusIsSuccessful status) $
        E.throwE $ case A.decode @BulkAPIError body of
          Just bulkError -> PartialFailure bulkError
          Nothing
            | HTypes.statusCode status == 401 -> BadToken
            | otherwise -> UnknownFailureResponse status body
  where
    -- Turn the parsed URI into a POST carrying the rendered payload.
    configureRequest req =
      req
        { HTTP.method = HTypes.methodPost
        , HTTP.requestBody =
            HTTP.RequestBodyLBS
              (BB.toLazyByteString (bulkBuffer_payload bulkBuffer))
        }
    retryPolicy = logzIOScribeConfiguration_retry config
    apiTokenBS =
      TE.encodeUtf8 (apiToken (logzIOScribeConfiguration_token config))
    -- Every HTTP exception is considered retryable.
    handleHttpException :: (Applicative m) => HTTP.HttpException -> m Bool
    handleHttpException _ = pure True
    uriStr = LBS8.unpack (BB.toLazyByteString (URIBS.serializeURIRef uri))
    authority =
      URIBS.Authority
        { URIBS.authorityUserInfo = Nothing
        , URIBS.authorityHost = logzIOScribeConfiguration_host config
        , URIBS.authorityPort = Just (logzIOScribeConfiguration_port config)
        }
    uriSchemeName = case logzIOScribeConfiguration_scheme config of
      HTTPS -> URIBS.Scheme "https"
      HTTP -> URIBS.Scheme "http"
    -- <scheme>://<host>:<port>/?token=<token>
    uri =
      URIBS.URI
        { URIBS.uriScheme = uriSchemeName
        , URIBS.uriAuthority = Just authority
        , URIBS.uriPath = "/"
        , URIBS.uriQuery = URIBS.Query [("token", apiTokenBS)]
        , URIBS.uriFragment = Nothing
        }
-- | A size measured in bytes. The numeric, ordering and bounds instances
-- are those of the underlying 'Int64'.
newtype Bytes = Bytes
  { -- | Unwrap the raw byte count.
    bytes :: Int64
  }
  deriving (Eq, Ord, Show, Num, Bounded)
-- | Payload size limit used by 'bufferItem': 10485760 bytes (10 MiB).
maxPayloadBytes :: Bytes
maxPayloadBytes = 10 * 1024 * 1024
-- | Size limit for a single rendered log line used by 'renderLineTruncated'
-- and 'bufferItem': 500000 bytes.
maxLogLineLength :: Bytes
maxLogLineLength = 500 * 1000
-- | JSON-encode a value as a single newline-terminated line. Returns the
-- line as a builder together with its encoded length (newline included).
measureJSONLine :: (A.ToJSON a) => a -> (BB.Builder, Bytes)
measureJSONLine value = (BB.lazyByteString line, Bytes (LBS.length line))
  where
    line = LBS8.snoc (A.encode value) '\n'
-- | Full JSON representation of an item: keys @app@, @env@, @sev@,
-- @thread@, @host@, @pid@, @data@, @message@, @\@timestamp@, @ns@ and
-- @loc@. The payload under @data@ is rendered at the given verbosity and
-- its keys are type-annotated via 'annotateKeys'.
fullItemObject :: (K.LogItem a) => K.Verbosity -> K.Item a -> A.Object
fullItemObject verbosity item =
#if MIN_VERSION_aeson (2, 0, 0)
  A.fromList
#else
  HM.fromList
#endif
    [ "app" A..= K._itemApp item
    , "env" A..= K._itemEnv item
    , "sev" A..= K._itemSeverity item
    , "thread" A..= K.getThreadIdText (K._itemThread item)
    , "host" A..= K._itemHost item
    , "pid" A..= processId
    , "data" A..= annotateKeys payload
    , "message" A..= messageText
    , "@timestamp" A..= A.String (T.pack timestamp)
    , "ns" A..= K._itemNamespace item
    , "loc" A..= fmap LocJs (K._itemLoc item)
    ]
  where
    -- Unwrap the process id newtype to its numeric value.
    POSIX.CPid processId = K._itemProcess item
    payload = K.payloadObject verbosity (K._itemPayload item)
    messageText = TB.toLazyText (K.unLogStr (K._itemMessage item))
    timestamp =
      Time.formatTime
        Time.defaultTimeLocale
        "%Y-%m-%dT%H:%M:%S%03QZ"
        (K._itemTime item)
-- | Render an item as one JSON line using the production line limit
-- 'maxLogLineLength'. See 'renderLineTruncated'' for the fallback used when
-- the full line is too large.
renderLineTruncated :: (K.LogItem a) => K.Verbosity -> K.Item a -> (BB.Builder, Bytes)
renderLineTruncated verbosity item =
  renderLineTruncated' maxLogLineLength verbosity item
-- | Render an item as one newline-terminated JSON line, together with its
-- encoded size.
--
-- If the full rendering (see 'fullItemObject') fits within the given limit,
-- it is returned as is. Otherwise a minimal object with only @message@ and
-- @\@timestamp@ is rendered, with the message cut down to the space left
-- after the fixed part of that object.
--
-- The truncation budget is derived from the limit passed in, not from the
-- global 'maxLogLineLength', so a custom limit is honoured on the fallback
-- path as well. If the limit is smaller than the blank fallback object, the
-- message is dropped entirely and the returned size can still exceed the
-- limit.
--
-- NOTE(review): the message is truncated with 'TL.take', which counts
-- characters rather than encoded bytes. A message containing multi-byte or
-- JSON-escaped characters can therefore still render larger than the limit.
renderLineTruncated' ::
  (K.LogItem a) =>
  -- | Maximum size of the rendered line.
  Bytes ->
  K.Verbosity ->
  K.Item a ->
  (BB.Builder, Bytes)
renderLineTruncated' customMaxLogLineLength verbosity item
  | fullSize <= customMaxLogLineLength = (fullLine, fullSize)
  | otherwise = (fallbackLine, fallbackSize)
  where
    (fullLine, fullSize) = measureJSONLine (fullItemObject verbosity item)

    -- Everything below is only evaluated when the full line is too big.

    -- Fallback object with an empty message, used to measure the fixed
    -- overhead of the fallback rendering.
    blankObject :: A.Object
    blankObject =
#if MIN_VERSION_aeson (2, 0, 0)
      A.fromList
#else
      HM.fromList
#endif
        [ "message" A..= A.String ""
        , "@timestamp" A..= K._itemTime item
        ]
    (_, blankObjectSize) = measureJSONLine blankObject

    -- Space left for the message, based on the caller-supplied limit and
    -- never negative.
    messageBytesAllowed :: Bytes
    messageBytesAllowed = max 0 (customMaxLogLineLength - blankObjectSize)

    truncatedMessage =
      TL.take
        (bytes messageBytesAllowed)
        (TB.toLazyText (K.unLogStr (K._itemMessage item)))

    fallbackObject :: A.Object
    fallbackObject =
#if MIN_VERSION_aeson (2, 0, 0)
      A.fromList
#else
      HM.fromList
#endif
        [ "message" A..= A.toJSON truncatedMessage
        , "@timestamp" A..= A.toJSON (K._itemTime item)
        ]
    (fallbackLine, fallbackSize) = measureJSONLine fallbackObject
-- | Accumulated request body plus bookkeeping. All fields are strict.
data BulkBuffer = BulkBuffer
  { -- | Total encoded size of the lines in the payload.
    bulkBuffer_bytesUsed :: !Bytes
  , -- | Concatenated newline-terminated JSON lines.
    bulkBuffer_payload :: !BB.Builder
  , -- | Number of items in the payload.
    bulkBuffer_itemCount :: !Int
  }
-- | Combining two buffers adds their sizes and item counts, and appends the
-- right payload after the left one.
instance Semigroup.Semigroup BulkBuffer where
  lhs <> rhs =
    BulkBuffer
      { bulkBuffer_bytesUsed = bulkBuffer_bytesUsed lhs + bulkBuffer_bytesUsed rhs
      , bulkBuffer_payload = bulkBuffer_payload lhs <> bulkBuffer_payload rhs
      , bulkBuffer_itemCount = bulkBuffer_itemCount lhs + bulkBuffer_itemCount rhs
      }
-- | The empty buffer has zero bytes, an empty payload and zero items.
instance Monoid BulkBuffer where
  mempty =
    BulkBuffer
      { bulkBuffer_bytesUsed = 0
      , bulkBuffer_payload = mempty
      , bulkBuffer_itemCount = 0
      }
  mappend = (<>)
-- | Outcome of offering an item to a buffer.
data LogAction buf
  = -- | The item was added; carry on with this buffer.
    Buffered buf
  | -- | Flush the first buffer now and carry on with the second one.
    FlushNow buf buf
-- | 'bufferItem'' specialised to the production limits 'maxPayloadBytes'
-- and 'maxLogLineLength'.
bufferItem ::
  (K.LogItem a) =>
  -- | Item-count threshold that triggers a flush.
  Int ->
  K.Verbosity ->
  K.Item a ->
  BulkBuffer ->
  LogAction BulkBuffer
bufferItem maxItems verb item bulkBuffer =
  bufferItem' maxPayloadBytes maxLogLineLength maxItems verb item bulkBuffer
-- | Offer an item to a buffer, with configurable limits.
--
-- The item is rendered with 'renderLineTruncated''. If adding it would
-- bring the item count up to (or past) the item threshold, or the byte
-- count up to (or past) the payload limit, the existing buffer is returned
-- for flushing and the new item starts a fresh buffer. Otherwise the item
-- is appended to the existing buffer.
bufferItem' ::
  (K.LogItem a) =>
  -- | Payload size threshold.
  Bytes ->
  -- | Maximum size of a single rendered item.
  Bytes ->
  -- | Item-count threshold.
  Int ->
  K.Verbosity ->
  K.Item a ->
  BulkBuffer ->
  LogAction BulkBuffer
bufferItem' customMaxPayload customMaxItem maxItems verb item bulkBuffer
  | itemLimitHit || byteLimitHit = FlushNow bulkBuffer freshBuffer
  | otherwise = Buffered grownBuffer
  where
    (encodedLine, spaceNeeded) = renderLineTruncated' customMaxItem verb item
    newBytesUsed = bulkBuffer_bytesUsed bulkBuffer + spaceNeeded
    newItemCount = bulkBuffer_itemCount bulkBuffer + 1
    itemLimitHit = newItemCount >= maxItems
    byteLimitHit = newBytesUsed >= customMaxPayload
    -- Buffer holding only the new item.
    freshBuffer =
      BulkBuffer
        { bulkBuffer_bytesUsed = spaceNeeded
        , bulkBuffer_payload = encodedLine
        , bulkBuffer_itemCount = 1
        }
    -- Existing buffer with the new item appended.
    grownBuffer =
      BulkBuffer
        { bulkBuffer_bytesUsed = newBytesUsed
        , bulkBuffer_payload = bulkBuffer_payload bulkBuffer <> encodedLine
        , bulkBuffer_itemCount = newItemCount
        }
-- | Request an immediate flush of the given buffer, continuing with an
-- empty one.
forceFlush :: (Monoid buf) => buf -> LogAction buf
forceFlush = (`FlushNow` mempty)
-- | Apply 'annotateKeys' to every object nested inside a value, descending
-- through arrays. Scalars are returned unchanged.
annotateValue :: A.Value -> A.Value
annotateValue value = case value of
  A.Object o -> A.Object (annotateKeys o)
  A.Array a -> A.Array (fmap annotateValue a)
  other -> other
-- | Append a type suffix to every key whose value is a scalar: 'stringAnn'
-- for strings, 'doubleAnn' or 'longAnn' for numbers (depending on
-- 'Scientific.isFloating'), 'booleanAnn' for booleans and 'nullAnn' for
-- null. Keys of nested objects and arrays are left as they are, but their
-- contents are annotated recursively.
annotateKeys :: A.Object -> A.Object
#if MIN_VERSION_aeson (2, 0, 0)
annotateKeys = A.fromList . fmap go . A.toList
  where
    -- Keys are an opaque type here; go through Text to append the suffix.
    suffixed key ann = A.fromText (A.toText key <> ann)
#else
annotateKeys = HM.fromList . fmap go . HM.toList
  where
    -- Keys are plain Text here.
    suffixed key ann = key <> ann
#endif
    go (k, A.Object o) = (k, A.Object (annotateKeys o))
    go (k, A.Array a) = (k, A.Array (fmap annotateValue a))
    go (k, s@(A.String _)) = (suffixed k stringAnn, s)
    go (k, n@(A.Number sci))
      | Scientific.isFloating sci = (suffixed k doubleAnn, n)
      | otherwise = (suffixed k longAnn, n)
    go (k, b@(A.Bool _)) = (suffixed k booleanAnn, b)
    go (k, A.Null) = (suffixed k nullAnn, A.Null)
-- | Key suffix used by 'annotateKeys' for string values.
stringAnn :: T.Text
stringAnn = T.pack "::s"
-- | Key suffix used by 'annotateKeys' for floating-point numbers.
doubleAnn :: T.Text
doubleAnn = T.pack "::d"
-- | Key suffix used by 'annotateKeys' for non-floating numbers.
longAnn :: T.Text
longAnn = T.pack "::l"
-- | Key suffix used by 'annotateKeys' for boolean values.
booleanAnn :: T.Text
booleanAnn = T.pack "::b"
-- | Key suffix used by 'annotateKeys' for null values.
nullAnn :: T.Text
nullAnn = T.pack "::n"