module Log.Backend.ElasticSearch.V1 (
ElasticSearchConfig
, esServer
, esIndex
, esMapping
, esLogin
, esLoginInsecure
, defaultElasticSearchConfig
, withElasticSearchLogger
, elasticSearchLogger
) where
import Control.Applicative
import Control.Arrow (second)
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.Aeson
import Data.Aeson.Encode.Pretty
import Data.Bits
import Data.IORef
import Data.Maybe (isJust)
import Data.Semigroup
import Data.Time
import Data.Time.Clock.POSIX
import Data.Word
import Database.V1.Bloodhound hiding (Status)
import Log
import Log.Internal.Logger
import Network.HTTP.Client
import Network.HTTP.Client.TLS (tlsManagerSettings)
import Prelude
import System.IO
import TextShow
import qualified Data.Aeson.Encoding as Aeson
import qualified Data.ByteString as BS
import qualified Data.ByteString.Base64 as B64
import qualified Data.ByteString.Lazy.Char8 as BSL
import qualified Data.HashMap.Strict as H
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Traversable as F
import qualified Data.Vector as V
import Log.Backend.ElasticSearch.V1.Internal
withElasticSearchLogger :: ElasticSearchConfig -> IO Word32 -> (Logger -> IO r)
-> IO r
withElasticSearchLogger conf randGen act = do
logger <- elasticSearchLogger conf randGen
withLogger logger act
elasticSearchLogger ::
ElasticSearchConfig
-> IO Word32
-> IO Logger
elasticSearchLogger ElasticSearchConfig{..} genRandomWord = do
checkElasticSearchLogin
checkElasticSearchConnection
indexRef <- newIORef $ IndexName T.empty
mkBulkLogger "ElasticSearch" (\msgs -> do
now <- getCurrentTime
oldIndex <- readIORef indexRef
baseID <- (<>)
<$> (littleEndianRep <$> liftIO genRandomWord)
<*> pure (littleEndianRep . floor $ timeToDouble now)
retryOnException . runBH_ $ do
let index = IndexName $ T.concat [
esIndex
, "-"
, T.pack $ formatTime defaultTimeLocale "%F" now
]
when (oldIndex /= index) $ do
indexExists' <- indexExists index
unless indexExists' $ do
let indexSettings = IndexSettings {
indexShards = ShardCount 4
, indexReplicas = ReplicaCount 1
}
void $ createIndex indexSettings index
reply <- putMapping index mapping LogsMapping
when (not $ isSuccess reply) $ do
error $ "ElasticSearch: error while creating mapping: "
<> T.unpack (T.decodeUtf8 . BSL.toStrict . jsonToBSL
$ decodeReply reply)
liftIO $ writeIORef indexRef index
let jsonMsgs = V.fromList $ map (toJsonMsg now) $ zip [1..] msgs
reply <- bulk $ V.map (toBulk index baseID) jsonMsgs
let replyBody = decodeReply reply
result = do
Object response <- return replyBody
Bool hasErrors <- "errors" `H.lookup` response
Array jsonItems <- "items" `H.lookup` response
items <- F.forM jsonItems $ \v -> do
Object item <- return v
Object index_ <- "index" `H.lookup` item
return index_
guard $ V.length items == V.length jsonMsgs
return (hasErrors, items)
case result of
Nothing -> liftIO . BSL.putStrLn
$ "ElasticSearch: unexpected response: " <> jsonToBSL replyBody
Just (hasErrors, items) -> when hasErrors $ do
let failed = V.findIndices (H.member "error") items
dummyMsgs <- V.forM failed $ \n -> do
dataNamespace <- liftIO genRandomWord
let modifyData oldData = object [
"__es_error" .= H.lookup "error" (items V.! n)
, "__es_modified" .= True
, ("__data_" <> showt dataNamespace) .= oldData
]
return . second (H.adjust modifyData "data") $ jsonMsgs V.! n
void $ bulk (V.map (toBulk index baseID) dummyMsgs))
(elasticSearchSync indexRef)
where
server = Server esServer
mapping = MappingName esMapping
elasticSearchSync :: IORef IndexName -> IO ()
elasticSearchSync indexRef = do
indexName <- readIORef indexRef
void . runBH_ $ refreshIndex indexName
checkElasticSearchLogin :: IO ()
checkElasticSearchLogin =
when (isJust esLogin
&& not esLoginInsecure
&& not ("https:" `T.isPrefixOf` esServer)) $
error $ "ElasticSearch: insecure login: "
<> "Attempting to send login credentials over an insecure connection. "
<> "Set esLoginInsecure = True to disable this check."
checkElasticSearchConnection :: IO ()
checkElasticSearchConnection = try (void $ runBH_ listIndices) >>= \case
Left (ex::HttpException) ->
hPutStrLn stderr $ "ElasticSearch: unexpected error: " <> show ex
<> " (is ElasticSearch server running?)"
Right () -> return ()
retryOnException :: forall r. IO r -> IO r
retryOnException m = try m >>= \case
Left (ex::SomeException) -> do
putStrLn $ "ElasticSearch: unexpected error: "
<> show ex <> ", retrying in 10 seconds"
threadDelay $ 10 * 1000000
retryOnException m
Right result -> return result
timeToDouble :: UTCTime -> Double
timeToDouble = realToFrac . utcTimeToPOSIXSeconds
runBH_ :: forall r. BH IO r -> IO r
runBH_ f = do
mgr <- newManager tlsManagerSettings
let hook = maybe return (uncurry basicAuthHook) esLogin
let env = (mkBHEnv server mgr) { bhRequestHook = hook }
runBH env f
jsonToBSL :: Value -> BSL.ByteString
jsonToBSL = encodePretty' defConfig { confIndent = Spaces 2 }
toJsonMsg :: UTCTime -> (Word32, LogMessage)
-> (Word32, H.HashMap T.Text Value)
toJsonMsg now (n, msg) = (n, H.union jMsg $ H.fromList [
("insertion_order", toJSON n)
, ("insertion_time", toJSON now)
])
where
Object jMsg = toJSON msg
mkDocId :: BS.ByteString -> Word32 -> DocId
mkDocId baseID insertionOrder = DocId . T.decodeUtf8
. B64.encode $ BS.concat [
baseID
, littleEndianRep insertionOrder
]
toBulk :: IndexName -> BS.ByteString -> (Word32, H.HashMap T.Text Value)
-> BulkOperation
toBulk index baseID (n, obj) =
BulkIndex index mapping (mkDocId baseID n) $ Object obj
data LogsMapping = LogsMapping
instance ToJSON LogsMapping where
toJSON LogsMapping = object [
"properties" .= object [
"insertion_order" .= object [
"type" .= ("integer"::T.Text)
]
, "insertion_time" .= object [
"type" .= ("date"::T.Text)
, "format" .= ("date_time"::T.Text)
]
, "time" .= object [
"type" .= ("date"::T.Text)
, "format" .= ("date_time"::T.Text)
]
, "domain" .= object [
"type" .= ("string"::T.Text)
]
, "level" .= object [
"type" .= ("string"::T.Text)
]
, "component" .= object [
"type" .= ("string"::T.Text)
]
, "message" .= object [
"type" .= ("string"::T.Text)
]
]
]
toEncoding LogsMapping = Aeson.pairs $ mconcat
[ Aeson.pair "properties" $ Aeson.pairs $ mconcat
[ Aeson.pair "insertion_order" $ Aeson.pairs $ mconcat
[ "type" .= ("integer"::T.Text)
]
, Aeson.pair "insertion_time" $ Aeson.pairs $ mconcat
[ "type" .= ("date"::T.Text)
, "format" .= ("date_time"::T.Text)
]
, Aeson.pair "time" $ Aeson.pairs $ mconcat
[ "type" .= ("date"::T.Text)
, "format" .= ("date_time"::T.Text)
]
, Aeson.pair "domain" $ Aeson.pairs $ mconcat
[ "type" .= ("string"::T.Text)
]
, Aeson.pair "level" $ Aeson.pairs $ mconcat
[ "type" .= ("string"::T.Text)
]
, Aeson.pair "component" $ Aeson.pairs $ mconcat
[ "type" .= ("string"::T.Text)
]
, Aeson.pair "message" $ Aeson.pairs $ mconcat
[ "type" .= ("string"::T.Text)
]
]
]
littleEndianRep :: Word32 -> BS.ByteString
littleEndianRep = fst . BS.unfoldrN 4 step
where
step n = Just (fromIntegral $ n .&. 0xff, n `shiftR` 8)
decodeReply :: Reply -> Value
decodeReply reply = case eitherDecode' $ responseBody reply of
Right body -> body
Left err -> object ["decoding_error" .= err]