Skip to content

Commit

Permalink
feat(quickwit): externalize http client infos in order to be able to …
Browse files Browse the repository at this point in the history
…call some non-elastic endpoints + multiple fixes

Signed-off-by: Idriss Neumann <idriss.neumann@comwork.io>
  • Loading branch information
idrissneumann committed Mar 29, 2024
1 parent 49cc2ad commit f6a029f
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 15 deletions.
24 changes: 23 additions & 1 deletion elastalert/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


class ElasticSearchClient(Elasticsearch):
""" Extension of low level :class:`Elasticsearch` client with additional version resolving features """
""" Extension of low level :class:`Elasticsearch` client with additional features """

def __init__(self, conf):
"""
Expand All @@ -31,7 +31,9 @@ def __init__(self, conf):
client_cert=conf['client_cert'],
client_key=conf['client_key'])
self._conf = copy.copy(conf)
self._http_client_infos = util.http_client_infos(conf.get('es_host'), conf['es_port'], conf['use_ssl'], conf['headers'], conf['http_auth'])
self._es_version = None
self._es_distribution = None

@property
def conf(self):
Expand All @@ -40,6 +42,26 @@ def conf(self):
"""
return self._conf

@property
def http_client_infos(self):
"""
Returns the http client infos
Useful when the elastic client is not handling some endpoints
"""
return self._http_client_infos


@property
def es_distribution(self):
"""
Returns the reported distribution from the Elasticsearch server.
"""
if self._es_version is None:
(distribution, _) = util.get_version_from_cluster_info(self)
self._es_distribution = distribution if util.is_not_empty(distribution) else "elasticsearch"

return self._es_distribution

@property
def es_version(self):
"""
Expand Down
20 changes: 9 additions & 11 deletions elastalert/create_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,26 @@
from envparse import Env

from elastalert.auth import Auth
from elastalert.util import _quickwit_url_prefix, get_version_from_cluster_info, is_true, is_empty, is_response_ok
from elastalert.util import _quickwit_url_prefix, get_version_from_cluster_info, http_client_infos, is_true, is_empty, is_response_ok

env = Env(ES_USE_SSL=bool)

def create_quickwit_mappings(client_infos, recreate):
qw_index_mappings = read_qw_index_mappings()
for index_id in qw_index_mappings:
if not recreate:
r = requests.get("{}/api/v1/indexes/{}/describe".format(client_infos['url'], index_id), auth=client_infos['http_auth'], headers=client_infos['headers'])
r = requests.get("{}/api/v1/indexes/{}/describe".format(client_infos['url'], index_id), auth=client_infos['auth'], headers=client_infos['headers'])
if is_response_ok(r.status_code):
print('Index ' + index_id + ' already exists. Skipping index creation.')
continue

r = requests.post("{}/api/v1/indexes".format(client_infos['url']), json=qw_index_mappings[index_id], auth=client_infos['http_auth'], headers=client_infos['headers'])
r = requests.post("{}/api/v1/indexes".format(client_infos['url']), json=qw_index_mappings[index_id], auth=client_infos['auth'], headers=client_infos['headers'])
if is_response_ok(r.status_code):
print('Index ' + index_id + ' successfully created.')
else:
print('Index ' + index_id + ' not created because of error. Attempting to recreate index...')
requests.delete("{}/api/v1/indexes/{}".format(client_infos['url'], index_id), auth=client_infos['http_auth'], headers=client_infos['headers'])
r = requests.post("{}/api/v1/indexes".format(client_infos['url']), json=qw_index_mappings[index_id], auth=client_infos['http_auth'], headers=client_infos['headers'])
requests.delete("{}/api/v1/indexes/{}".format(client_infos['url'], index_id), auth=client_infos['auth'], headers=client_infos['headers'])
r = requests.post("{}/api/v1/indexes".format(client_infos['url']), json=qw_index_mappings[index_id], auth=client_infos['auth'], headers=client_infos['headers'])
if is_response_ok(r.status_code):
print('Index ' + index_id + ' successfully created.')
else:
Expand Down Expand Up @@ -272,9 +272,11 @@ def main():
if api_key is not None:
headers.update({'Authorization': f'ApiKey {api_key}'})

if is_true(qw_enable) and not is_empty(url_prefix):
if is_true(qw_enable) and is_empty(url_prefix):
url_prefix=_quickwit_url_prefix

print("Open an Elasticsearch connection to host = {}, port = {}, url_prefix = {}, use_ssl = {}".format(host, port, url_prefix, use_ssl))

es = Elasticsearch(
host=host,
port=port,
Expand All @@ -290,11 +292,7 @@ def main():
ca_certs=ca_certs,
client_key=client_key)

client_infos = {
'url': "{}://{}:{}".format("https" if use_ssl else "http", host, port),
'headers': headers,
'http_auth': http_auth
}
client_infos = http_client_infos(host, port, use_ssl, headers, http_auth)
create_index_mappings(es_client=es, ea_index=index, client_infos=client_infos, recreate=args.recreate, old_ea_index=old_index)


Expand Down
23 changes: 20 additions & 3 deletions elastalert/elastalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import argparse
import copy
import datetime
import json
import logging
import os
import random
Expand All @@ -13,6 +12,8 @@
import time
import timeit
import traceback
import requests

from email.mime.text import MIMEText
from smtplib import SMTP
from smtplib import SMTPException
Expand All @@ -38,7 +39,7 @@
from elastalert.prometheus_wrapper import PrometheusWrapper
from elastalert.ruletypes import FlatlineRule
from elastalert.util import (add_keyword_postfix, cronite_datetime_to_timestamp, dt_to_ts, dt_to_unix, EAException,
elastalert_logger, elasticsearch_client, format_index, lookup_es_key, parse_deadline,
elastalert_logger, elasticsearch_client, format_index, is_response_ok, lookup_es_key, parse_deadline,
parse_duration, pretty_ts, replace_dots_in_field_names, seconds, set_es_key,
should_scrolling_continue, total_seconds, ts_add, ts_now, ts_to_dt, unix_to_dt,
ts_utc_to_tz)
Expand Down Expand Up @@ -1474,6 +1475,18 @@ def get_opensearch_discover_external_url_formatter(self, rule):
formatter = create_opensearch_external_url_formatter(rule)
rule[key] = formatter
return formatter

def quickwit_ingest(self, index, body):
http_client_infos = self.writeback_es.http_client_infos
headers = http_client_infos['headers']
headers['Content-Type'] = "application/json"
r = requests.post("{}/api/v1/{}/ingest".format(http_client_infos['url'], index), data=body, headers=headers, auth=http_client_infos['auth'])
if not is_response_ok(r.status_code):
elastalert_logger.exception("Error ingesting alert info into Quickwit, status_code = {}".format(r.status_code))

# quickwit is asynchronous in any case
return {'_id': ''}


def writeback(self, doc_type, body, rule=None, match_body=None):
# ES 2.0 - 2.3 does not support dots in field names.
Expand All @@ -1496,7 +1509,11 @@ def writeback(self, doc_type, body, rule=None, match_body=None):

try:
index = self.writeback_es.resolve_writeback_index(self.writeback_index, doc_type)
res = self.writeback_es.index(index=index, body=body)
if self.writeback_es.es_distribution != "quickwit":
res = self.writeback_es.index(index=index, body=body)
else:
res = self.quickwit_ingest(index=index, body=body)

return res
except ElasticsearchException as e:
elastalert_logger.exception("Error writing alert info to Elasticsearch: %s" % (e))
Expand Down
8 changes: 8 additions & 0 deletions elastalert/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,3 +670,11 @@ def get_version_from_cluster_info(client):
time.sleep(3)

return (distribution, esversion)


def http_client_infos(host, port, use_ssl, headers, http_auth):
return {
'url': "{}://{}:{}".format("https" if use_ssl else "http", host, port),
'headers': headers if is_not_empty(headers) else {},
'auth': http_auth
}
28 changes: 28 additions & 0 deletions tests/elasticsearch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,34 @@ def test_es_version(es_client):
assert version == "1.2.3"


def test_es_distribution_notempty(es_client):
mockInfo = {}
versionData = {}
versionData['number'] = "1.2.3"
versionData['distribution'] = "quickwit"
mockInfo['version'] = versionData

with mock.patch('elasticsearch.client.Elasticsearch.info', new=MagicMock(return_value=mockInfo)):
distribution = es_client.es_distribution
assert distribution == "quickwit"


def test_es_distribution_empty(es_client):
mockInfo = {}
versionData = {}
versionData['number'] = "1.2.3"
mockInfo['version'] = versionData

with mock.patch('elasticsearch.client.Elasticsearch.info', new=MagicMock(return_value=mockInfo)):
distribution = es_client.es_distribution
assert distribution == "elasticsearch"


def test_es_http_client_info(es_client):
http_client_infos = es_client.http_client_infos
assert http_client_infos['url'] == "http://127.0.0.1:9200"


@pytest.mark.elasticsearch
class TestElasticsearch(object):
# TODO perform teardown removing data inserted into Elasticsearch
Expand Down
9 changes: 9 additions & 0 deletions tests/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from elastalert.util import is_not_empty
from elastalert.util import is_empty
from elastalert.util import is_response_ok
from elastalert.util import http_client_infos

from elasticsearch.client import Elasticsearch

Expand Down Expand Up @@ -752,3 +753,11 @@ def test_is_true(input, result):
])
def test_is_response_ok(code, result):
assert is_response_ok(code) == result


@pytest.mark.parametrize('use_ssl, expected', [
(True, 'https://host:9200'),
(False, 'http://host:9200')
])
def test_http_client_infos(use_ssl, expected):
assert http_client_infos('host', 9200, use_ssl, {}, None)['url'] == expected

0 comments on commit f6a029f

Please sign in to comment.