Increase OpenSearch mapping limit dynamically during indexing of csv/jsonl data (#3257)

* Dynamically increase OpenSearch field mapping limit during indexing of csv/jsonl
* Adding an e2e test for gracefully erroring out if too many unique fields are used
* Adding metrics to the run_csv_jsonl function
jkppr authored Jan 22, 2025
1 parent eabeb0a commit bc96728
Showing 3 changed files with 202 additions and 5 deletions.
10 changes: 7 additions & 3 deletions data/timesketch.conf
@@ -21,11 +21,11 @@ SECRET_KEY = '<KEY_GOES_HERE>'
# production.
SQLALCHEMY_DATABASE_URI = 'postgresql://<USERNAME>:<PASSWORD>@localhost/timesketch'

# Configure where your Elasticsearch server is located.
# Configure where your OpenSearch server is located.
#
# Make sure that the OpenSearch server is properly secured and not accessible
# from the internet. See the following link for more information:
# http://www.elasticsearch.org/blog/scripting-security/
# https://opensearch.org/docs/latest/getting-started/security/
OPENSEARCH_HOST = '127.0.0.1'
OPENSEARCH_PORT = 9200
OPENSEARCH_USER = None
@@ -34,6 +34,10 @@ OPENSEARCH_SSL = False
OPENSEARCH_VERIFY_CERTS = True
OPENSEARCH_TIMEOUT = 10
OPENSEARCH_FLUSH_INTERVAL = 5000
# Be careful when increasing the upper limit, since this will impact your
# OpenSearch cluster's performance and storage requirements!
OPENSEARCH_MAPPING_BUFFER = 0.1
OPENSEARCH_MAPPING_UPPER_LIMIT = 1000

# Define which labels should be set to prevent a sketch and its
# timelines from being deleted. This can be used to add a list of different
@@ -370,7 +374,7 @@ LLM_PROVIDER_CONFIGS = {
# To use Google's AI Studio simply obtain an API key from https://aistudio.google.com/
# pip install google-generativeai
'aistudio': {
'api_key': '',
'api_key': '',
'model': 'gemini-2.0-flash-exp',
},
}
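For context, the two new settings above drive the dynamic limit calculation added to run_csv_jsonl later in this commit. The following is a minimal, standalone sketch of that arithmetic (not part of the patch; compute_field_limit is a hypothetical helper introduced only for illustration):

# Sketch of the mapping-limit arithmetic used by run_csv_jsonl (see tasks.py
# below). Standalone illustration only; compute_field_limit is hypothetical.
OPENSEARCH_MAPPING_BUFFER = 0.1        # 10% headroom on top of the counted fields
OPENSEARCH_MAPPING_UPPER_LIMIT = 1000  # hard ceiling to avoid mapping explosions

def compute_field_limit(unique_keys):
    # Each key is counted twice because every field also gets a "keyword"
    # sub-field mapping; the buffer is then added on top.
    return int((len(unique_keys) * 2) * (1 + OPENSEARCH_MAPPING_BUFFER))

# 300 unique columns -> int(300 * 2 * 1.1) = 660, so the index setting would be
# raised to 660. 600 unique columns -> 1320, which exceeds the upper limit and
# causes the import to fail with an error pointing at this config option.
assert compute_field_limit(set(range(300))) == 660
assert compute_field_limit(set(range(600))) > OPENSEARCH_MAPPING_UPPER_LIMIT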
56 changes: 56 additions & 0 deletions end_to_end_tests/upload_test.py
@@ -14,6 +14,7 @@
"""End to end tests of Timesketch upload functionality."""
import os
import random
import json

from timesketch_api_client import search
from . import interface
@@ -86,6 +87,61 @@ def test_large_upload_jsonl(self):
events = sketch.explore("data_type:foobarjson", as_pandas=True)
self.assertions.assertEqual(len(events), 4123)

def test_upload_jsonl_mapping_exceeds_limit(self):
"""Test uploading a timeline with a jsonl events that exceeds the
default field mapping limit of 1000. The test will create a temporary
file with events that have many unique keys and then upload the file to
Timesketch. The test will then check that the correct error is triggered.
"""

# create a new sketch
rand = random.randint(0, 10000)
sketch = self.api.create_sketch(
name=f"test_upload_jsonl_mapping_exceeds_limit {rand}"
)
self.sketch = sketch

file_path = "/tmp/mapping_over_1k.jsonl"
num_lines = 100
num_keys_per_line = 6
all_keys = set()

with open(file_path, "w", encoding="utf-8") as file_object:
for i in range(num_lines):
line_data = {
"datetime": "2015-07-24T19:01:01+00:00",
"message": f"Event {i} of {num_lines}",
"timestamp_desc": "test",
"data_type": "test:jsonl",
}
for j in range(num_keys_per_line):
key = f"field_name_{j}_{random.randint(0, 100000)}"
while key in all_keys: # Avoid duplicate keys
key = f"field_name_{random.randint(0, 100000)}"
all_keys.add(key)
line_data[key] = f"value_{j}_{random.randint(0, 10000)}"

json.dump(line_data, file_object)
file_object.write("\n")

try:
self.import_timeline(file_path, index_name=rand, sketch=sketch)
except RuntimeError:
print(
"Timeline import failing is expected. Checking for the correct "
"error message..."
)
os.remove(file_path)

timeline = sketch.list_timelines()[0]
# Check that the timeline failed with the expected error
self.assertions.assertEqual(timeline.name, file_path)
self.assertions.assertEqual(timeline.index.name, str(rand))
self.assertions.assertEqual(timeline.index.status, "fail")
self.assertions.assertIn(
"OPENSEARCH_MAPPING_UPPER_LIMIT", timeline.data_sources[0]["error_message"]
)

def test_very_large_upload_jsonl(self):
"""Test uploading a timeline with over 50 k events as jsonl. The test
will create a temporary file and then
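Rough arithmetic on why this test trips the limit, assuming the default OPENSEARCH_MAPPING_BUFFER of 0.1 and OPENSEARCH_MAPPING_UPPER_LIMIT of 1000 (the exact point at which the import aborts depends on when enough unique keys have been read):

# Back-of-the-envelope check for test_upload_jsonl_mapping_exceeds_limit,
# assuming the default buffer (0.1) and upper limit (1000). Illustration only.
num_lines = 100
num_keys_per_line = 6
base_fields = 4  # datetime, message, timestamp_desc, data_type
unique_keys = num_lines * num_keys_per_line + base_fields   # ~604 fields
requested_limit = int((unique_keys * 2) * (1 + 0.1))        # int(1328.8) = 1328
assert requested_limit > 1000  # exceeds the upper limit, so the import fails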
141 changes: 139 additions & 2 deletions timesketch/lib/tasks.py
@@ -22,9 +22,11 @@
import logging
import os
import subprocess
import time
import traceback
import six
import yaml
import prometheus_client

from celery import chain
from celery import group
@@ -40,6 +42,7 @@
from timesketch.lib.analyzers import manager
from timesketch.lib.analyzers.dfiq_plugins.manager import DFIQAnalyzerManager
from timesketch.lib.datastores.opensearch import OpenSearchDataStore
from timesketch.lib.definitions import METRICS_NAMESPACE
from timesketch.lib.utils import read_and_validate_csv
from timesketch.lib.utils import read_and_validate_jsonl
from timesketch.lib.utils import send_email
@@ -54,6 +57,51 @@
from timesketch.models.user import User


# Metrics definitions
METRICS = {
"worker_csv_jsonl_runs": prometheus_client.Counter(
"worker_csv_jsonl_runs",
"Number of times the run_csv_jsonl task has been run",
namespace=METRICS_NAMESPACE,
),
"worker_mapping_increase": prometheus_client.Gauge(
"worker_mapping_increase",
"Number of times a mapping increase is requested",
["index_name", "timeline_id"],
namespace=METRICS_NAMESPACE,
),
"worker_mapping_increase_limit_exceeded": prometheus_client.Counter(
"worker_mapping_increase_limit_exceeded",
"Number of times the OpenSearch mapping increase ran into the upper limit",
["index_name"],
namespace=METRICS_NAMESPACE,
),
"worker_files_parsed": prometheus_client.Counter(
"worker_files_parsed",
"Number of files parsed by the worker task",
["source_type"],
namespace=METRICS_NAMESPACE,
),
"worker_events_added": prometheus_client.Gauge(
"worker_events_added",
"Number of events added by the worker parsing task",
["index_name", "timeline_id", "source_type"],
namespace=METRICS_NAMESPACE,
),
"worker_import_errors": prometheus_client.Counter(
"worker_import_errors",
"Number of errors during the import",
["index_name", "error_type"],
namespace=METRICS_NAMESPACE,
),
"worker_run_time": prometheus_client.Summary(
"worker_run_time_seconds",
"Runtime of the worker task in seconds",
["index_name", "timeline_id", "source_type"],
namespace=METRICS_NAMESPACE,
),
}

# To be able to determine plaso's version.
try:
import plaso
@@ -100,6 +148,10 @@ def get_import_errors(error_container, index_name, total_count):

if error_types:
top_type = error_types.most_common()[0][0]
for error_type in error_types:
METRICS["worker_import_errors"].labels(
index_name=index_name, error_type=error_type
).inc(error_types.get(error_type, 0))
else:
top_type = "Unknown Reasons"

@@ -671,6 +723,7 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin
Returns:
Name (str) of the index.
"""
time_start = time.time()
if not plaso:
raise RuntimeError(
("Plaso isn't installed, " "unable to continue processing plaso files.")
@@ -826,6 +879,10 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin

# Mark the searchindex and timelines as ready
_set_datasource_status(timeline_id, file_path, "ready")
time_took_to_run = time.time() - time_start
METRICS["worker_run_time"].labels(
index_name=index_name, timeline_id=timeline_id, source_type=source_type
).observe(time_took_to_run)
return index_name


@@ -857,11 +914,15 @@ def run_csv_jsonl(
Returns:
Name (str) of the index.
"""
METRICS["worker_csv_jsonl_runs"].inc()
time_start = time.time()

if events:
file_handle = io.StringIO(events)
source_type = "jsonl"
else:
file_handle = codecs.open(file_path, "r", encoding="utf-8", errors="replace")
METRICS["worker_files_parsed"].labels(source_type=source_type).inc()

validators = {
"csv": read_and_validate_csv,
@@ -919,20 +980,85 @@ def run_csv_jsonl(
final_counter = 0
error_msg = ""
error_count = 0
unique_keys = set()
limit_buffer_percentage = float(
current_app.config.get("OPENSEARCH_MAPPING_BUFFER", 0.1)
)
upper_mapping_limit = int(
current_app.config.get("OPENSEARCH_MAPPING_UPPER_LIMIT", 1000)
)

try:
opensearch.create_index(index_name=index_name, mappings=mappings)

current_index_mapping_properties = (
opensearch.client.indices.get_mapping(index=index_name)
.get(index_name, {})
.get("mappings", {})
.get("properties", {})
)
unique_keys = set(current_index_mapping_properties)

try:
current_limit = int(
opensearch.client.indices.get_settings(index=index_name)[index_name][
"settings"
]["index"]["mapping"]["total_fields"]["limit"]
)
except KeyError:
current_limit = 1000

for event in read_and_validate(
file_handle=file_handle,
headers_mapping=headers_mapping,
delimiter=delimiter,
):
unique_keys.update(event.keys())
# Calculate the new limit. Each unique key is counted twice (the field
# itself plus its "keyword" sub-field mapping), plus a percentage buffer (default 10%).
new_limit = int((len(unique_keys) * 2) * (1 + limit_buffer_percentage))
# To prevent mapping explosions we still check against an upper
# mapping limit set in timesketch.conf (default: 1000).
if new_limit > upper_mapping_limit:
METRICS["worker_mapping_increase_limit_exceeded"].labels(
index_name=index_name
).inc()
error_msg = (
f"Error: Indexing timeline [{timeline_name}] into [{index_name}] "
f"exceeds the upper field mapping limit of {upper_mapping_limit}. "
f"New calculated mapping limit: {new_limit}. Review your "
"import data or adjust OPENSEARCH_MAPPING_UPPER_LIMIT."
)
logger.error(error_msg)
_set_datasource_status(
timeline_id, file_path, "fail", error_message=str(error_msg)
)
return None

if new_limit > current_limit and current_limit < upper_mapping_limit:
new_limit = min(new_limit, upper_mapping_limit)
opensearch.client.indices.put_settings(
index=index_name,
body={"index.mapping.total_fields.limit": new_limit},
)
METRICS["worker_mapping_increase"].labels(
index_name=index_name, timeline_id=timeline_id
).set(new_limit)
logger.info(
"OpenSearch index [%s] mapping limit increased to: %d",
index_name,
new_limit,
)
current_limit = new_limit

opensearch.import_event(index_name, event, timeline_id=timeline_id)
final_counter += 1

# Import the remaining events
results = opensearch.flush_queued_events()

error_container = results.get("error_container", {})
error_count = len(error_container.get(index_name, {}).get("errors", []))
error_msg = get_import_errors(
error_container=error_container,
index_name=index_name,
@@ -950,10 +1076,15 @@
except Exception as e: # pylint: disable=broad-except
# Mark the searchindex and timelines as failed and exit the task
error_msg = traceback.format_exc()
_set_datasource_status(timeline_id, file_path, "fail", error_message=error_msg)
_set_datasource_status(
timeline_id, file_path, "fail", error_message=str(error_msg)
)
logger.error("Error: {0!s}\n{1:s}".format(e, error_msg))
return None

METRICS["worker_events_added"].labels(
index_name=index_name, timeline_id=timeline_id, source_type=source_type
).set(final_counter)
if error_count:
logger.info(
"Index timeline: [{0:s}] to index [{1:s}] - {2:d} out of {3:d} "
@@ -972,8 +1103,14 @@
)

# Set status to ready when done
_set_datasource_status(timeline_id, file_path, "ready", error_message=error_msg)
_set_datasource_status(
timeline_id, file_path, "ready", error_message=str(error_msg)
)

time_took_to_run = time.time() - time_start
METRICS["worker_run_time"].labels(
index_name=index_name, timeline_id=timeline_id, source_type=source_type
).observe(time_took_to_run)
return index_name


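The new metrics rely on the standard prometheus_client primitives; how the Celery workers expose them for scraping is outside the scope of this diff. Below is a self-contained sketch of how a counter and a labeled summary of this kind behave (the metric names, namespace, and HTTP port are made up for illustration):

# Standalone illustration of the prometheus_client objects used in tasks.py.
# Metric names, namespace, and port are invented for this example; they are
# not part of the commit.
import time
import prometheus_client

RUNS = prometheus_client.Counter(
    "example_csv_jsonl_runs", "Number of example runs", namespace="example"
)
RUN_TIME = prometheus_client.Summary(
    "example_run_time_seconds",
    "Runtime of the example task in seconds",
    ["index_name"],
    namespace="example",
)

def fake_task(index_name):
    RUNS.inc()                      # counters only ever go up
    start = time.time()
    time.sleep(0.1)                 # stand-in for the actual indexing work
    RUN_TIME.labels(index_name=index_name).observe(time.time() - start)

if __name__ == "__main__":
    prometheus_client.start_http_server(8000)  # serves /metrics on port 8000
    fake_task("test_index")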
