From f6a029fa90197f35f1f0b90133ae5eedf880a985 Mon Sep 17 00:00:00 2001 From: Idriss Neumann Date: Fri, 29 Mar 2024 10:23:01 +0100 Subject: [PATCH] feat(quickwit): externalize http client infos in order to be able to call some non-elastic endpoints + multiple fixes Signed-off-by: Idriss Neumann --- elastalert/__init__.py | 24 +++++++++++++++++++++++- elastalert/create_index.py | 20 +++++++++----------- elastalert/elastalert.py | 23 ++++++++++++++++++++--- elastalert/util.py | 8 ++++++++ tests/elasticsearch_test.py | 28 ++++++++++++++++++++++++++++ tests/util_test.py | 9 +++++++++ 6 files changed, 97 insertions(+), 15 deletions(-) diff --git a/elastalert/__init__.py b/elastalert/__init__.py index 5873ca7b..b6efd981 100644 --- a/elastalert/__init__.py +++ b/elastalert/__init__.py @@ -9,7 +9,7 @@ class ElasticSearchClient(Elasticsearch): - """ Extension of low level :class:`Elasticsearch` client with additional version resolving features """ + """ Extension of low level :class:`Elasticsearch` client with additional features """ def __init__(self, conf): """ @@ -31,7 +31,9 @@ def __init__(self, conf): client_cert=conf['client_cert'], client_key=conf['client_key']) self._conf = copy.copy(conf) + self._http_client_infos = util.http_client_infos(conf.get('es_host'), conf['es_port'], conf['use_ssl'], conf['headers'], conf['http_auth']) self._es_version = None + self._es_distribution = None @property def conf(self): @@ -40,6 +42,26 @@ def conf(self): """ return self._conf + @property + def http_client_infos(self): + """ + Returns the http client infos + Useful when the elastic client is not handling some endpoints + """ + return self._http_client_infos + + + @property + def es_distribution(self): + """ + Returns the reported distribution from the Elasticsearch server. + """ + if self._es_version is None: + (distribution, _) = util.get_version_from_cluster_info(self) + self._es_distribution = distribution if util.is_not_empty(distribution) else "elasticsearch" + + return self._es_distribution + @property def es_version(self): """ diff --git a/elastalert/create_index.py b/elastalert/create_index.py index 9b109a34..5e1a18b7 100644 --- a/elastalert/create_index.py +++ b/elastalert/create_index.py @@ -17,7 +17,7 @@ from envparse import Env from elastalert.auth import Auth -from elastalert.util import _quickwit_url_prefix, get_version_from_cluster_info, is_true, is_empty, is_response_ok +from elastalert.util import _quickwit_url_prefix, get_version_from_cluster_info, http_client_infos, is_true, is_empty, is_response_ok env = Env(ES_USE_SSL=bool) @@ -25,18 +25,18 @@ def create_quickwit_mappings(client_infos, recreate): qw_index_mappings = read_qw_index_mappings() for index_id in qw_index_mappings: if not recreate: - r = requests.get("{}/api/v1/indexes/{}/describe".format(client_infos['url'], index_id), auth=client_infos['http_auth'], headers=client_infos['headers']) + r = requests.get("{}/api/v1/indexes/{}/describe".format(client_infos['url'], index_id), auth=client_infos['auth'], headers=client_infos['headers']) if is_response_ok(r.status_code): print('Index ' + index_id + ' already exists. Skipping index creation.') continue - r = requests.post("{}/api/v1/indexes".format(client_infos['url']), json=qw_index_mappings[index_id], auth=client_infos['http_auth'], headers=client_infos['headers']) + r = requests.post("{}/api/v1/indexes".format(client_infos['url']), json=qw_index_mappings[index_id], auth=client_infos['auth'], headers=client_infos['headers']) if is_response_ok(r.status_code): print('Index ' + index_id + ' successfully created.') else: print('Index ' + index_id + ' not created because of error. Attempting to recreate index...') - requests.delete("{}/api/v1/indexes/{}".format(client_infos['url'], index_id), auth=client_infos['http_auth'], headers=client_infos['headers']) - r = requests.post("{}/api/v1/indexes".format(client_infos['url']), json=qw_index_mappings[index_id], auth=client_infos['http_auth'], headers=client_infos['headers']) + requests.delete("{}/api/v1/indexes/{}".format(client_infos['url'], index_id), auth=client_infos['auth'], headers=client_infos['headers']) + r = requests.post("{}/api/v1/indexes".format(client_infos['url']), json=qw_index_mappings[index_id], auth=client_infos['auth'], headers=client_infos['headers']) if is_response_ok(r.status_code): print('Index ' + index_id + ' successfully created.') else: @@ -272,9 +272,11 @@ def main(): if api_key is not None: headers.update({'Authorization': f'ApiKey {api_key}'}) - if is_true(qw_enable) and not is_empty(url_prefix): + if is_true(qw_enable) and is_empty(url_prefix): url_prefix=_quickwit_url_prefix + print("Open an Elasticsearch connection to host = {}, port = {}, url_prefix = {}, use_ssl = {}".format(host, port, url_prefix, use_ssl)) + es = Elasticsearch( host=host, port=port, @@ -290,11 +292,7 @@ def main(): ca_certs=ca_certs, client_key=client_key) - client_infos = { - 'url': "{}://{}:{}".format("https" if use_ssl else "http", host, port), - 'headers': headers, - 'http_auth': http_auth - } + client_infos = http_client_infos(host, port, use_ssl, headers, http_auth) create_index_mappings(es_client=es, ea_index=index, client_infos=client_infos, recreate=args.recreate, old_ea_index=old_index) diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py index 4e5d10ad..7c142e42 100755 --- a/elastalert/elastalert.py +++ b/elastalert/elastalert.py @@ -2,7 +2,6 @@ import argparse import copy import datetime -import json import logging import os import random @@ -13,6 +12,8 @@ import time import timeit import traceback +import requests + from email.mime.text import MIMEText from smtplib import SMTP from smtplib import SMTPException @@ -38,7 +39,7 @@ from elastalert.prometheus_wrapper import PrometheusWrapper from elastalert.ruletypes import FlatlineRule from elastalert.util import (add_keyword_postfix, cronite_datetime_to_timestamp, dt_to_ts, dt_to_unix, EAException, - elastalert_logger, elasticsearch_client, format_index, lookup_es_key, parse_deadline, + elastalert_logger, elasticsearch_client, format_index, is_response_ok, lookup_es_key, parse_deadline, parse_duration, pretty_ts, replace_dots_in_field_names, seconds, set_es_key, should_scrolling_continue, total_seconds, ts_add, ts_now, ts_to_dt, unix_to_dt, ts_utc_to_tz) @@ -1474,6 +1475,18 @@ def get_opensearch_discover_external_url_formatter(self, rule): formatter = create_opensearch_external_url_formatter(rule) rule[key] = formatter return formatter + + def quickwit_ingest(self, index, body): + http_client_infos = self.writeback_es.http_client_infos + headers = http_client_infos['headers'] + headers['Content-Type'] = "application/json" + r = requests.post("{}/api/v1/{}/ingest".format(http_client_infos['url'], index), data=body, headers=headers, auth=http_client_infos['auth']) + if not is_response_ok(r.status_code): + elastalert_logger.exception("Error ingesting alert info into Quickwit, status_code = {}".format(r.status_code)) + + # quickwit is asynchronous in any case + return {'_id': ''} + def writeback(self, doc_type, body, rule=None, match_body=None): # ES 2.0 - 2.3 does not support dots in field names. @@ -1496,7 +1509,11 @@ def writeback(self, doc_type, body, rule=None, match_body=None): try: index = self.writeback_es.resolve_writeback_index(self.writeback_index, doc_type) - res = self.writeback_es.index(index=index, body=body) + if self.writeback_es.es_distribution != "quickwit": + res = self.writeback_es.index(index=index, body=body) + else: + res = self.quickwit_ingest(index=index, body=body) + return res except ElasticsearchException as e: elastalert_logger.exception("Error writing alert info to Elasticsearch: %s" % (e)) diff --git a/elastalert/util.py b/elastalert/util.py index 9aa5cd7a..311786af 100644 --- a/elastalert/util.py +++ b/elastalert/util.py @@ -670,3 +670,11 @@ def get_version_from_cluster_info(client): time.sleep(3) return (distribution, esversion) + + +def http_client_infos(host, port, use_ssl, headers, http_auth): + return { + 'url': "{}://{}:{}".format("https" if use_ssl else "http", host, port), + 'headers': headers if is_not_empty(headers) else {}, + 'auth': http_auth + } diff --git a/tests/elasticsearch_test.py b/tests/elasticsearch_test.py index a97b50bf..0dbf6adc 100644 --- a/tests/elasticsearch_test.py +++ b/tests/elasticsearch_test.py @@ -39,6 +39,34 @@ def test_es_version(es_client): assert version == "1.2.3" +def test_es_distribution_notempty(es_client): + mockInfo = {} + versionData = {} + versionData['number'] = "1.2.3" + versionData['distribution'] = "quickwit" + mockInfo['version'] = versionData + + with mock.patch('elasticsearch.client.Elasticsearch.info', new=MagicMock(return_value=mockInfo)): + distribution = es_client.es_distribution + assert distribution == "quickwit" + + +def test_es_distribution_empty(es_client): + mockInfo = {} + versionData = {} + versionData['number'] = "1.2.3" + mockInfo['version'] = versionData + + with mock.patch('elasticsearch.client.Elasticsearch.info', new=MagicMock(return_value=mockInfo)): + distribution = es_client.es_distribution + assert distribution == "elasticsearch" + + +def test_es_http_client_info(es_client): + http_client_infos = es_client.http_client_infos + assert http_client_infos['url'] == "http://127.0.0.1:9200" + + @pytest.mark.elasticsearch class TestElasticsearch(object): # TODO perform teardown removing data inserted into Elasticsearch diff --git a/tests/util_test.py b/tests/util_test.py index 298d9898..013d6558 100644 --- a/tests/util_test.py +++ b/tests/util_test.py @@ -44,6 +44,7 @@ from elastalert.util import is_not_empty from elastalert.util import is_empty from elastalert.util import is_response_ok +from elastalert.util import http_client_infos from elasticsearch.client import Elasticsearch @@ -752,3 +753,11 @@ def test_is_true(input, result): ]) def test_is_response_ok(code, result): assert is_response_ok(code) == result + + +@pytest.mark.parametrize('use_ssl, expected', [ + (True, 'https://host:9200'), + (False, 'http://host:9200') +]) +def test_http_client_infos(use_ssl, expected): + assert http_client_infos('host', 9200, use_ssl, {}, None)['url'] == expected