Increase OpenSearch mapping limit dynamically during indexing of csv/jsonl data #3257

Merged: 13 commits, Jan 22, 2025
10 changes: 7 additions & 3 deletions data/timesketch.conf
@@ -21,11 +21,11 @@ SECRET_KEY = '<KEY_GOES_HERE>'
# production.
SQLALCHEMY_DATABASE_URI = 'postgresql://<USERNAME>:<PASSWORD>@localhost/timesketch'

# Configure where your Elasticsearch server is located.
# Configure where your OpenSearch server is located.
#
# Make sure that the OpenSearch server is properly secured and not accessible
# from the internet. See the following link for more information:
# http://www.elasticsearch.org/blog/scripting-security/
# https://opensearch.org/docs/latest/getting-started/security/
OPENSEARCH_HOST = '127.0.0.1'
OPENSEARCH_PORT = 9200
OPENSEARCH_USER = None
@@ -34,6 +34,10 @@ OPENSEARCH_SSL = False
OPENSEARCH_VERIFY_CERTS = True
OPENSEARCH_TIMEOUT = 10
OPENSEARCH_FLUSH_INTERVAL = 5000
# Be careful when increasing the upper limit since this will impact your
# OpenSearch cluster's performance and storage requirements!
OPENSEARCH_MAPPING_BUFFER = 0.1
OPENSEARCH_MAPPING_UPPER_LIMIT = 1000

# Define what labels should be defined that make it so that a sketch and
# timelines will not be deleted. This can be used to add a list of different
@@ -370,7 +374,7 @@ LLM_PROVIDER_CONFIGS = {
# To use Google's AI Studio simply obtain an API key from https://aistudio.google.com/
# pip install google-generativeai
'aistudio': {
'api_key': '',
'api_key': '',
'model': 'gemini-2.0-flash-exp',
},
}
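For reference, the two new settings above feed the field-limit calculation that run_csv_jsonl() in timesketch/lib/tasks.py (further down in this diff) performs on every event: each unique key is counted twice (the field itself plus its "keyword" sub-field), a percentage buffer is added, and the result is checked against the upper limit. A minimal sketch of that arithmetic; the helper name is illustrative and not part of the codebase:

```python
# Illustration of how OPENSEARCH_MAPPING_BUFFER and
# OPENSEARCH_MAPPING_UPPER_LIMIT interact. Mirrors the arithmetic used in
# run_csv_jsonl(); the helper name itself is hypothetical.
def calculate_field_limit(unique_keys, buffer=0.1, upper_limit=1000):
    """Return the new total_fields limit, or None if the cap is exceeded."""
    # Each key produces two mapping entries (field + "keyword" sub-field),
    # then the percentage buffer is added on top.
    new_limit = int((len(unique_keys) * 2) * (1 + buffer))
    if new_limit > upper_limit:
        return None  # the import is rejected instead of exploding the mapping
    return new_limit

# Example: 450 unique keys with the defaults -> 990, still under the cap.
print(calculate_field_limit({f"field_{i}" for i in range(450)}))
```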
56 changes: 56 additions & 0 deletions end_to_end_tests/upload_test.py
@@ -14,6 +14,7 @@
"""End to end tests of Timesketch upload functionality."""
import os
import random
import json

from timesketch_api_client import search
from . import interface
@@ -86,6 +87,61 @@ def test_large_upload_jsonl(self):
events = sketch.explore("data_type:foobarjson", as_pandas=True)
self.assertions.assertEqual(len(events), 4123)

def test_upload_jsonl_mapping_exceeds_limit(self):
"""Test uploading a timeline with a jsonl events that exceeds the
default field mapping limit of 1000. The test will create a temporary
file with events that have many unique keys and then upload the file to
Timesketch. The test will then check that the correct error is triggered.
"""

# create a new sketch
rand = random.randint(0, 10000)
sketch = self.api.create_sketch(
name=f"test_upload_jsonl_mapping_exceeds_limit {rand}"
)
self.sketch = sketch

file_path = "/tmp/mapping_over_1k.jsonl"
num_lines = 100
num_keys_per_line = 6
all_keys = set()

with open(file_path, "w", encoding="utf-8") as file_object:
for i in range(num_lines):
line_data = {
"datetime": "2015-07-24T19:01:01+00:00",
"message": f"Event {i} of {num_lines}",
"timestamp_desc": "test",
"data_type": "test:jsonl",
}
for j in range(num_keys_per_line):
key = f"field_name_{j}_{random.randint(0, 100000)}"
while key in all_keys: # Avoid duplicate keys
key = f"field_name_{random.randint(0, 100000)}"
all_keys.add(key)
line_data[key] = f"value_{j}_{random.randint(0, 10000)}"

json.dump(line_data, file_object)
file_object.write("\n")

try:
self.import_timeline(file_path, index_name=rand, sketch=sketch)
except RuntimeError:
print(
"Timeline import failing is expected. Checking for the correct "
"error message..."
)
os.remove(file_path)

timeline = sketch.list_timelines()[0]
# Check that the timeline import failed with the expected error
self.assertions.assertEqual(timeline.name, file_path)
self.assertions.assertEqual(timeline.index.name, str(rand))
self.assertions.assertEqual(timeline.index.status, "fail")
self.assertions.assertIn(
"OPENSEARCH_MAPPING_UPPER_LIMIT", timeline.data_sources[0]["error_message"]
)

def test_very_large_upload_jsonl(self):
"""Test uploading a timeline with over 50 k events as jsonl. The test
will create a temporary file and then
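The generator loop in test_upload_jsonl_mapping_exceeds_limit above can be condensed into a standalone script for reproducing the failure outside the end-to-end test harness. A minimal sketch using only the standard library; the file path and key counts are arbitrary:

```python
# Write a JSONL file with enough unique keys to push an index past a
# 1000-field mapping limit (100 lines x 6 unique keys x 2 mapping entries
# per key > 1000). Path and counts are arbitrary.
import json
import random

file_path = "/tmp/mapping_over_1k.jsonl"
all_keys = set()

with open(file_path, "w", encoding="utf-8") as fh:
    for i in range(100):
        event = {
            "datetime": "2015-07-24T19:01:01+00:00",
            "message": f"Event {i}",
            "timestamp_desc": "test",
            "data_type": "test:jsonl",
        }
        for j in range(6):
            key = f"field_name_{j}_{random.randint(0, 100000)}"
            while key in all_keys:  # keep every key unique across lines
                key = f"field_name_{j}_{random.randint(0, 100000)}"
            all_keys.add(key)
            event[key] = f"value_{j}"
        fh.write(json.dumps(event) + "\n")

print(f"Wrote {len(all_keys)} unique keys to {file_path}")
```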
141 changes: 139 additions & 2 deletions timesketch/lib/tasks.py
@@ -22,9 +22,11 @@
import logging
import os
import subprocess
import time
import traceback
import six
import yaml
import prometheus_client

from celery import chain
from celery import group
@@ -40,6 +42,7 @@
from timesketch.lib.analyzers import manager
from timesketch.lib.analyzers.dfiq_plugins.manager import DFIQAnalyzerManager
from timesketch.lib.datastores.opensearch import OpenSearchDataStore
from timesketch.lib.definitions import METRICS_NAMESPACE
from timesketch.lib.utils import read_and_validate_csv
from timesketch.lib.utils import read_and_validate_jsonl
from timesketch.lib.utils import send_email
@@ -54,6 +57,51 @@
from timesketch.models.user import User


# Metrics definitions
METRICS = {
"worker_csv_jsonl_runs": prometheus_client.Counter(
"worker_csv_jsonl_runs",
"Number of times the run_csv_jsonl task has been run",
namespace=METRICS_NAMESPACE,
),
"worker_mapping_increase": prometheus_client.Gauge(
"worker_mapping_increase",
"Number of times a mapping increase is requested",
["index_name", "timeline_id"],
namespace=METRICS_NAMESPACE,
),
"worker_mapping_increase_limit_exceeded": prometheus_client.Counter(
"worker_mapping_increase_limit_exceeded",
"Number of times the OpenSearch mapping increase ran into the upper limit",
["index_name"],
namespace=METRICS_NAMESPACE,
),
"worker_files_parsed": prometheus_client.Counter(
"worker_files_parsed",
"Number of files parsed by the worker task",
["source_type"],
namespace=METRICS_NAMESPACE,
),
"worker_events_added": prometheus_client.Gauge(
"worker_events_added",
"Number of events added by the worker parsing task",
["index_name", "timeline_id", "source_type"],
namespace=METRICS_NAMESPACE,
),
"worker_import_errors": prometheus_client.Counter(
"worker_import_errors",
"Number of errors during the import",
["index_name", "error_type"],
namespace=METRICS_NAMESPACE,
),
"worker_run_time": prometheus_client.Summary(
"worker_run_time_seconds",
"Runtime of the worker task in seconds",
["index_name", "timeline_id", "source_type"],
namespace=METRICS_NAMESPACE,
),
}

# To be able to determine plaso's version.
try:
import plaso
@@ -100,6 +148,10 @@ def get_import_errors(error_container, index_name, total_count):

if error_types:
top_type = error_types.most_common()[0][0]
for error_type in error_types:
METRICS["worker_import_errors"].labels(
index_name=index_name, error_type=error_type
).inc(error_types.get(error_type, 0))
else:
top_type = "Unknown Reasons"

@@ -671,6 +723,7 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin
Returns:
Name (str) of the index.
"""
time_start = time.time()
if not plaso:
raise RuntimeError(
("Plaso isn't installed, " "unable to continue processing plaso files.")
@@ -826,6 +879,10 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin

# Mark the searchindex and timelines as ready
_set_datasource_status(timeline_id, file_path, "ready")
time_took_to_run = time.time() - time_start
METRICS["worker_run_time"].labels(
index_name=index_name, timeline_id=timeline_id, source_type=source_type
).observe(time_took_to_run)
return index_name


@@ -857,11 +914,15 @@ def run_csv_jsonl(
Returns:
Name (str) of the index.
"""
METRICS["worker_csv_jsonl_runs"].inc()
time_start = time.time()

if events:
file_handle = io.StringIO(events)
source_type = "jsonl"
else:
file_handle = codecs.open(file_path, "r", encoding="utf-8", errors="replace")
METRICS["worker_files_parsed"].labels(source_type=source_type).inc()

validators = {
"csv": read_and_validate_csv,
@@ -919,20 +980,85 @@
final_counter = 0
error_msg = ""
error_count = 0
unique_keys = set()
limit_buffer_percentage = float(
current_app.config.get("OPENSEARCH_MAPPING_BUFFER", 0.1)
)
upper_mapping_limit = int(
current_app.config.get("OPENSEARCH_MAPPING_UPPER_LIMIT", 1000)
)

try:
opensearch.create_index(index_name=index_name, mappings=mappings)

current_index_mapping_properties = (
opensearch.client.indices.get_mapping(index=index_name)
.get(index_name, {})
.get("mappings", {})
.get("properties", {})
)
unique_keys = set(current_index_mapping_properties)

try:
current_limit = int(
opensearch.client.indices.get_settings(index=index_name)[index_name][
"settings"
]["index"]["mapping"]["total_fields"]["limit"]
)
except KeyError:
current_limit = 1000

for event in read_and_validate(
file_handle=file_handle,
headers_mapping=headers_mapping,
delimiter=delimiter,
):
unique_keys.update(event.keys())
# Calculate the new limit: each unique key is counted twice (the field
# itself plus its "keyword" sub-field), then a percentage buffer
# (default 10%) is added on top.
new_limit = int((len(unique_keys) * 2) * (1 + limit_buffer_percentage))
# To prevent mapping explosions we still check against an upper
# mapping limit set in timesketch.conf (default: 1000).
if new_limit > upper_mapping_limit:
METRICS["worker_mapping_increase_limit_exceeded"].labels(
index_name=index_name
).inc()
error_msg = (
f"Error: Indexing timeline [{timeline_name}] into [{index_name}] "
f"exceeds the upper field mapping limit of {upper_mapping_limit}. "
f"New calculated mapping limit: {new_limit}. Review your "
"import data or adjust OPENSEARCH_MAPPING_UPPER_LIMIT."
)
logger.error(error_msg)
_set_datasource_status(
timeline_id, file_path, "fail", error_message=str(error_msg)
)
return None

if new_limit > current_limit and current_limit < upper_mapping_limit:
new_limit = min(new_limit, upper_mapping_limit)
opensearch.client.indices.put_settings(
index=index_name,
body={"index.mapping.total_fields.limit": new_limit},
)
METRICS["worker_mapping_increase"].labels(
index_name=index_name, timeline_id=timeline_id
).set(new_limit)
logger.info(
"OpenSearch index [%s] mapping limit increased to: %d",
index_name,
new_limit,
)
current_limit = new_limit

opensearch.import_event(index_name, event, timeline_id=timeline_id)
final_counter += 1

# Import the remaining events
results = opensearch.flush_queued_events()

error_container = results.get("error_container", {})
error_count = len(error_container.get(index_name, {}).get("errors", []))
error_msg = get_import_errors(
error_container=error_container,
index_name=index_name,
@@ -950,10 +1076,15 @@
except Exception as e: # pylint: disable=broad-except
# Mark the searchindex and timelines as failed and exit the task
error_msg = traceback.format_exc()
_set_datasource_status(timeline_id, file_path, "fail", error_message=error_msg)
_set_datasource_status(
timeline_id, file_path, "fail", error_message=str(error_msg)
)
logger.error("Error: {0!s}\n{1:s}".format(e, error_msg))
return None

METRICS["worker_events_added"].labels(
index_name=index_name, timeline_id=timeline_id, source_type=source_type
).set(final_counter)
if error_count:
logger.info(
"Index timeline: [{0:s}] to index [{1:s}] - {2:d} out of {3:d} "
@@ -972,8 +1103,14 @@
)

# Set status to ready when done
_set_datasource_status(timeline_id, file_path, "ready", error_message=error_msg)
_set_datasource_status(
timeline_id, file_path, "ready", error_message=str(error_msg)
)

time_took_to_run = time.time() - time_start
METRICS["worker_run_time"].labels(
index_name=index_name, timeline_id=timeline_id, source_type=source_type
).observe(time_took_to_run)
return index_name


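For operators who want to verify the effect on a live index, the same index.mapping.total_fields.limit setting can be read back (and, if needed, raised) with the plain opensearch-py client that the task code wraps. A minimal sketch; host, credentials, index name, and the example limit are placeholders:

```python
# Inspect and, if necessary, raise the total_fields limit on an index.
# Connection details and the index name are placeholders.
from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "127.0.0.1", "port": 9200}])
index_name = "my_timeline_index"

settings = client.indices.get_settings(index=index_name)
limit = (
    settings[index_name]["settings"]["index"]
    .get("mapping", {})
    .get("total_fields", {})
    .get("limit", "1000 (default)")
)
print(f"Current total_fields limit: {limit}")

# Mirrors the put_settings call in run_csv_jsonl(); use with care, since a
# very high limit can hurt cluster performance and storage.
client.indices.put_settings(
    index=index_name,
    body={"index.mapping.total_fields.limit": 1500},
)
```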