From bc96728a74c7acf95796af7ba91055df5e669a53 Mon Sep 17 00:00:00 2001 From: Janosch <99879757+jkppr@users.noreply.github.com> Date: Wed, 22 Jan 2025 10:28:35 +0100 Subject: [PATCH] Increase OpenSearch mapping limit dynamically during indexing of csv/jsonl data (#3257) * Dynamically increase OpenSearch field mapping limit during indexing of csv/jsonl * Adding e2e test for gracefully error out if too many unique fields used * Adding metrics to the run_csv_jsonl function --- data/timesketch.conf | 10 ++- end_to_end_tests/upload_test.py | 56 +++++++++++++ timesketch/lib/tasks.py | 141 +++++++++++++++++++++++++++++++- 3 files changed, 202 insertions(+), 5 deletions(-) diff --git a/data/timesketch.conf b/data/timesketch.conf index dad4a5263e..7a03e64093 100644 --- a/data/timesketch.conf +++ b/data/timesketch.conf @@ -21,11 +21,11 @@ SECRET_KEY = '' # production. SQLALCHEMY_DATABASE_URI = 'postgresql://:@localhost/timesketch' -# Configure where your Elasticsearch server is located. +# Configure where your OpenSearch server is located. # # Make sure that the OpenSearch server is properly secured and not accessible # from the internet. See the following link for more information: -# http://www.elasticsearch.org/blog/scripting-security/ +# https://opensearch.org/docs/latest/getting-started/security/ OPENSEARCH_HOST = '127.0.0.1' OPENSEARCH_PORT = 9200 OPENSEARCH_USER = None @@ -34,6 +34,10 @@ OPENSEARCH_SSL = False OPENSEARCH_VERIFY_CERTS = True OPENSEARCH_TIMEOUT = 10 OPENSEARCH_FLUSH_INTERVAL = 5000 +# Be careful when increasing the upper limit since this will impact your +# OpenSearch clusters performance and storage requirements! +OPENSEARCH_MAPPING_BUFFER = 0.1 +OPENSEARCH_MAPPING_UPPER_LIMIT = 1000 # Define what labels should be defined that make it so that a sketch and # timelines will not be deleted. This can be used to add a list of different @@ -370,7 +374,7 @@ LLM_PROVIDER_CONFIGS = { # To use Google's AI Studio simply obtain an API key from https://aistudio.google.com/ # pip install google-generativeai 'aistudio': { - 'api_key': '', + 'api_key': '', 'model': 'gemini-2.0-flash-exp', }, } diff --git a/end_to_end_tests/upload_test.py b/end_to_end_tests/upload_test.py index 8f6a914c29..6a7b035415 100755 --- a/end_to_end_tests/upload_test.py +++ b/end_to_end_tests/upload_test.py @@ -14,6 +14,7 @@ """End to end tests of Timesketch upload functionality.""" import os import random +import json from timesketch_api_client import search from . import interface @@ -86,6 +87,61 @@ def test_large_upload_jsonl(self): events = sketch.explore("data_type:foobarjson", as_pandas=True) self.assertions.assertEqual(len(events), 4123) + def test_upload_jsonl_mapping_exceeds_limit(self): + """Test uploading a timeline with a jsonl events that exceeds the + default field mapping limit of 1000. The test will create a temporary + file with events that have many unique keys and then upload the file to + Timesketch. The test will then check that the correct error is triggered. + """ + + # create a new sketch + rand = random.randint(0, 10000) + sketch = self.api.create_sketch( + name=f"test_upload_jsonl_mapping_exceeds_limit {rand}" + ) + self.sketch = sketch + + file_path = "/tmp/mapping_over_1k.jsonl" + num_lines = 100 + num_keys_per_line = 6 + all_keys = set() + + with open(file_path, "w", encoding="utf-8") as file_object: + for i in range(num_lines): + line_data = { + "datetime": "2015-07-24T19:01:01+00:00", + "message": f"Event {i} of {num_lines}", + "timestamp_desc": "test", + "data_type": "test:jsonl", + } + for j in range(num_keys_per_line): + key = f"field_name_{j}_{random.randint(0, 100000)}" + while key in all_keys: # Avoid duplicate keys + key = f"field_name_{random.randint(0, 100000)}" + all_keys.add(key) + line_data[key] = f"value_{j}_{random.randint(0, 10000)}" + + json.dump(line_data, file_object) + file_object.write("\n") + + try: + self.import_timeline(file_path, index_name=rand, sketch=sketch) + except RuntimeError: + print( + "Timeline import failing is expected. Checking for the correct " + "error message..." + ) + os.remove(file_path) + + timeline = sketch.list_timelines()[0] + # check that timeline threw the correct error + self.assertions.assertEqual(timeline.name, file_path) + self.assertions.assertEqual(timeline.index.name, str(rand)) + self.assertions.assertEqual(timeline.index.status, "fail") + self.assertions.assertIn( + "OPENSEARCH_MAPPING_UPPER_LIMIT", timeline.data_sources[0]["error_message"] + ) + def test_very_large_upload_jsonl(self): """Test uploading a timeline with over 50 k events as jsonl. The test will create a temporary file and then diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index 0581a66f4d..953c315bb5 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -22,9 +22,11 @@ import logging import os import subprocess +import time import traceback import six import yaml +import prometheus_client from celery import chain from celery import group @@ -40,6 +42,7 @@ from timesketch.lib.analyzers import manager from timesketch.lib.analyzers.dfiq_plugins.manager import DFIQAnalyzerManager from timesketch.lib.datastores.opensearch import OpenSearchDataStore +from timesketch.lib.definitions import METRICS_NAMESPACE from timesketch.lib.utils import read_and_validate_csv from timesketch.lib.utils import read_and_validate_jsonl from timesketch.lib.utils import send_email @@ -54,6 +57,51 @@ from timesketch.models.user import User +# Metrics definitions +METRICS = { + "worker_csv_jsonl_runs": prometheus_client.Counter( + "worker_csv_jsonl_runs", + "Number of times the run_csv_jsonl task has been run", + namespace=METRICS_NAMESPACE, + ), + "worker_mapping_increase": prometheus_client.Gauge( + "worker_mapping_increase", + "Number of times a mapping increase is requested", + ["index_name", "timeline_id"], + namespace=METRICS_NAMESPACE, + ), + "worker_mapping_increase_limit_exceeded": prometheus_client.Counter( + "worker_mapping_increase_limit_exceeded", + "Number of times the OpenSearch mapping increase ran into the upper limit", + ["index_name"], + namespace=METRICS_NAMESPACE, + ), + "worker_files_parsed": prometheus_client.Counter( + "worker_files_parsed", + "Number of files parsed by the worker task", + ["source_type"], + namespace=METRICS_NAMESPACE, + ), + "worker_events_added": prometheus_client.Gauge( + "worker_events_added", + "Number of events added by the worker parsing task", + ["index_name", "timeline_id", "source_type"], + namespace=METRICS_NAMESPACE, + ), + "worker_import_errors": prometheus_client.Counter( + "worker_import_errors", + "Number of errors during the import", + ["index_name", "error_type"], + namespace=METRICS_NAMESPACE, + ), + "worker_run_time": prometheus_client.Summary( + "worker_run_time_seconds", + "Runtime of the worker task in seconds", + ["index_name", "timeline_id", "source_type"], + namespace=METRICS_NAMESPACE, + ), +} + # To be able to determine plaso's version. try: import plaso @@ -100,6 +148,10 @@ def get_import_errors(error_container, index_name, total_count): if error_types: top_type = error_types.most_common()[0][0] + for error_type in error_types: + METRICS["worker_import_errors"].labels( + index_name=index_name, error_type=error_type + ).inc(error_types.get(error_type, 0)) else: top_type = "Unknown Reasons" @@ -671,6 +723,7 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin Returns: Name (str) of the index. """ + time_start = time.time() if not plaso: raise RuntimeError( ("Plaso isn't installed, " "unable to continue processing plaso files.") @@ -826,6 +879,10 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin # Mark the searchindex and timelines as ready _set_datasource_status(timeline_id, file_path, "ready") + time_took_to_run = time.time() - time_start + METRICS["worker_run_time"].labels( + index_name=index_name, timeline_id=timeline_id, source_type=source_type + ).observe(time_took_to_run) return index_name @@ -857,11 +914,15 @@ def run_csv_jsonl( Returns: Name (str) of the index. """ + METRICS["worker_csv_jsonl_runs"].inc() + time_start = time.time() + if events: file_handle = io.StringIO(events) source_type = "jsonl" else: file_handle = codecs.open(file_path, "r", encoding="utf-8", errors="replace") + METRICS["worker_files_parsed"].labels(source_type=source_type).inc() validators = { "csv": read_and_validate_csv, @@ -919,13 +980,77 @@ def run_csv_jsonl( final_counter = 0 error_msg = "" error_count = 0 + unique_keys = set() + limit_buffer_percentage = float( + current_app.config.get("OPENSEARCH_MAPPING_BUFFER", 0.1) + ) + upper_mapping_limit = int( + current_app.config.get("OPENSEARCH_MAPPING_UPPER_LIMIT", 1000) + ) + try: opensearch.create_index(index_name=index_name, mappings=mappings) + + current_index_mapping_properties = ( + opensearch.client.indices.get_mapping(index=index_name) + .get(index_name, {}) + .get("mappings", {}) + .get("properties", {}) + ) + unique_keys = set(current_index_mapping_properties) + + try: + current_limit = int( + opensearch.client.indices.get_settings(index=index_name)[index_name][ + "settings" + ]["index"]["mapping"]["total_fields"]["limit"] + ) + except KeyError: + current_limit = 1000 + for event in read_and_validate( file_handle=file_handle, headers_mapping=headers_mapping, delimiter=delimiter, ): + unique_keys.update(event.keys()) + # Calculating the new limit. Each unique key is counted twice due to + # the "keyword" type plus a percentage buffer (default 10%). + new_limit = int((len(unique_keys) * 2) * (1 + limit_buffer_percentage)) + # To prevent mapping explosions we still check against an upper + # mapping limit set in timesketch.conf (default: 1000). + if new_limit > upper_mapping_limit: + METRICS["worker_mapping_increase_limit_exceeded"].labels( + index_name=index_name + ).inc() + error_msg = ( + f"Error: Indexing timeline [{timeline_name}] into [{index_name}] " + f"exceeds the upper field mapping limit of {upper_mapping_limit}. " + f"New calculated mapping limit: {new_limit}. Review your " + "import data or adjust OPENSEARCH_MAPPING_UPPER_LIMIT." + ) + logger.error(error_msg) + _set_datasource_status( + timeline_id, file_path, "fail", error_message=str(error_msg) + ) + return None + + if new_limit > current_limit and current_limit < upper_mapping_limit: + new_limit = min(new_limit, upper_mapping_limit) + opensearch.client.indices.put_settings( + index=index_name, + body={"index.mapping.total_fields.limit": new_limit}, + ) + METRICS["worker_mapping_increase"].labels( + index_name=index_name, timeline_id=timeline_id + ).set(new_limit) + logger.info( + "OpenSearch index [%s] mapping limit increased to: %d", + index_name, + new_limit, + ) + current_limit = new_limit + opensearch.import_event(index_name, event, timeline_id=timeline_id) final_counter += 1 @@ -933,6 +1058,7 @@ def run_csv_jsonl( results = opensearch.flush_queued_events() error_container = results.get("error_container", {}) + error_count = len(error_container.get(index_name, {}).get("errors", [])) error_msg = get_import_errors( error_container=error_container, index_name=index_name, @@ -950,10 +1076,15 @@ def run_csv_jsonl( except Exception as e: # pylint: disable=broad-except # Mark the searchindex and timelines as failed and exit the task error_msg = traceback.format_exc() - _set_datasource_status(timeline_id, file_path, "fail", error_message=error_msg) + _set_datasource_status( + timeline_id, file_path, "fail", error_message=str(error_msg) + ) logger.error("Error: {0!s}\n{1:s}".format(e, error_msg)) return None + METRICS["worker_events_added"].labels( + index_name=index_name, timeline_id=timeline_id, source_type=source_type + ).set(final_counter) if error_count: logger.info( "Index timeline: [{0:s}] to index [{1:s}] - {2:d} out of {3:d} " @@ -972,8 +1103,14 @@ def run_csv_jsonl( ) # Set status to ready when done - _set_datasource_status(timeline_id, file_path, "ready", error_message=error_msg) + _set_datasource_status( + timeline_id, file_path, "ready", error_message=str(error_msg) + ) + time_took_to_run = time.time() - time_start + METRICS["worker_run_time"].labels( + index_name=index_name, timeline_id=timeline_id, source_type=source_type + ).observe(time_took_to_run) return index_name