From ca07fc65494dd7b2d69435abbbc1be6d365eedf8 Mon Sep 17 00:00:00 2001 From: Griffin Tarpenning Date: Wed, 26 Feb 2025 18:45:27 -0800 Subject: [PATCH] perf(weave): use LIKE string compare in pre-group by conditioning (#3791) --- tests/trace/test_client_trace.py | 14 +- .../trace_server/test_calls_query_builder.py | 298 +++++++++++++++++- weave/trace_server/calls_query_builder.py | 172 +++++++++- 3 files changed, 463 insertions(+), 21 deletions(-) diff --git a/tests/trace/test_client_trace.py b/tests/trace/test_client_trace.py index 7968dabdcfec..ecb34ace7e2f 100644 --- a/tests/trace/test_client_trace.py +++ b/tests/trace/test_client_trace.py @@ -3373,7 +3373,7 @@ def test_call_stream_query_heavy_query_batch(client): parent_id=parent_id, started_at=datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(seconds=1), - attributes={"a": 5}, + attributes={"a": 5, "empty": "", "null": None}, inputs={"param": {"value1": "hello"}}, ) client.server.call_start(tsi.CallStartReq(start=start)) @@ -3454,3 +3454,15 @@ def test_call_stream_query_heavy_query_batch(client): for call in res: assert call.inputs["param"]["value1"] == "helslo" assert call.output["d"] == 5 + + # now try to filter by the empty attribute string + query = { + "project_id": project_id, + "query": { + "$expr": {"$eq": [{"$getField": "attributes.empty"}, {"$literal": ""}]} + }, + } + res = client.server.calls_query_stream(tsi.CallsQueryReq.model_validate(query)) + assert len(list(res)) == 10 + for call in res: + assert call.attributes["empty"] == "" diff --git a/tests/trace_server/test_calls_query_builder.py b/tests/trace_server/test_calls_query_builder.py index 9babc57aa08a..a52a79cbd10c 100644 --- a/tests/trace_server/test_calls_query_builder.py +++ b/tests/trace_server/test_calls_query_builder.py @@ -279,7 +279,7 @@ def test_query_heavy_column_simple_filter_with_order_and_limit_and_mixed_query_c AND (calls_merged.id IN filtered_calls) AND - ((JSON_VALUE(calls_merged.inputs_dump, {pb_3:String}) = {pb_4:String}) + (calls_merged.inputs_dump LIKE {pb_5:String} OR calls_merged.inputs_dump IS NULL) GROUP BY (calls_merged.project_id, calls_merged.id) HAVING ( @@ -294,6 +294,7 @@ def test_query_heavy_column_simple_filter_with_order_and_limit_and_mixed_query_c "pb_2": "project", "pb_3": '$."param"."val"', "pb_4": "hello", + "pb_5": '%"hello"%', }, ) @@ -303,12 +304,11 @@ def assert_sql(cq: CallsQuery, exp_query, exp_params): query = cq.as_sql(pb) params = pb.get_params() - assert exp_params == params - exp_formatted = sqlparse.format(exp_query, reindent=True) found_formatted = sqlparse.format(query, reindent=True) assert exp_formatted == found_formatted + assert exp_params == params def test_query_light_column_with_costs() -> None: @@ -779,7 +779,7 @@ def test_calls_query_with_predicate_filters() -> None: AND (calls_merged.id IN filtered_calls) AND - ((JSON_VALUE(calls_merged.inputs_dump, {pb_2:String}) = {pb_3:String}) + (calls_merged.inputs_dump LIKE {pb_4:String} OR calls_merged.inputs_dump IS NULL) GROUP BY (calls_merged.project_id, calls_merged.id) HAVING ( @@ -791,6 +791,7 @@ def test_calls_query_with_predicate_filters() -> None: "pb_1": "project", "pb_2": '$."param"."val"', "pb_3": "hello", + "pb_4": '%"hello"%', }, ) @@ -847,8 +848,8 @@ def test_calls_query_with_predicate_filters_multiple_heavy_conditions() -> None: calls_merged.project_id = {pb_1:String} AND (calls_merged.id IN filtered_calls) - AND ((((JSON_VALUE(calls_merged.inputs_dump, {pb_2:String}) = {pb_3:String}) OR calls_merged.inputs_dump IS NULL)) - AND (((JSON_VALUE(calls_merged.output_dump, {pb_4:String}) = {pb_5:String}) OR calls_merged.output_dump IS NULL))) + AND (((calls_merged.inputs_dump LIKE {pb_6:String} OR calls_merged.inputs_dump IS NULL)) + AND ((calls_merged.output_dump LIKE {pb_7:String} OR calls_merged.output_dump IS NULL))) GROUP BY (calls_merged.project_id, calls_merged.id) HAVING ( ((JSON_VALUE(any(calls_merged.inputs_dump), {pb_2:String}) = {pb_3:String})) @@ -863,6 +864,8 @@ def test_calls_query_with_predicate_filters_multiple_heavy_conditions() -> None: "pb_3": "hello", "pb_4": '$."result"', "pb_5": "success", + "pb_6": '%"hello"%', + "pb_7": '%"success"%', }, ) @@ -912,11 +915,11 @@ def test_calls_query_with_or_between_start_and_end_fields() -> None: AND ((NOT ((any(calls_merged.started_at) IS NULL))))) """, { - "pb_4": "project", "pb_0": '$."param"."val"', "pb_1": "hello", "pb_2": '$."result"', "pb_3": "success", + "pb_4": "project", }, ) @@ -991,10 +994,10 @@ def test_calls_query_with_complex_heavy_filters() -> None: calls_merged.project_id = {pb_1:String} AND (calls_merged.id IN filtered_calls) - AND ((((JSON_VALUE(calls_merged.inputs_dump, {pb_2:String}) = {pb_3:String}) OR calls_merged.inputs_dump IS NULL)) + AND (((calls_merged.inputs_dump LIKE {pb_10:String} OR calls_merged.inputs_dump IS NULL)) AND (((JSON_VALUE(calls_merged.inputs_dump, {pb_4:String}) > {pb_5:UInt64}) OR calls_merged.inputs_dump IS NULL)) - AND (((JSON_VALUE(calls_merged.output_dump, {pb_6:String}) = {pb_7:String}) OR calls_merged.output_dump IS NULL)) - AND ((positionCaseInsensitive(JSON_VALUE(calls_merged.output_dump, {pb_8:String}), {pb_9:String}) > 0 OR calls_merged.output_dump IS NULL))) + AND ((calls_merged.output_dump LIKE {pb_11:String} OR calls_merged.output_dump IS NULL)) + AND ((lower(calls_merged.output_dump) LIKE {pb_12:String} OR calls_merged.output_dump IS NULL))) GROUP BY (calls_merged.project_id, calls_merged.id) HAVING ( ((JSON_VALUE(any(calls_merged.inputs_dump), {pb_2:String}) = {pb_3:String})) @@ -1017,5 +1020,280 @@ def test_calls_query_with_complex_heavy_filters() -> None: "pb_7": "success", "pb_8": '$."result"."message"', "pb_9": "completed", + "pb_10": '%"hello"%', + "pb_11": '%"success"%', + "pb_12": '%"%completed%"%', + }, + ) + + +def test_calls_query_with_like_optimization() -> None: + """Test that simple JSON field equality checks use LIKE optimization.""" + cq = CallsQuery(project_id="project") + cq.add_field("id") + cq.add_condition( + tsi_query.EqOperation.model_validate( + { + "$eq": [ + {"$getField": "inputs.param"}, + {"$literal": "hello"}, + ] + } + ) + ) + + assert_sql( + cq, + """ + SELECT + calls_merged.id AS id + FROM calls_merged + WHERE + calls_merged.project_id = {pb_3:String} + AND + (calls_merged.inputs_dump LIKE {pb_2:String} OR calls_merged.inputs_dump IS NULL) + GROUP BY (calls_merged.project_id, calls_merged.id) + HAVING ( + ((JSON_VALUE(any(calls_merged.inputs_dump), {pb_0:String}) = {pb_1:String})) + AND ((any(calls_merged.deleted_at) IS NULL)) + AND ((NOT ((any(calls_merged.started_at) IS NULL)))) + ) + """, + { + "pb_3": "project", + "pb_2": '%"hello"%', + "pb_1": "hello", + "pb_0": '$."param"', + }, + ) + + +def test_calls_query_with_like_optimization_contains() -> None: + """Test that contains operations on JSON fields use LIKE optimization.""" + cq = CallsQuery(project_id="project") + cq.add_field("id") + cq.add_condition( + tsi_query.ContainsOperation.model_validate( + { + "$contains": { + "input": {"$getField": "inputs.param"}, + "substr": {"$literal": "hello"}, + "case_insensitive": True, + } + } + ) + ) + + assert_sql( + cq, + """ + SELECT + calls_merged.id AS id + FROM calls_merged + WHERE + calls_merged.project_id = {pb_3:String} + AND + (lower(calls_merged.inputs_dump) LIKE {pb_2:String} OR calls_merged.inputs_dump IS NULL) + GROUP BY (calls_merged.project_id, calls_merged.id) + HAVING ( + (positionCaseInsensitive(JSON_VALUE(any(calls_merged.inputs_dump), {pb_0:String}), {pb_1:String}) > 0) + AND ((any(calls_merged.deleted_at) IS NULL)) + AND ((NOT ((any(calls_merged.started_at) IS NULL)))) + ) + """, + { + "pb_0": '$."param"', + "pb_3": "project", + "pb_2": '%"%hello%"%', + "pb_1": "hello", + }, + ) + + +def test_query_with_json_value_in_condition() -> None: + """Test that in operations on JSON fields use JSON_VALUE with IN.""" + cq = CallsQuery(project_id="project") + cq.add_field("id") + cq.add_condition( + tsi_query.InOperation.model_validate( + { + "$in": [ + {"$getField": "inputs.param"}, + [{"$literal": "hello"}, {"$literal": "world"}], + ] + } + ) + ) + + assert_sql( + cq, + """ + SELECT + calls_merged.id AS id + FROM calls_merged + WHERE + calls_merged.project_id = {pb_5:String} + AND + ((calls_merged.inputs_dump LIKE {pb_3:String} + OR calls_merged.inputs_dump LIKE {pb_4:String}) + OR calls_merged.inputs_dump IS NULL) + GROUP BY (calls_merged.project_id, calls_merged.id) + HAVING ( + ((JSON_VALUE(any(calls_merged.inputs_dump), {pb_0:String}) IN ({pb_1:String},{pb_2:String}))) + AND ((any(calls_merged.deleted_at) IS NULL)) + AND ((NOT ((any(calls_merged.started_at) IS NULL)))) + ) + """, + { + "pb_0": '$."param"', + "pb_1": "hello", + "pb_2": "world", + "pb_5": "project", + "pb_3": '%"hello"%', + "pb_4": '%"world"%', + }, + ) + + +def test_calls_query_with_combined_like_optimizations_and_op_filter() -> None: + """Test combining multiple LIKE optimizations with different operators and fields.""" + cq = CallsQuery(project_id="project") + cq.add_field("id") + cq.add_field("attributes") + cq.add_field("inputs") + + # Add a hardcoded filter for op_names + cq.set_hardcoded_filter( + HardCodedFilter( + filter=tsi.CallsFilter( + op_names=["llm/openai", "llm/anthropic"], + ) + ) + ) + + # Add a complex condition with multiple operators on different fields + cq.add_condition( + tsi_query.AndOperation.model_validate( + { + "$and": [ + # Equality on attributes + { + "$eq": [ + {"$getField": "attributes.model"}, + {"$literal": "gpt-4"}, + ] + }, + # Contains on inputs + { + "$contains": { + "input": {"$getField": "inputs.prompt"}, + "substr": {"$literal": "weather"}, + "case_insensitive": True, + } + }, + # In operation on attributes + { + "$in": [ + {"$getField": "attributes.temperature"}, + [{"$literal": "0.7"}, {"$literal": "0.8"}], + ] + }, + ] + } + ) + ) + + assert_sql( + cq, + """ + WITH filtered_calls AS ( + SELECT + calls_merged.id AS id + FROM calls_merged + WHERE calls_merged.project_id = {pb_1:String} + GROUP BY (calls_merged.project_id, calls_merged.id) + HAVING ( + ((any(calls_merged.deleted_at) IS NULL)) + AND ((NOT ((any(calls_merged.started_at) IS NULL)))) + AND (any(calls_merged.op_name) IN {pb_0:Array(String)}) + ) + ) + SELECT + calls_merged.id AS id, + any(calls_merged.attributes_dump) AS attributes_dump, + any(calls_merged.inputs_dump) AS inputs_dump + FROM calls_merged + WHERE + calls_merged.project_id = {pb_1:String} + AND + (calls_merged.id IN filtered_calls) + AND (((calls_merged.attributes_dump LIKE {pb_9:String} + OR calls_merged.attributes_dump IS NULL)) + AND ((lower(calls_merged.inputs_dump) LIKE {pb_10:String} + OR calls_merged.inputs_dump IS NULL)) + AND (((calls_merged.attributes_dump LIKE {pb_11:String} + OR calls_merged.attributes_dump LIKE {pb_12:String}) + OR calls_merged.attributes_dump IS NULL))) + GROUP BY (calls_merged.project_id, calls_merged.id) + HAVING ( + ((JSON_VALUE(any(calls_merged.attributes_dump), {pb_2:String}) = {pb_3:String})) + AND + (positionCaseInsensitive(JSON_VALUE(any(calls_merged.inputs_dump), {pb_4:String}), {pb_5:String}) > 0) + AND + ((JSON_VALUE(any(calls_merged.attributes_dump), {pb_6:String}) IN ({pb_7:String},{pb_8:String}))) + ) + """, + { + "pb_0": ["llm/openai", "llm/anthropic"], + "pb_1": "project", + "pb_2": '$."model"', + "pb_3": "gpt-4", + "pb_4": '$."prompt"', + "pb_5": "weather", + "pb_6": '$."temperature"', + "pb_7": "0.7", + "pb_8": "0.8", + "pb_9": '%"gpt-4"%', + "pb_10": '%"%weather%"%', + "pb_11": '%"0.7"%', + "pb_12": '%"0.8"%', + }, + ) + + +def test_calls_query_filter_by_empty_str() -> None: + cq = CallsQuery(project_id="project") + cq.add_field("id") + cq.add_field("inputs") + cq.add_condition( + tsi_query.EqOperation.model_validate( + {"$eq": [{"$getField": "inputs.param.val"}, {"$literal": ""}]} + ) + ) + # Empty string is not a valid value for LIKE optimization, this test is to ensure that + # the query builder uses the JSON_VALUE function in the WHERE. + assert_sql( + cq, + """ + SELECT + calls_merged.id AS id, + any(calls_merged.inputs_dump) AS inputs_dump + FROM calls_merged + WHERE + calls_merged.project_id = {pb_2:String} + AND + ((JSON_VALUE(calls_merged.inputs_dump, {pb_0:String}) = {pb_1:String}) + OR calls_merged.inputs_dump IS NULL) + GROUP BY (calls_merged.project_id, calls_merged.id) + HAVING ( + ((JSON_VALUE(any(calls_merged.inputs_dump), {pb_0:String}) = {pb_1:String})) + AND ((any(calls_merged.deleted_at) IS NULL)) + AND ((NOT ((any(calls_merged.started_at) IS NULL)))) + ) + """, + { + "pb_0": '$."param"."val"', + "pb_1": "", + "pb_2": "project", }, ) diff --git a/weave/trace_server/calls_query_builder.py b/weave/trace_server/calls_query_builder.py index a08a52ae5639..727c1f22f71a 100644 --- a/weave/trace_server/calls_query_builder.py +++ b/weave/trace_server/calls_query_builder.py @@ -209,8 +209,12 @@ def json_dump_field_as_sql( root_field_sanitized: str, extra_path: Optional[list[str]] = None, cast: Optional[tsi_query.CastTo] = None, + use_agg_fn: bool = True, ) -> str: if cast != "exists": + if not use_agg_fn: + return f"{root_field_sanitized}" + path_str = "'$'" if extra_path: param_name = pb.add_param(quote_json_path_parts(extra_path)) @@ -702,7 +706,7 @@ def _as_sql_base_format( } START_ONLY_CALL_FIELDS = {"started_at", "inputs_dump", "attributes_dump"} -END_ONLY_CALL_FIELDS = {"ended_at", "output_dump", "attributes_dump"} +END_ONLY_CALL_FIELDS = {"ended_at", "output_dump", "summary_dump"} def get_field_by_name(name: str) -> CallsMergedField: @@ -966,18 +970,28 @@ def make_call_parts_predicate_filters_sql( # TODO: support OR conditions between start and end fields return "" - # Process the condition to create a predicate filter, using non-aggregate fields - condition_sql = process_query_to_conditions( - tsi_query.Query.model_validate({"$expr": {"$and": [condition.operand]}}), - pb, - table_alias, - use_agg_fn=False, - ).conditions[0] + # Check if this condition can use the LIKE optimization, only applies to string fields + # on dynamic json columns + like_optimized_condition = try_create_like_optimized_condition( + condition, pb, table_alias + ) + if like_optimized_condition: + condition_sql = like_optimized_condition + else: + # Process the condition to create a predicate filter, using non-aggregate fields + condition_sql = process_query_to_conditions( + tsi_query.Query.model_validate( + {"$expr": {"$and": [condition.operand]}} + ), + pb, + table_alias, + use_agg_fn=False, + ).conditions[0] # Build NULL allowances for start-only fields if start_only_fields: null_conditions = [ - f"{table_alias}.{field} IS NULL" for field in start_only_fields + f"{table_alias}.{field} IS NULL" for field in set(start_only_fields) ] if null_conditions: condition_sql = f"({condition_sql} OR {' OR '.join(null_conditions)})" @@ -985,7 +999,7 @@ def make_call_parts_predicate_filters_sql( # Build NULL allowances for end-only fields if end_only_fields: null_conditions = [ - f"{table_alias}.{field} IS NULL" for field in end_only_fields + f"{table_alias}.{field} IS NULL" for field in set(end_only_fields) ] if null_conditions: condition_sql = f"({condition_sql} OR {' OR '.join(null_conditions)})" @@ -998,6 +1012,144 @@ def make_call_parts_predicate_filters_sql( return "" +def try_create_like_optimized_condition( + condition: Condition, pb: ParamBuilder, table_alias: str +) -> Optional[str]: + """ + Attempts to create a LIKE-optimized condition for simple string operations on JSON fields. + Supports equality checks, contains operations, and in operations. + Returns None if the optimization cannot be applied. + """ + # Handle different operation types + if isinstance(condition.operand, tsi_query.EqOperation): + return _create_like_optimized_eq_condition( + condition.operand, condition, pb, table_alias + ) + elif isinstance(condition.operand, tsi_query.ContainsOperation): + return _create_like_optimized_contains_condition( + condition.operand, condition, pb, table_alias + ) + elif isinstance(condition.operand, tsi_query.InOperation): + return _create_like_optimized_in_condition( + condition.operand, condition, pb, table_alias + ) + + return None + + +def _create_like_condition( + field: str, + like_pattern: str, + pb: ParamBuilder, + table_alias: str, + case_insensitive: bool = False, +) -> str: + """Creates a LIKE condition for a JSON field with NULL handling for start/end-only fields.""" + field_name = f"{table_alias}.{field}" + + if case_insensitive: + param_name = pb.add_param(like_pattern.lower()) + like_condition = f"lower({field_name}) LIKE {_param_slot(param_name, 'String')}" + else: + param_name = pb.add_param(like_pattern) + like_condition = f"{field_name} LIKE {_param_slot(param_name, 'String')}" + + return like_condition + + +def _create_like_optimized_eq_condition( + operation: tsi_query.EqOperation, + condition: Condition, + pb: ParamBuilder, + table_alias: str, +) -> Optional[str]: + """Creates a LIKE-optimized condition for equality operations.""" + # Check if the left side is a GetField operation on a JSON field + if not isinstance(operation.eq_[0], tsi_query.GetFieldOperator): + return None + # Return if right-side isn't a string literal + if not isinstance(operation.eq_[1], tsi_query.LiteralOperation) or not isinstance( + operation.eq_[1].literal_, str + ): + return None + + field = get_field_by_name(operation.eq_[0].get_field_).field + literal_value = operation.eq_[1].literal_ + if not literal_value: + # Empty string is not a valid value for LIKE optimization + return None + like_pattern = f'%"{literal_value}"%' + + return _create_like_condition(field, like_pattern, pb, table_alias) + + +def _create_like_optimized_contains_condition( + operation: tsi_query.ContainsOperation, + condition: Condition, + pb: ParamBuilder, + table_alias: str, +) -> Optional[str]: + """Creates a LIKE-optimized condition for contains operations.""" + # Check if the input is a GetField operation on a JSON field + if not isinstance(operation.contains_.input, tsi_query.GetFieldOperator): + return None + # Return if substr isn't a string literal + if not isinstance( + operation.contains_.substr, tsi_query.LiteralOperation + ) or not isinstance(operation.contains_.substr.literal_, str): + return None + + field = get_field_by_name(operation.contains_.input.get_field_).field + substr_value = operation.contains_.substr.literal_ + if not substr_value: + # Empty string is not a valid value for LIKE optimization + return None + case_insensitive = operation.contains_.case_insensitive or False + like_pattern = f'%"%{substr_value}%"%' + + return _create_like_condition( + field, like_pattern, pb, table_alias, case_insensitive + ) + + +def _create_like_optimized_in_condition( + operation: tsi_query.InOperation, + condition: Condition, + pb: ParamBuilder, + table_alias: str, +) -> Optional[str]: + """Creates a LIKE-optimized condition for in operations.""" + # Check if the left side is a GetField operation on a JSON field + if not isinstance(operation.in_[0], tsi_query.GetFieldOperator): + return None + # Return if right-side isn't non-empty list + if ( + len(operation.in_) != 2 + or not isinstance(operation.in_[1], list) + or len(operation.in_[1]) == 0 + ): + return None + + field = get_field_by_name(operation.in_[0].get_field_).field + + # Create OR conditions for each value + like_conditions: list[str] = [] + + for value_operand in operation.in_[1]: + if ( + not isinstance(value_operand, tsi_query.LiteralOperation) + or not isinstance(value_operand.literal_, str) + or not value_operand.literal_ + ): + return None + + like_pattern = f'%"{value_operand.literal_}"%' + like_condition = _create_like_condition(field, like_pattern, pb, table_alias) + like_conditions.append(like_condition) + + return "(" + " OR ".join(like_conditions) + ")" + + def _param_slot(param_name: str, param_type: str) -> str: """Helper function to create a parameter slot for a clickhouse query.""" return f"{{{param_name}:{param_type}}}"