-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf(weave): push heavy conditions into WHERE for calls stream query #3501
Changes from 16 commits
9727d0d
9538de0
6564b80
782c420
88d455f
a9e8ced
09b50c5
edfa089
5656808
6f9bbe1
c0cbcd8
953e846
7eaf75e
8eae6ef
bf41916
b8b9dd0
a35e1fa
07a1043
f9a8fbc
e4cee25
4bd68c4
b0f1dd1
6a7ea1c
0b01f62
95cfc5a
3b6b841
51f23e9
2862849
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -73,6 +73,7 @@ def is_heavy(self) -> bool: | |||||
|
||||||
class CallsMergedAggField(CallsMergedField): | ||||||
agg_fn: str | ||||||
use_agg_fn: bool = True | ||||||
|
||||||
def as_sql( | ||||||
self, | ||||||
|
@@ -81,6 +82,8 @@ def as_sql( | |||||
cast: Optional[tsi_query.CastTo] = None, | ||||||
) -> str: | ||||||
inner = super().as_sql(pb, table_alias) | ||||||
if not self.use_agg_fn: | ||||||
return clickhouse_cast(inner) | ||||||
return clickhouse_cast(f"{self.agg_fn}({inner})") | ||||||
|
||||||
|
||||||
|
@@ -258,11 +261,14 @@ class Condition(BaseModel): | |||||
operand: "tsi_query.Operand" | ||||||
_consumed_fields: Optional[list[CallsMergedField]] = None | ||||||
|
||||||
def as_sql(self, pb: ParamBuilder, table_alias: str) -> str: | ||||||
def as_sql( | ||||||
self, pb: ParamBuilder, table_alias: str, ignore_agg_fn: bool = False | ||||||
) -> str: | ||||||
conditions = process_query_to_conditions( | ||||||
tsi_query.Query.model_validate({"$expr": {"$and": [self.operand]}}), | ||||||
pb, | ||||||
table_alias, | ||||||
ignore_agg_fn=ignore_agg_fn, | ||||||
) | ||||||
if self._consumed_fields is None: | ||||||
self._consumed_fields = [] | ||||||
|
@@ -283,6 +289,12 @@ def is_heavy(self) -> bool: | |||||
return True | ||||||
return False | ||||||
|
||||||
def is_feedback(self) -> bool:
    """Return True if any field consumed by this condition is a feedback payload field."""
    return any(
        isinstance(consumed, CallsMergedFeedbackPayloadField)
        for consumed in self._get_consumed_fields()
    )
|
||||||
|
||||||
class HardCodedFilter(BaseModel): | ||||||
filter: tsi.CallsFilter | ||||||
|
@@ -448,8 +460,10 @@ def as_sql(self, pb: ParamBuilder, table_alias: str = "calls_merged") -> str: | |||||
) | ||||||
SELECT {SELECT_FIELDS} | ||||||
FROM calls_merged | ||||||
FINAL -- optional, if heavy filter conditions | ||||||
WHERE project_id = {PROJECT_ID} | ||||||
AND id IN filtered_calls | ||||||
AND {HEAVY_FILTER_CONDITIONS} -- optional heavy filter conditions | ||||||
GROUP BY (project_id, id) | ||||||
--- IF ORDER BY CANNOT BE PUSHED DOWN --- | ||||||
HAVING {HEAVY_FILTER_CONDITIONS} -- optional <-- yes, this is inside the conditional | ||||||
|
@@ -586,21 +600,38 @@ def _as_sql_base_format( | |||||
) | ||||||
|
||||||
having_filter_sql = "" | ||||||
having_conditions_sql: list[str] = [] | ||||||
having_light_conditions_sql: list[str] = [] | ||||||
if len(self.query_conditions) > 0: | ||||||
having_conditions_sql.extend( | ||||||
c.as_sql(pb, table_alias) for c in self.query_conditions | ||||||
having_light_conditions_sql.extend( | ||||||
c.as_sql(pb, table_alias) | ||||||
for c in self.query_conditions | ||||||
if not c.is_heavy() or c.is_feedback() | ||||||
) | ||||||
for query_condition in self.query_conditions: | ||||||
for field in query_condition._get_consumed_fields(): | ||||||
if isinstance(field, CallsMergedFeedbackPayloadField): | ||||||
needs_feedback = True | ||||||
if self.hardcoded_filter is not None: | ||||||
having_conditions_sql.append(self.hardcoded_filter.as_sql(pb, table_alias)) | ||||||
having_light_conditions_sql.append( | ||||||
self.hardcoded_filter.as_sql(pb, table_alias) | ||||||
) | ||||||
|
||||||
if len(having_conditions_sql) > 0: | ||||||
if len(having_light_conditions_sql) > 0: | ||||||
having_filter_sql = "HAVING " + combine_conditions( | ||||||
having_conditions_sql, "AND" | ||||||
having_light_conditions_sql, "AND" | ||||||
) | ||||||
|
||||||
heavy_filter_sql = "" | ||||||
simple_like_conditions_sql = "" | ||||||
for condition in self.query_conditions: | ||||||
if not condition.is_heavy() or condition.is_feedback(): | ||||||
continue | ||||||
|
||||||
simple_like_conditions_sql += make_simple_like_condition( | ||||||
pb, condition, table_alias | ||||||
) | ||||||
heavy_filter_sql += " AND " + condition.as_sql( | ||||||
pb, table_alias, ignore_agg_fn=True | ||||||
) | ||||||
|
||||||
order_by_sql = "" | ||||||
|
@@ -654,6 +685,8 @@ def _as_sql_base_format( | |||||
{feedback_where_sql} | ||||||
{id_mask_sql} | ||||||
{id_subquery_sql} | ||||||
{simple_like_conditions_sql} | ||||||
{heavy_filter_sql} | ||||||
GROUP BY (calls_merged.project_id, calls_merged.id) | ||||||
{having_filter_sql} | ||||||
{order_by_sql} | ||||||
|
@@ -664,6 +697,36 @@ def _as_sql_base_format( | |||||
return _safely_format_sql(raw_sql) | ||||||
|
||||||
|
||||||
def make_simple_like_condition(
    pb: ParamBuilder, condition: Condition, table_alias: str = "calls_merged"
) -> str:
    """Build an extra ``AND ... LIKE`` SQL fragment for a query condition.

    For equality, contains and in operations the literal value(s) are wrapped
    in ``%`` wildcards, registered on ``pb`` as ``String`` parameters, and
    matched with ``LIKE`` against the column backing the referenced field.

    Args:
        pb: Parameter builder that receives the wildcard-wrapped values.
        condition: Condition whose ``operand`` is inspected.
        table_alias: Table alias used to qualify the column name.

    Returns:
        A fragment starting with ``" AND "`` that can be appended to an
        existing WHERE clause, or ``""`` when the operand type is not
        supported or an ``in`` operation carries no values.
    """
    operand = condition.operand

    def like_clause(field_name: str, value: object) -> str:
        # The value only ever travels as a bound parameter; it is never
        # interpolated into the SQL text itself.
        # NOTE(review): non-string literals are stringified with Python's
        # str() here — confirm that matches how they are stored in the column.
        slot = _param_slot(pb.add_param(f"%{value}%"), "String")
        return f"{table_alias}.{field_name} LIKE {slot}"

    if isinstance(operand, tsi_query.EqOperation):
        field_name = get_field_by_name(operand.eq_[0].get_field_).field
        field_value = operand.eq_[1].literal_
    elif isinstance(operand, tsi_query.ContainsOperation):
        field_name = get_field_by_name(operand.contains_.input.get_field_).field
        field_value = operand.contains_.substr.literal_
    elif isinstance(operand, tsi_query.InOperation):
        field_name = get_field_by_name(operand.in_[0].get_field_).field
        alternatives = [like_clause(field_name, op.literal_) for op in operand.in_[1]]
        if not alternatives:
            # An empty value list would otherwise produce a malformed
            # "AND ()" group; emit nothing instead.
            return ""
        return " AND (" + " OR ".join(alternatives) + ")"
    else:
        return ""

    # Previously emitted "AND + <field> LIKE ...", which is invalid SQL
    # because of the stray "+". The leading space matches the IN branch so
    # consecutive fragments never fuse together when concatenated.
    return f" AND {like_clause(field_name, field_value)} "
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fix SQL syntax error in LIKE condition. There's a syntax error in the return statement, where a stray `+` follows `AND` inside the f-string and produces invalid SQL. Apply this diff to fix the SQL syntax: - return f"AND + {table_alias}.{field_name} LIKE {sql_value} "
+ return f"AND {table_alias}.{field_name} LIKE {sql_value} " 📝 Committable suggestion
Suggested change
|
||||||
|
||||||
|
||||||
ALLOWED_CALL_FIELDS = { | ||||||
"project_id": CallsMergedField(field="project_id"), | ||||||
"id": CallsMergedField(field="id"), | ||||||
|
@@ -713,6 +776,7 @@ def process_query_to_conditions( | |||||
query: tsi.Query, | ||||||
param_builder: ParamBuilder, | ||||||
table_alias: str, | ||||||
ignore_agg_fn: bool = False, | ||||||
) -> FilterToConditions: | ||||||
"""Converts a Query to a list of conditions for a clickhouse query.""" | ||||||
conditions = [] | ||||||
|
@@ -781,7 +845,12 @@ def process_operand(operand: "tsi_query.Operand") -> str: | |||||
) | ||||||
elif isinstance(operand, tsi_query.GetFieldOperator): | ||||||
structured_field = get_field_by_name(operand.get_field_) | ||||||
field = structured_field.as_sql(param_builder, table_alias) | ||||||
if isinstance(structured_field, CallsMergedAggField) and ignore_agg_fn: | ||||||
structured_field.use_agg_fn = False | ||||||
field = structured_field.as_sql(param_builder, table_alias) | ||||||
structured_field.use_agg_fn = True # reset to default | ||||||
else: | ||||||
field = structured_field.as_sql(param_builder, table_alias) | ||||||
raw_fields_used[structured_field.field] = structured_field | ||||||
return field | ||||||
elif isinstance(operand, tsi_query.ConvertOperation): | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test highlights the error case that the query creates.