From 702255fe5369e17fde2bee33a7a0e994314051f1 Mon Sep 17 00:00:00 2001
From: janosch
Date: Wed, 8 Jan 2025 13:19:25 +0000
Subject: [PATCH 01/10] Dynamically increase OpenSearch mapping limit during
 indexing of csv/jsonl

---
 data/timesketch.conf    |  8 ++++--
 timesketch/lib/tasks.py | 58 +++++++++++++++++++++++++++++++++++++++--
 2 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/data/timesketch.conf b/data/timesketch.conf
index fb25a1732b..9e0da16315 100644
--- a/data/timesketch.conf
+++ b/data/timesketch.conf
@@ -21,11 +21,11 @@ SECRET_KEY = '<KEY_GOES_HERE>'
 # production.
 SQLALCHEMY_DATABASE_URI = 'postgresql://<USERNAME>:<PASSWORD>@localhost/timesketch'
 
-# Configure where your Elasticsearch server is located.
+# Configure where your OpenSearch server is located.
 #
 # Make sure that the OpenSearch server is properly secured and not accessible
 # from the internet. See the following link for more information:
-# http://www.elasticsearch.org/blog/scripting-security/
+# https://opensearch.org/docs/latest/getting-started/security/
 OPENSEARCH_HOST = '127.0.0.1'
 OPENSEARCH_PORT = 9200
 OPENSEARCH_USER = None
@@ -34,6 +34,10 @@ OPENSEARCH_SSL = False
 OPENSEARCH_VERIFY_CERTS = True
 OPENSEARCH_TIMEOUT = 10
 OPENSEARCH_FLUSH_INTERVAL = 5000
+# Be careful when increasing the upper limit since this will impact your
+# OpenSearch cluster's performance and storage requirements!
+OPENSEARCH_MAPPING_BUFFER = 0.2
+OPENSEARCH_MAPPING_UPPER_LIMIT = 2000
 
 # Define what labels should be defined that make it so that a sketch and
 # timelines will not be deleted. This can be used to add a list of different
diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py
index 0581a66f4d..a790542353 100644
--- a/timesketch/lib/tasks.py
+++ b/timesketch/lib/tasks.py
@@ -919,13 +919,66 @@ def run_csv_jsonl(
     final_counter = 0
     error_msg = ""
     error_count = 0
+    unique_keys = set()
+    limit_buffer_percentage = float(
+        current_app.config.get("OPENSEARCH_MAPPING_BUFFER", 0.2)
+    )
+    upper_mapping_limit = int(
+        current_app.config.get("OPENSEARCH_MAPPING_UPPER_LIMIT", 2000)
+    )
+
     try:
         opensearch.create_index(index_name=index_name, mappings=mappings)
+
+        current_index_mapping_properties = (
+            opensearch.client.indices.get_mapping(index=index_name)
+            .get(index_name, {})
+            .get("mappings", {})
+            .get("properties", {})
+        )
+        unique_keys = set(current_index_mapping_properties)
+
+        try:
+            current_limit = int(opensearch.client.indices.get_settings(index=index_name)[
+                index_name
+            ]["settings"]["index"]["mapping"]["total_fields"]["limit"])
+        except KeyError:
+            current_limit = 1000
+
         for event in read_and_validate(
             file_handle=file_handle,
             headers_mapping=headers_mapping,
             delimiter=delimiter,
         ):
+            unique_keys.update(event.keys())
+            # Calculating the new limit. Each unique key is counted twice due to
+            # the "keayword" type plus a percentage buffer (default 20%).
+            new_limit = int((len(unique_keys)*2) * (1 + limit_buffer_percentage))
+            # To prevent mapping explosions we still check against an upper
+            # mapping limit set in timesketch.conf (default: 2000).
+            if new_limit > upper_mapping_limit:
+                error_msg = (
+                    f"Error: Indexing timeline [{timeline_name}] into [{index_name}] "
+                    f"exceeds the upper field mapping limit of {upper_mapping_limit}. "
+                    f"Currently mapped fields: ~{len(unique_keys)*2} / New "
+                    f"calculated mapping limit: {new_limit}. Review your import "
+                    "data or adjust OPENSEARCH_MAPPING_UPPER_LIMIT."
+                )
+                logger.error(error_msg)
+                _set_datasource_status(timeline_id, file_path, "fail", error_message=str(error_msg))
+                return None
+
+            if new_limit > current_limit:
+                opensearch.client.indices.put_settings(
+                    index=index_name, body={"index.mapping.total_fields.limit": new_limit}
+                )
+                logger.info(
+                    "OpenSearch index [%s] mapping limit increased to: %d",
+                    index_name,
+                    new_limit,
+                )
+                current_limit = new_limit
+
             opensearch.import_event(index_name, event, timeline_id=timeline_id)
             final_counter += 1
 
@@ -933,6 +986,7 @@ def run_csv_jsonl(
         results = opensearch.flush_queued_events()
 
         error_container = results.get("error_container", {})
+        error_count = len(error_container.get(index_name, {}).get('errors', []))
         error_msg = get_import_errors(
             error_container=error_container,
             index_name=index_name,
@@ -950,7 +1004,7 @@ def run_csv_jsonl(
     except Exception as e:  # pylint: disable=broad-except
         # Mark the searchindex and timelines as failed and exit the task
         error_msg = traceback.format_exc()
-        _set_datasource_status(timeline_id, file_path, "fail", error_message=error_msg)
+        _set_datasource_status(timeline_id, file_path, "fail", error_message=str(error_msg))
         logger.error("Error: {0!s}\n{1:s}".format(e, error_msg))
         return None
 
@@ -972,7 +1026,7 @@ def run_csv_jsonl(
     )
 
     # Set status to ready when done
-    _set_datasource_status(timeline_id, file_path, "ready", error_message=error_msg)
+    _set_datasource_status(timeline_id, file_path, "ready", error_message=str(error_msg))
 
     return index_name
 

From 62b6059845858079e50349a99b94614645300fa2 Mon Sep 17 00:00:00 2001
From: janosch
Date: Wed, 8 Jan 2025 13:30:21 +0000
Subject: [PATCH 02/10] formatter

---
 timesketch/lib/tasks.py | 27 ++++++++++++++++++---------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py
index a790542353..3b7eaf4c7f 100644
--- a/timesketch/lib/tasks.py
+++ b/timesketch/lib/tasks.py
@@ -939,9 +939,11 @@ def run_csv_jsonl(
         unique_keys = set(current_index_mapping_properties)
 
         try:
-            current_limit = int(opensearch.client.indices.get_settings(index=index_name)[
-                index_name
-            ]["settings"]["index"]["mapping"]["total_fields"]["limit"])
+            current_limit = int(
+                opensearch.client.indices.get_settings(index=index_name)[index_name][
+                    "settings"
+                ]["index"]["mapping"]["total_fields"]["limit"]
+            )
         except KeyError:
             current_limit = 1000
 
@@ -953,7 +955,7 @@ def run_csv_jsonl(
         ):
             unique_keys.update(event.keys())
             # Calculating the new limit. Each unique key is counted twice due to
             # the "keayword" type plus a percentage buffer (default 20%).
-            new_limit = int((len(unique_keys)*2) * (1 + limit_buffer_percentage))
+            new_limit = int((len(unique_keys) * 2) * (1 + limit_buffer_percentage))
             # To prevent mapping explosions we still check against an upper
             # mapping limit set in timesketch.conf (default: 2000).
             if new_limit > upper_mapping_limit:
                 error_msg = (
                     f"Error: Indexing timeline [{timeline_name}] into [{index_name}] "
                     f"exceeds the upper field mapping limit of {upper_mapping_limit}. "
                     f"Currently mapped fields: ~{len(unique_keys)*2} / New "
                     f"calculated mapping limit: {new_limit}. Review your import "
                     "data or adjust OPENSEARCH_MAPPING_UPPER_LIMIT."
                 )
                 logger.error(error_msg)
-                _set_datasource_status(timeline_id, file_path, "fail", error_message=str(error_msg))
+                _set_datasource_status(
+                    timeline_id, file_path, "fail", error_message=str(error_msg)
+                )
                 return None
 
             if new_limit > current_limit:
                 opensearch.client.indices.put_settings(
-                    index=index_name, body={"index.mapping.total_fields.limit": new_limit}
+                    index=index_name,
+                    body={"index.mapping.total_fields.limit": new_limit},
                 )
                 logger.info(
                     "OpenSearch index [%s] mapping limit increased to: %d",
                     index_name,
@@ -986,7 +991,7 @@ def run_csv_jsonl(
         results = opensearch.flush_queued_events()
 
         error_container = results.get("error_container", {})
-        error_count = len(error_container.get(index_name, {}).get('errors', []))
+        error_count = len(error_container.get(index_name, {}).get("errors", []))
         error_msg = get_import_errors(
             error_container=error_container,
             index_name=index_name,
@@ -1004,7 +1009,9 @@ def run_csv_jsonl(
     except Exception as e:  # pylint: disable=broad-except
         # Mark the searchindex and timelines as failed and exit the task
         error_msg = traceback.format_exc()
-        _set_datasource_status(timeline_id, file_path, "fail", error_message=str(error_msg))
+        _set_datasource_status(
+            timeline_id, file_path, "fail", error_message=str(error_msg)
+        )
         logger.error("Error: {0!s}\n{1:s}".format(e, error_msg))
         return None
 
@@ -1026,7 +1033,9 @@ def run_csv_jsonl(
     )
 
     # Set status to ready when done
-    _set_datasource_status(timeline_id, file_path, "ready", error_message=str(error_msg))
+    _set_datasource_status(
+        timeline_id, file_path, "ready", error_message=str(error_msg)
+    )
 
     return index_name
 

From 0619493207e57c8c70fb7656902c03d63a085ed5 Mon Sep 17 00:00:00 2001
From: Alexander J <741037+jaegeral@users.noreply.github.com>
Date: Wed, 8 Jan 2025 15:06:20 +0100
Subject: [PATCH 03/10] Update tasks.py

---
 timesketch/lib/tasks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py
index 3b7eaf4c7f..3ddfda881e 100644
--- a/timesketch/lib/tasks.py
+++ b/timesketch/lib/tasks.py
@@ -954,7 +954,7 @@ def run_csv_jsonl(
         ):
             unique_keys.update(event.keys())
             # Calculating the new limit. Each unique key is counted twice due to
-            # the "keayword" type plus a percentage buffer (default 20%).
+            # the "keyword" type plus a percentage buffer (default 20%).
             new_limit = int((len(unique_keys) * 2) * (1 + limit_buffer_percentage))
             # To prevent mapping explosions we still check against an upper
             # mapping limit set in timesketch.conf (default: 2000).
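Taken together, patches 01-03 implement one calculation: every unique event key is expected to produce two mapped fields (the field itself plus its "keyword" sub-field), a configurable buffer percentage is added on top, and index.mapping.total_fields.limit is only raised when the result exceeds the index's current setting. The following is a minimal standalone sketch of that logic, not code from the patches themselves; the local unauthenticated OpenSearch instance and the "demo-index" name are assumptions made for illustration.

```python
from opensearchpy import OpenSearch

# Stand-ins for the timesketch.conf values introduced in patch 01.
MAPPING_BUFFER = 0.2
MAPPING_UPPER_LIMIT = 2000

client = OpenSearch(hosts=[{"host": "127.0.0.1", "port": 9200}])
index_name = "demo-index"  # hypothetical index used for illustration


def raise_field_limit_if_needed(unique_keys):
    """Grow index.mapping.total_fields.limit to fit the observed keys."""
    try:
        current_limit = int(
            client.indices.get_settings(index=index_name)[index_name]["settings"][
                "index"
            ]["mapping"]["total_fields"]["limit"]
        )
    except KeyError:
        # The setting is absent from the index settings until it has been
        # changed once; OpenSearch then applies its default of 1000.
        current_limit = 1000

    # Each key counts twice (field + keyword sub-field), plus the buffer.
    new_limit = int(len(unique_keys) * 2 * (1 + MAPPING_BUFFER))
    if new_limit > MAPPING_UPPER_LIMIT:
        raise RuntimeError(f"refusing to exceed the upper limit {MAPPING_UPPER_LIMIT}")
    if new_limit > current_limit:
        client.indices.put_settings(
            index=index_name,
            body={"index.mapping.total_fields.limit": new_limit},
        )
    return max(new_limit, current_limit)
```

Reading the current limit back from the index rather than caching it locally is what makes re-imports into a partially indexed timeline safe: the limit only ever grows, and the hard ceiling from the configuration is checked before any write.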
From d1980a32a23c4b1d24bf2517f88711146591afda Mon Sep 17 00:00:00 2001
From: janosch
Date: Fri, 17 Jan 2025 09:10:13 +0000
Subject: [PATCH 04/10] adding metrics to tasks.run_csv_jsonl()

---
 timesketch/lib/tasks.py | 66 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 66 insertions(+)

diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py
index 3b7eaf4c7f..bfa7751676 100644
--- a/timesketch/lib/tasks.py
+++ b/timesketch/lib/tasks.py
@@ -25,6 +25,7 @@ import traceback
 
 import six
 import yaml
+import time
 
 from celery import chain
 from celery import group
@@ -53,6 +54,58 @@ from timesketch.models.sketch import InvestigativeQuestionConclusion
 from timesketch.models.user import User
 
+import prometheus_client
+from timesketch.lib.definitions import METRICS_NAMESPACE
+
+# Metrics definitions
+METRICS = {
+    "csv_jsonl_runs": prometheus_client.Counter(
+        "csv_jsonl_runs",
+        "Number of times the run_csv_jsonl task has been run",
+        namespace=METRICS_NAMESPACE,
+    ),
+    "manual_event_added": prometheus_client.Counter(
+        "manual_event_added",
+        "Number of times the run_csv_jsonl used to add a manual event",
+        namespace=METRICS_NAMESPACE,
+    ),
+    "mapping_increase": prometheus_client.Gauge(
+        "mapping_increase",
+        "Number of times a mapping increase is requested",
+        ["index_name", "timeline_id"],
+        namespace=METRICS_NAMESPACE,
+    ),
+    "mapping_increase_limit_exceeded": prometheus_client.Counter(
+        "mapping_increase_limit_exceeded",
+        "Number of times the OpenSearch mapping increase ran into the upper limit",
+        ["index_name"],
+        namespace=METRICS_NAMESPACE,
+    ),
+    "files_parsed": prometheus_client.Counter(
+        "files_parsed",
+        "Number of files parsed by the csv_jsonl task",
+        ["file_extension"],
+        namespace=METRICS_NAMESPACE,
+    ),
+    "lines_parsed": prometheus_client.Counter(
+        "lines_parsed",
+        "Number of lines parsed by the csv_jsonl task",
+        ["index_name", "timeline_id"],
+        namespace=METRICS_NAMESPACE,
+    ),
+    "import_errors": prometheus_client.Counter(
+        "import_errors",
+        "Number of errors during the import",
+        ["index_name", "timeline_id"],
+        namespace=METRICS_NAMESPACE,
+    ),
+    "total_run_time": prometheus_client.Summary(
+        "total_run_time",
+        "Total runtime of the run_csv_jsonl task",
+        ["index_name", "timeline_id", "file_extension"],
+        namespace=METRICS_NAMESPACE,
+    ),
+}
 
 # To be able to determine plaso's version.
 try:
@@ -857,11 +910,16 @@ def run_csv_jsonl(
     Returns:
         Name (str) of the index.
     """
+    METRICS["csv_jsonl_runs"].inc()
+    time_start = time.time()
+
     if events:
         file_handle = io.StringIO(events)
         source_type = "jsonl"
+        METRICS["manual_event_added"].inc()
    else:
         file_handle = codecs.open(file_path, "r", encoding="utf-8", errors="replace")
+        METRICS["files_parsed"].labels(file_extension=source_type).inc()
 
     validators = {
         "csv": read_and_validate_csv,
@@ -959,6 +1017,7 @@ def run_csv_jsonl(
             # To prevent mapping explosions we still check against an upper
             # mapping limit set in timesketch.conf (default: 2000).
             if new_limit > upper_mapping_limit:
+                METRICS["mapping_increase_limit_exceeded"].labels(index_name=index_name).inc()
                 error_msg = (
                     f"Error: Indexing timeline [{timeline_name}] into [{index_name}] "
                     f"exceeds the upper field mapping limit of {upper_mapping_limit}. "
                     f"Currently mapped fields: ~{len(unique_keys)*2} / New "
                     f"calculated mapping limit: {new_limit}. Review your import "
                     "data or adjust OPENSEARCH_MAPPING_UPPER_LIMIT."
                 )
                 logger.error(error_msg)
                 _set_datasource_status(
                     timeline_id, file_path, "fail", error_message=str(error_msg)
                 )
                 return None
 
             if new_limit > current_limit:
                 opensearch.client.indices.put_settings(
                     index=index_name,
                     body={"index.mapping.total_fields.limit": new_limit},
                 )
+                METRICS["mapping_increase"].labels(index_name=index_name, timeline_id=timeline_id).set(new_limit)
                 logger.info(
                     "OpenSearch index [%s] mapping limit increased to: %d",
                     index_name,
                     new_limit,
                 )
                 current_limit = new_limit
 
             opensearch.import_event(index_name, event, timeline_id=timeline_id)
             final_counter += 1
+            METRICS["lines_parsed"].labels(index_name=index_name, timeline_id=timeline_id).inc()
 
         # Import the remaining events
         results = opensearch.flush_queued_events()
@@ -1035,9 +1096,14 @@ def run_csv_jsonl(
     # Set status to ready when done
     _set_datasource_status(
         timeline_id, file_path, "ready", error_message=str(error_msg)
     )
+    METRICS["import_errors"].labels(index_name=index_name, timeline_id=timeline_id).inc(error_count)
 
+    time_took_to_run = time.time() - time_start
+    METRICS["total_run_time"].labels(index_name=index_name, timeline_id=timeline_id,file_extension=source_type).observe(
+        time_took_to_run
+    )
     return index_name
 

From e56e900c965008a36dccf971e78b6daeeba19486 Mon Sep 17 00:00:00 2001
From: janosch
Date: Fri, 17 Jan 2025 13:13:00 +0000
Subject: [PATCH 05/10] Adding metrics for run_csv_jsonl and error handling

---
 timesketch/lib/tasks.py | 73 +++++++++++++++++++++--------------------
 1 file changed, 38 insertions(+), 35 deletions(-)

diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py
index bfa7751676..fd585d5e05 100644
--- a/timesketch/lib/tasks.py
+++ b/timesketch/lib/tasks.py
@@ -59,50 +59,45 @@
 
 # Metrics definitions
 METRICS = {
-    "csv_jsonl_runs": prometheus_client.Counter(
-        "csv_jsonl_runs",
+    "worker_csv_jsonl_runs": prometheus_client.Counter(
+        "worker_csv_jsonl_runs",
         "Number of times the run_csv_jsonl task has been run",
         namespace=METRICS_NAMESPACE,
     ),
-    "manual_event_added": prometheus_client.Counter(
-        "manual_event_added",
-        "Number of times the run_csv_jsonl used to add a manual event",
-        namespace=METRICS_NAMESPACE,
-    ),
-    "mapping_increase": prometheus_client.Gauge(
-        "mapping_increase",
+    "worker_mapping_increase": prometheus_client.Gauge(
+        "worker_mapping_increase",
         "Number of times a mapping increase is requested",
         ["index_name", "timeline_id"],
         namespace=METRICS_NAMESPACE,
     ),
-    "mapping_increase_limit_exceeded": prometheus_client.Counter(
-        "mapping_increase_limit_exceeded",
+    "worker_mapping_increase_limit_exceeded": prometheus_client.Counter(
+        "worker_mapping_increase_limit_exceeded",
         "Number of times the OpenSearch mapping increase ran into the upper limit",
         ["index_name"],
         namespace=METRICS_NAMESPACE,
    ),
-    "files_parsed": prometheus_client.Counter(
-        "files_parsed",
-        "Number of files parsed by the csv_jsonl task",
-        ["file_extension"],
+    "worker_files_parsed": prometheus_client.Counter(
+        "worker_files_parsed",
+        "Number of files parsed by the worker task",
+        ["source_type"],
         namespace=METRICS_NAMESPACE,
     ),
-    "lines_parsed": prometheus_client.Counter(
-        "lines_parsed",
-        "Number of lines parsed by the csv_jsonl task",
-        ["index_name", "timeline_id"],
+    "worker_events_added": prometheus_client.Gauge(
+        "worker_events_added",
+        "Number of events added by the worker parsing task",
+        ["index_name", "timeline_id", "source_type"],
         namespace=METRICS_NAMESPACE,
     ),
-    "import_errors": prometheus_client.Counter(
-        "import_errors",
+    "worker_import_errors": prometheus_client.Counter(
+        "worker_import_errors",
         "Number of errors during the import",
-        ["index_name", "timeline_id"],
+        ["index_name", "error_type"],
         namespace=METRICS_NAMESPACE,
     ),
-    "total_run_time": prometheus_client.Summary(
-        "total_run_time",
+    "worker_total_run_time": prometheus_client.Summary(
+        "worker_total_run_time",
         "Total runtime of the run_csv_jsonl task",
-        ["index_name", "timeline_id", "file_extension"],
+        ["index_name", "timeline_id", "source_type"],
         namespace=METRICS_NAMESPACE,
     ),
 }
 
 # To be able to determine plaso's version.
 try:
@@ -148,6 +143,10 @@ def get_import_errors(error_container, index_name, total_count):
 
     if error_types:
         top_type = error_types.most_common()[0][0]
+        for error_type in error_types:
+            METRICS["worker_import_errors"].labels(
+                index_name=index_name, error_type=error_type
+            ).inc(error_types.get(error_type, 0))
     else:
         top_type = "Unknown Reasons"
 
@@ -910,16 +905,15 @@ def run_csv_jsonl(
     Returns:
         Name (str) of the index.
     """
-    METRICS["csv_jsonl_runs"].inc()
+    METRICS["worker_csv_jsonl_runs"].inc()
     time_start = time.time()
 
     if events:
         file_handle = io.StringIO(events)
         source_type = "jsonl"
-        METRICS["manual_event_added"].inc()
     else:
         file_handle = codecs.open(file_path, "r", encoding="utf-8", errors="replace")
-        METRICS["files_parsed"].labels(file_extension=source_type).inc()
+        METRICS["worker_files_parsed"].labels(source_type=source_type).inc()
 
     validators = {
         "csv": read_and_validate_csv,
@@ -1017,7 +1015,9 @@ def run_csv_jsonl(
             # To prevent mapping explosions we still check against an upper
             # mapping limit set in timesketch.conf (default: 2000).
             if new_limit > upper_mapping_limit:
-                METRICS["mapping_increase_limit_exceeded"].labels(index_name=index_name).inc()
+                METRICS["worker_mapping_increase_limit_exceeded"].labels(
+                    index_name=index_name
+                ).inc()
                 error_msg = (
                     f"Error: Indexing timeline [{timeline_name}] into [{index_name}] "
                     f"exceeds the upper field mapping limit of {upper_mapping_limit}. "
@@ -1036,7 +1036,9 @@ def run_csv_jsonl(
                     index=index_name,
                     body={"index.mapping.total_fields.limit": new_limit},
                 )
-                METRICS["mapping_increase"].labels(index_name=index_name, timeline_id=timeline_id).set(new_limit)
+                METRICS["worker_mapping_increase"].labels(
+                    index_name=index_name, timeline_id=timeline_id
+                ).set(new_limit)
                 logger.info(
                     "OpenSearch index [%s] mapping limit increased to: %d",
                     index_name,
@@ -1046,7 +1048,6 @@ def run_csv_jsonl(
 
             opensearch.import_event(index_name, event, timeline_id=timeline_id)
             final_counter += 1
-            METRICS["lines_parsed"].labels(index_name=index_name, timeline_id=timeline_id).inc()
 
         # Import the remaining events
         results = opensearch.flush_queued_events()
@@ -1076,6 +1077,9 @@ def run_csv_jsonl(
         logger.error("Error: {0!s}\n{1:s}".format(e, error_msg))
         return None
 
+    METRICS["worker_events_added"].labels(
+        index_name=index_name, timeline_id=timeline_id, source_type=source_type
+    ).set(final_counter)
     if error_count:
         logger.info(
             "Index timeline: [{0:s}] to index [{1:s}] - {2:d} out of {3:d} "
@@ -1097,12 +1101,11 @@ def run_csv_jsonl(
     _set_datasource_status(
         timeline_id, file_path, "ready", error_message=str(error_msg)
     )
-    METRICS["import_errors"].labels(index_name=index_name, timeline_id=timeline_id).inc(error_count)
 
     time_took_to_run = time.time() - time_start
-    METRICS["total_run_time"].labels(index_name=index_name, timeline_id=timeline_id,file_extension=source_type).observe(
-        time_took_to_run
-    )
+    METRICS["worker_total_run_time"].labels(
+        index_name=index_name, timeline_id=timeline_id, source_type=source_type
+    ).observe(time_took_to_run)
     return index_name
 

From 1e977ff54bd96a17dff04e6a1837f753b963c1f2 Mon Sep 17 00:00:00 2001
From: janosch
Date: Fri, 17 Jan 2025 13:25:36 +0000
Subject: [PATCH 06/10] linter

---
 timesketch/lib/tasks.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py
index 4e8340d407..68443dd255 100644
--- a/timesketch/lib/tasks.py
+++ b/timesketch/lib/tasks.py
@@ -22,10 +22,11 @@
 import logging
 import os
 import subprocess
+import time
 import traceback
 import six
 import yaml
-import time
+import prometheus_client
 
 from celery import chain
 from celery import group
@@ -41,6 +42,7 @@ from timesketch.lib.analyzers import manager
 from timesketch.lib.analyzers.dfiq_plugins.manager import DFIQAnalyzerManager
 from timesketch.lib.datastores.opensearch import OpenSearchDataStore
+from timesketch.lib.definitions import METRICS_NAMESPACE
 from timesketch.lib.utils import read_and_validate_csv
 from timesketch.lib.utils import read_and_validate_jsonl
 from timesketch.lib.utils import send_email
@@ -54,8 +56,6 @@ from timesketch.models.sketch import InvestigativeQuestionConclusion
 from timesketch.models.user import User
 
-import prometheus_client
-from timesketch.lib.definitions import METRICS_NAMESPACE
 
 # Metrics definitions
 METRICS = {

From 4c88121c7892e2943989c2fc4ade00842cbe4b73 Mon Sep 17 00:00:00 2001
From: janosch
Date: Fri, 17 Jan 2025 14:50:04 +0000
Subject: [PATCH 07/10] rename a metric

---
 timesketch/lib/tasks.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py
index 68443dd255..fc520554e5 100644
--- a/timesketch/lib/tasks.py
+++ b/timesketch/lib/tasks.py
@@ -94,9 +94,9 @@
         ["index_name", "error_type"],
         namespace=METRICS_NAMESPACE,
     ),
-    "worker_total_run_time": prometheus_client.Summary(
-        "worker_total_run_time",
-        "Total runtime of the run_csv_jsonl task",
+    "worker_run_time": prometheus_client.Summary(
+        "worker_run_time_seconds",
+        "Runtime of the worker task in seconds",
         ["index_name", "timeline_id", "source_type"],
         namespace=METRICS_NAMESPACE,
     ),
@@ -723,6 +723,7 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin
     Returns:
         Name (str) of the index.
     """
+    time_start = time.time()
     if not plaso:
         raise RuntimeError(
             ("Plaso isn't installed, " "unable to continue processing plaso files.")
         )
@@ -878,6 +879,10 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin
 
     # Mark the searchindex and timelines as ready
     _set_datasource_status(timeline_id, file_path, "ready")
+    time_took_to_run = time.time() - time_start
+    METRICS["worker_run_time"].labels(
+        index_name=index_name, timeline_id=timeline_id, source_type=source_type
+    ).observe(time_took_to_run)
     return index_name
 
 
@@ -1103,7 +1108,7 @@ def run_csv_jsonl(
     )
 
     time_took_to_run = time.time() - time_start
-    METRICS["worker_total_run_time"].labels(
+    METRICS["worker_run_time"].labels(
         index_name=index_name, timeline_id=timeline_id, source_type=source_type
     ).observe(time_took_to_run)
     return index_name

From 97a9f6b0cbf66c5a48a167ad8916ac97b803f134 Mon Sep 17 00:00:00 2001
From: janosch
Date: Tue, 21 Jan 2025 15:13:49 +0000
Subject: [PATCH 08/10] * Update mapping defaults
* Enforce upper limit
* Add e2e test

---
 data/timesketch.conf            |  6 ++--
 end_to_end_tests/upload_test.py | 50 +++++++++++++++++++++++++++++++++
 timesketch/lib/tasks.py         | 16 +++++------
 3 files changed, 61 insertions(+), 11 deletions(-)

diff --git a/data/timesketch.conf b/data/timesketch.conf
index d1c7f8d04e..7a03e64093 100644
--- a/data/timesketch.conf
+++ b/data/timesketch.conf
@@ -36,8 +36,8 @@ OPENSEARCH_TIMEOUT = 10
 OPENSEARCH_FLUSH_INTERVAL = 5000
 # Be careful when increasing the upper limit since this will impact your
 # OpenSearch cluster's performance and storage requirements!
-OPENSEARCH_MAPPING_BUFFER = 0.2
-OPENSEARCH_MAPPING_UPPER_LIMIT = 2000
+OPENSEARCH_MAPPING_BUFFER = 0.1
+OPENSEARCH_MAPPING_UPPER_LIMIT = 1000
 
 # Define what labels should be defined that make it so that a sketch and
 # timelines will not be deleted. This can be used to add a list of different
@@ -374,7 +374,7 @@ LLM_PROVIDER_CONFIGS = {
     # To use Google's AI Studio simply obtain an API key from https://aistudio.google.com/
     # pip install google-generativeai
     'aistudio': {
-        'api_key': '',
+        'api_key': '',
         'model': 'gemini-2.0-flash-exp',
     },
 }
diff --git a/end_to_end_tests/upload_test.py b/end_to_end_tests/upload_test.py
index 8f6a914c29..3e1e6ff973 100755
--- a/end_to_end_tests/upload_test.py
+++ b/end_to_end_tests/upload_test.py
@@ -14,6 +14,7 @@
 """End to end tests of Timesketch upload functionality."""
 import os
 import random
+import json
 
 from timesketch_api_client import search
 from . import interface
@@ -86,6 +87,55 @@ def test_large_upload_jsonl(self):
         events = sketch.explore("data_type:foobarjson", as_pandas=True)
         self.assertions.assertEqual(len(events), 4123)
 
+    def test_upload_jsonl_mapping_exceeds_limit(self):
+        """Test uploading a timeline with jsonl events that exceed the
+        default field mapping limit of 1000. The test will create a temporary
+        file with events that have many unique keys and then upload the file to
+        Timesketch. The test will then check that the correct error is triggered.
+        """
+
+        # create a new sketch
+        rand = random.randint(0, 10000)
+        sketch = self.api.create_sketch(
+            name=f"test_upload_jsonl_mapping_exceeds_limit {rand}"
+        )
+        self.sketch = sketch
+
+        file_path = "/tmp/mapping_over_1k.jsonl"
+        num_lines = 100
+        num_keys_per_line = 6
+        all_keys = set()
+
+        with open(file_path, "w", encoding="utf-8") as file_object:
+            for i in range(num_lines):
+                line_data = {
+                    "datetime": "2015-07-24T19:01:01+00:00",
+                    "message": f"Event {i} of {num_lines}",
+                    "timestamp_desc": "test",
+                    "data_type": "test:jsonl",
+                }
+                for j in range(num_keys_per_line):
+                    key = f"field_name_{j}_{random.randint(0, 100000)}"
+                    while key in all_keys:  # Avoid duplicate keys
+                        key = f"field_name_{random.randint(0, 100000)}"
+                    all_keys.add(key)
+                    line_data[key] = f"value_{j}_{random.randint(0, 10000)}"
+
+                json.dump(line_data, file_object)
+                file_object.write("\n")
+
+        self.import_timeline(file_path, index_name=rand, sketch=sketch)
+        os.remove(file_path)
+
+        timeline = sketch.list_timelines()[0]
+        # check that timeline threw the correct error
+        self.assertions.assertEqual(timeline.name, file_path)
+        self.assertions.assertEqual(timeline.index.name, str(rand))
+        self.assertions.assertEqual(timeline.index.status, "fail")
+        self.assertions.assertIn(
+            "OPENSEARCH_MAPPING_UPPER_LIMIT", timeline.data_sources[0]["error_message"]
+        )
+
     def test_very_large_upload_jsonl(self):
         """Test uploading a timeline with over 50 k events as jsonl.
 
         The test will create a temporary file and then
diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py
index fc520554e5..953c315bb5 100644
--- a/timesketch/lib/tasks.py
+++ b/timesketch/lib/tasks.py
@@ -982,10 +982,10 @@ def run_csv_jsonl(
     error_count = 0
     unique_keys = set()
     limit_buffer_percentage = float(
-        current_app.config.get("OPENSEARCH_MAPPING_BUFFER", 0.2)
+        current_app.config.get("OPENSEARCH_MAPPING_BUFFER", 0.1)
     )
     upper_mapping_limit = int(
-        current_app.config.get("OPENSEARCH_MAPPING_UPPER_LIMIT", 2000)
+        current_app.config.get("OPENSEARCH_MAPPING_UPPER_LIMIT", 1000)
     )
 
     try:
@@ -1015,10 +1015,10 @@ def run_csv_jsonl(
         ):
             unique_keys.update(event.keys())
             # Calculating the new limit. Each unique key is counted twice due to
-            # the "keyword" type plus a percentage buffer (default 20%).
+            # the "keyword" type plus a percentage buffer (default 10%).
             new_limit = int((len(unique_keys) * 2) * (1 + limit_buffer_percentage))
             # To prevent mapping explosions we still check against an upper
-            # mapping limit set in timesketch.conf (default: 2000).
+            # mapping limit set in timesketch.conf (default: 1000).
             if new_limit > upper_mapping_limit:
                 METRICS["worker_mapping_increase_limit_exceeded"].labels(
                     index_name=index_name
@@ -1026,9 +1026,8 @@ def run_csv_jsonl(
                 error_msg = (
                     f"Error: Indexing timeline [{timeline_name}] into [{index_name}] "
                     f"exceeds the upper field mapping limit of {upper_mapping_limit}. "
-                    f"Currently mapped fields: ~{len(unique_keys)*2} / New "
-                    f"calculated mapping limit: {new_limit}. Review your import "
-                    "data or adjust OPENSEARCH_MAPPING_UPPER_LIMIT."
+                    f"New calculated mapping limit: {new_limit}. Review your "
+                    "import data or adjust OPENSEARCH_MAPPING_UPPER_LIMIT."
) logger.error(error_msg) _set_datasource_status( @@ -1036,7 +1035,8 @@ def run_csv_jsonl( ) return None - if new_limit > current_limit: + if new_limit > current_limit and current_limit < upper_mapping_limit: + new_limit = min(new_limit, upper_mapping_limit) opensearch.client.indices.put_settings( index=index_name, body={"index.mapping.total_fields.limit": new_limit}, From 02461727c87007821017a26f1ca08c46812401f8 Mon Sep 17 00:00:00 2001 From: janosch Date: Tue, 21 Jan 2025 15:24:30 +0000 Subject: [PATCH 09/10] Fix e2e test --- end_to_end_tests/upload_test.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/end_to_end_tests/upload_test.py b/end_to_end_tests/upload_test.py index 3e1e6ff973..91a02e4df1 100755 --- a/end_to_end_tests/upload_test.py +++ b/end_to_end_tests/upload_test.py @@ -69,7 +69,13 @@ def test_large_upload_jsonl(self): string = f'{{"message":"Count {i} {rand}","timestamp":"123456789","datetime":"2015-07-24T19:01:01+00:00","timestamp_desc":"Write time","data_type":"foobarjson"}}\n' # pylint: disable=line-too-long file_object.write(string) - self.import_timeline("/tmp/large.jsonl", index_name=rand, sketch=sketch) + try: + self.import_timeline("/tmp/large.jsonl", index_name=rand, sketch=sketch) + except RuntimeError: + print( + "Timeline import failing is expected. Checking for the correct " + "error message..." + ) os.remove(file_path) timeline = sketch.list_timelines()[0] From 05f743d8ce4bc4825a9e41c630d036b01ffd1f5f Mon Sep 17 00:00:00 2001 From: janosch Date: Tue, 21 Jan 2025 15:33:21 +0000 Subject: [PATCH 10/10] fix correct e2e test --- end_to_end_tests/upload_test.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/end_to_end_tests/upload_test.py b/end_to_end_tests/upload_test.py index 91a02e4df1..6a7b035415 100755 --- a/end_to_end_tests/upload_test.py +++ b/end_to_end_tests/upload_test.py @@ -69,13 +69,7 @@ def test_large_upload_jsonl(self): string = f'{{"message":"Count {i} {rand}","timestamp":"123456789","datetime":"2015-07-24T19:01:01+00:00","timestamp_desc":"Write time","data_type":"foobarjson"}}\n' # pylint: disable=line-too-long file_object.write(string) - try: - self.import_timeline("/tmp/large.jsonl", index_name=rand, sketch=sketch) - except RuntimeError: - print( - "Timeline import failing is expected. Checking for the correct " - "error message..." - ) + self.import_timeline("/tmp/large.jsonl", index_name=rand, sketch=sketch) os.remove(file_path) timeline = sketch.list_timelines()[0] @@ -130,7 +124,13 @@ def test_upload_jsonl_mapping_exceeds_limit(self): json.dump(line_data, file_object) file_object.write("\n") - self.import_timeline(file_path, index_name=rand, sketch=sketch) + try: + self.import_timeline(file_path, index_name=rand, sketch=sketch) + except RuntimeError: + print( + "Timeline import failing is expected. Checking for the correct " + "error message..." + ) os.remove(file_path) timeline = sketch.list_timelines()[0]
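The metrics work in patches 04-07 converges on one pattern: a module-level METRICS dict holding "worker_"-prefixed prometheus_client collectors, created once at import time and labelled per index, timeline, and source type. The following self-contained sketch illustrates that pattern; the METRICS_NAMESPACE value and the run_import() function are assumptions for demonstration (the real constant is imported from timesketch.lib.definitions, which is not shown in these patches).

```python
import time

import prometheus_client

METRICS_NAMESPACE = "timesketch"  # assumed value; Timesketch imports this constant

# Collectors are created exactly once, at module import time.
METRICS = {
    "worker_csv_jsonl_runs": prometheus_client.Counter(
        "worker_csv_jsonl_runs",
        "Number of times the run_csv_jsonl task has been run",
        namespace=METRICS_NAMESPACE,
    ),
    "worker_run_time": prometheus_client.Summary(
        "worker_run_time_seconds",
        "Runtime of the worker task in seconds",
        ["index_name", "timeline_id", "source_type"],
        namespace=METRICS_NAMESPACE,
    ),
}


def run_import(index_name, timeline_id, source_type):
    """Hypothetical worker task body instrumented like run_csv_jsonl."""
    METRICS["worker_csv_jsonl_runs"].inc()
    time_start = time.time()
    time.sleep(0.01)  # stand-in for the actual indexing work
    METRICS["worker_run_time"].labels(
        index_name=index_name, timeline_id=timeline_id, source_type=source_type
    ).observe(time.time() - time_start)


run_import("demo-index", "1", "jsonl")
print(prometheus_client.generate_latest().decode("utf-8"))
```

Defining the collectors once at module level, rather than inside the task function, matters in a Celery worker: registering the same collector name twice in one process raises a duplicated-timeseries ValueError from prometheus_client, so per-call construction would crash on the second task execution.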