Add group by trimming to MSQE/V2 query engine #14727

Merged · 10 commits · Jan 14, 2025
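The change makes group-by trimming available in the multi-stage (V2/MSQE) engine and lets individual queries override the broker's trimming defaults through query options. What follows is a hedged sketch of how a query might opt in: the option keys are inferred from the QueryOptionsUtils getters used in this diff, the SET-prefix syntax mirrors the test file's own usage, and the table and column names are placeholders.

// Hypothetical usage sketch (not part of this PR): a group-by query with the
// trimming knobs passed as inline query options. Option key names are inferred
// from QueryOptionsUtils.getMinBrokerGroupTrimSize, getGroupTrimThreshold and
// getMinInitialIndexedTableCapacity; exact keys and defaults may differ.
public final class GroupTrimQueryExample {
  public static void main(String[] args) {
    String sql = "SET minBrokerGroupTrimSize = 5000; "
        + "SET groupTrimThreshold = 1000000; "
        + "SET minInitialIndexedTableCapacity = 128; "
        + "SELECT playerName, SUM(homeRuns) AS hr FROM baseballStats "
        + "GROUP BY playerName ORDER BY hr DESC LIMIT 10";
    System.out.println(sql); // submit through any Pinot client or the query console
  }
}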
@@ -58,12 +58,9 @@ public BrokerReduceService(PinotConfiguration config) {
     super(config);
   }

-  public BrokerResponseNative reduceOnDataTable(
-      BrokerRequest brokerRequest,
-      BrokerRequest serverBrokerRequest,
-      Map<ServerRoutingInstance, DataTable> dataTableMap,
-      long reduceTimeOutMs,
-      BrokerMetrics brokerMetrics) {
+  public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, BrokerRequest serverBrokerRequest,
+      Map<ServerRoutingInstance, DataTable> dataTableMap,
+      long reduceTimeOutMs, BrokerMetrics brokerMetrics) {
     if (dataTableMap.isEmpty()) {
       // Empty response.
       return BrokerResponseNative.empty();
@@ -74,12 +71,12 @@ public BrokerResponseNative reduceOnDataTable(
         queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));

     ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace);
-    BrokerResponseNative response = new BrokerResponseNative();
+    BrokerResponseNative brokerResponseNative = new BrokerResponseNative();

     // Cache a data schema from data tables (try to cache one with data rows associated with it).
-    DataSchema schemaOfEmptyTable = null;
-    DataSchema schemaOfNonEmptyTable = null;
-    List<ServerRoutingInstance> serversWithConflictingSchema = new ArrayList<>();
+    DataSchema dataSchemaFromEmptyDataTable = null;
+    DataSchema dataSchemaFromNonEmptyDataTable = null;
+    List<ServerRoutingInstance> serversWithConflictingDataSchema = new ArrayList<>();

     // Process server response metadata.
     Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
@@ -97,19 +94,19 @@ public BrokerResponseNative reduceOnDataTable(
       } else {
         // Try to cache a data table with data rows inside, or cache one with data schema inside.
         if (dataTable.getNumberOfRows() == 0) {
-          if (schemaOfEmptyTable == null) {
-            schemaOfEmptyTable = dataSchema;
+          if (dataSchemaFromEmptyDataTable == null) {
+            dataSchemaFromEmptyDataTable = dataSchema;
           }
           iterator.remove();
         } else {
-          if (schemaOfNonEmptyTable == null) {
-            schemaOfNonEmptyTable = dataSchema;
+          if (dataSchemaFromNonEmptyDataTable == null) {
+            dataSchemaFromNonEmptyDataTable = dataSchema;
           } else {
             // Remove data tables with conflicting data schema.
             // NOTE: Only compare the column data types, since the column names (string representation of expression)
             // can change across different versions.
-            if (!Arrays.equals(dataSchema.getColumnDataTypes(), schemaOfNonEmptyTable.getColumnDataTypes())) {
-              serversWithConflictingSchema.add(entry.getKey());
+            if (!Arrays.equals(dataSchema.getColumnDataTypes(), dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) {
+              serversWithConflictingDataSchema.add(entry.getKey());
               iterator.remove();
             }
           }
@@ -121,51 +118,52 @@ public BrokerResponseNative reduceOnDataTable(
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);

     // Set execution statistics and Update broker metrics.
-    aggregator.setStats(rawTableName, response, brokerMetrics);
+    aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);

     // Report the servers with conflicting data schema.
-    if (!serversWithConflictingSchema.isEmpty()) {
+    if (!serversWithConflictingDataSchema.isEmpty()) {
       String errorMessage = QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName
-          + " from servers: " + serversWithConflictingSchema + " got dropped due to data schema inconsistency.";
+          + " from servers: " + serversWithConflictingDataSchema + " got dropped due to data schema inconsistency.";
       LOGGER.warn(errorMessage);
       brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1);
-      response.addException(new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
+      brokerResponseNative.addException(
+          new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
     }

     // NOTE: When there is no cached data schema, that means all servers encountered exception. In such case, return the
     // response with metadata only.
     DataSchema cachedDataSchema =
-        schemaOfNonEmptyTable != null ? schemaOfNonEmptyTable : schemaOfEmptyTable;
+        dataSchemaFromNonEmptyDataTable != null ? dataSchemaFromNonEmptyDataTable : dataSchemaFromEmptyDataTable;
     if (cachedDataSchema == null) {
-      return response;
+      return brokerResponseNative;
     }

-    Integer minGroupTrimSizeOpt = null;
-    Integer groupTrimThresholdOpt = null;
-    Integer minInitialIndexedTableCapOpt = null;
+    QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
+    DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(serverQueryContext);

+    Integer minGroupTrimSizeQueryOption = null;
+    Integer groupTrimThresholdQueryOption = null;
+    Integer minInitialIndexedTableCapacityQueryOption = null;
     if (queryOptions != null) {
-      minGroupTrimSizeOpt = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
-      groupTrimThresholdOpt = QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
-      minInitialIndexedTableCapOpt = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
+      minGroupTrimSizeQueryOption = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
+      groupTrimThresholdQueryOption = QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+      minInitialIndexedTableCapacityQueryOption = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
     }

-    int minGroupTrimSize = minGroupTrimSizeOpt != null ? minGroupTrimSizeOpt : _minGroupTrimSize;
-    int groupTrimThreshold = groupTrimThresholdOpt != null ? groupTrimThresholdOpt : _groupByTrimThreshold;
+    int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? minGroupTrimSizeQueryOption : _minGroupTrimSize;
+    int groupTrimThreshold =
+        groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption : _groupByTrimThreshold;
     int minInitialIndexedTableCapacity =
-        minInitialIndexedTableCapOpt != null ? minInitialIndexedTableCapOpt : _minInitialIndexedTableCapacity;
+        minInitialIndexedTableCapacityQueryOption != null ? minInitialIndexedTableCapacityQueryOption
+            : _minInitialIndexedTableCapacity;

-    QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
     try {
-      DataTableReducer reducer = ResultReducerFactory.getResultReducer(serverQueryContext);
-      reducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, response,
+      dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
           new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
               groupTrimThreshold, minGroupTrimSize, minInitialIndexedTableCapacity), brokerMetrics);
     } catch (EarlyTerminationException e) {
-      response.addException(
+      brokerResponseNative.addException(
          new QueryProcessingException(QueryException.QUERY_CANCELLATION_ERROR_CODE, e.toString()));
     }

     QueryContext queryContext;
     if (brokerRequest == serverBrokerRequest) {
       queryContext = serverQueryContext;
@@ -176,13 +174,13 @@ public BrokerResponseNative reduceOnDataTable(
         throw new BadQueryRequestException("Nested query is not supported without gapfill");
       }
       BaseGapfillProcessor gapfillProcessor = GapfillProcessorFactory.getGapfillProcessor(queryContext, gapfillType);
-      gapfillProcessor.process(response);
+      gapfillProcessor.process(brokerResponseNative);
     }

     if (!serverQueryContext.isExplain()) {
-      updateAlias(queryContext, response);
+      updateAlias(queryContext, brokerResponseNative);
     }
-    return response;
+    return brokerResponseNative;
   }

   public void shutDown() {
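The reduce path above prefers a per-query option and falls back to the broker-wide default when the option is absent. Below is a minimal standalone sketch of that fallback pattern, with illustrative names rather than Pinot's actual API.

import java.util.Map;

// Standalone sketch of the option-vs-config fallback used in the reduce path.
// The option key and default value below are illustrative assumptions.
public final class OptionFallbackDemo {
  private static final int DEFAULT_MIN_GROUP_TRIM_SIZE = 5000;

  static int resolveMinGroupTrimSize(Map<String, String> queryOptions) {
    String raw = queryOptions != null ? queryOptions.get("minBrokerGroupTrimSize") : null;
    // Null means the option was not set, so the configured default wins.
    return raw != null ? Integer.parseInt(raw) : DEFAULT_MIN_GROUP_TRIM_SIZE;
  }

  public static void main(String[] args) {
    System.out.println(resolveMinGroupTrimSize(Map.of("minBrokerGroupTrimSize", "100"))); // 100
    System.out.println(resolveMinGroupTrimSize(Map.of()));                                // 5000
  }
}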
@@ -431,8 +431,8 @@ public void assertResultAndPlan(String option, String query, String expectedResu
     JsonNode result = postV2Query(sql);
     JsonNode plan = postV2Query(option + " set explainAskingServers=true; explain plan for " + query);

-    Assert.assertEquals(toResultStr(result), new CharSeq(expectedResult));
-    Assert.assertEquals(toExplainStr(plan), new CharSeq(expectedPlan));
+    Assert.assertEquals(toResultStr(result), expectedResult);
+    Assert.assertEquals(toExplainStr(plan), expectedPlan);
   }

   @Test
@@ -488,11 +488,11 @@ private void assertNumGroupsLimitException(String query)
       throws Exception {
     JsonNode result = postV2Query(query);

-    CharSeq errorMessage = toResultStr(result);
+    String errorMessage = toResultStr(result);

     Assert.assertTrue(errorMessage.startsWith("QueryExecutionError:\n"
         + "Received error query execution result block: {1000=NUM_GROUPS_LIMIT has been reached at "),
-        errorMessage.toString());
+        errorMessage);
   }

   // for debug only
@@ -511,9 +511,9 @@ private JsonNode postV2Query(String query)
         getExtraQueryProperties());
   }

-  private static @NotNull CharSeq toResultStr(JsonNode mainNode) {
+  private static @NotNull String toResultStr(JsonNode mainNode) {
     if (mainNode == null) {
-      return new CharSeq(new StringBuilder("null"));
+      return "null";
     }
     JsonNode node = mainNode.get(RESULT_TABLE);
     if (node == null) {
@@ -522,9 +522,9 @@ private JsonNode postV2Query(String query)
     return toString(node);
   }

-  private static @NotNull CharSeq toExplainStr(JsonNode mainNode) {
+  private static @NotNull String toExplainStr(JsonNode mainNode) {
     if (mainNode == null) {
-      return new CharSeq(new StringBuilder("null"));
+      return "null";
     }
     JsonNode node = mainNode.get(RESULT_TABLE);
     if (node == null) {
@@ -533,101 +533,15 @@ private JsonNode postV2Query(String query)
     return toExplainString(node);
   }

-  static class CharSeq implements CharSequence {
-    private final StringBuilder _sb;
-
-    CharSeq(StringBuilder sb) {
-      _sb = sb;
-    }
-
-    CharSeq(String s) {
-      _sb = new StringBuilder(s);
-    }
-
-    @Override
-    public int length() {
-      return _sb.length();
-    }
-
-    @Override
-    public char charAt(int index) {
-      return _sb.charAt(index);
-    }
-
-    public boolean startsWith(CharSequence cs) {
-      if (cs.length() > _sb.length()) {
-        return false;
-      }
-
-      for (int i = 0, len = cs.length(); i < len; i++) {
-        if (_sb.charAt(i) != cs.charAt(i)) {
-          return false;
-        }
-      }
-
-      return true;
-    }
-
-    @Override
-    public @NotNull CharSequence subSequence(int start, int end) {
-      return new CharSeq(_sb.substring(start, end));
-    }
-
-    @NotNull
-    @Override
-    public String toString() {
-      return _sb.toString();
-    }
-
-    @Override
-    public int hashCode() {
-      int hc = 0;
-      for (int i = 0, len = _sb.length(); i < len; i++) {
-        hc = 31 * hc + _sb.charAt(i);
-      }
-      return hc;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-
-      if (!(obj instanceof CharSequence)) {
-        return false;
-      }
-
-      CharSequence other = (CharSequence) obj;
-      if (_sb.length() != other.length()) {
-        return false;
-      }
-
-      for (int i = 0, len = _sb.length(); i < len; i++) {
-        if (_sb.charAt(i) != other.charAt(i)) {
-          return false;
-        }
-      }
-
-      return true;
-    }
-
-    CharSeq append(CharSequence other) {
-      _sb.append(other);
-      return this;
-    }
-  }
-
-  public static CharSeq toErrorString(JsonNode node) {
-    StringBuilder buf = new StringBuilder();
+  public static String toErrorString(JsonNode node) {
     JsonNode jsonNode = node.get(0);
     if (jsonNode != null) {
-      buf.append(jsonNode.get("message").textValue());
+      return jsonNode.get("message").textValue();
     }
-    return new CharSeq(buf);
+    return "";
   }

-  public static CharSeq toString(JsonNode node) {
+  public static String toString(JsonNode node) {
     StringBuilder buf = new StringBuilder();
     ArrayNode columnNames = (ArrayNode) node.get("dataSchema").get("columnNames");
     ArrayNode columnTypes = (ArrayNode) node.get("dataSchema").get("columnDataTypes");
@@ -657,13 +571,11 @@ public static CharSeq toString(JsonNode node) {
       }
     }

-    return new CharSeq(buf);
+    return buf.toString();
   }

-  public static CharSeq toExplainString(JsonNode node) {
-    StringBuilder buf = new StringBuilder();
-    buf.append(node.get("rows").get(0).get(1).textValue());
-    return new CharSeq(buf);
+  public static String toExplainString(JsonNode node) {
+    return node.get("rows").get(0).get(1).textValue();
   }

   @AfterClass
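The deleted CharSeq wrapper only supplied startsWith and value-based equality over builder-backed text, and plain String already covers both, which is why the helpers can simply return String. A small sketch of the equivalence:

// Standalone sketch: String provides what the removed CharSeq class offered.
public final class StringAssertDemo {
  public static void main(String[] args) {
    String errorMessage = "QueryExecutionError:\nReceived error query execution result block: ...";
    // Replaces CharSeq.startsWith(CharSequence):
    System.out.println(errorMessage.startsWith("QueryExecutionError:\n")); // true
    // Replaces CharSeq.equals: String equality is already value-based.
    System.out.println(new StringBuilder("abc").toString().equals("abc")); // true
  }
}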
@@ -59,7 +59,7 @@


 /**
- * AggregateOperator is used to aggregate values over a (potentially empty) set of group by keys in V2/MSQE.
+ * AggregateOperator is used to aggregate values over a (potentially empty) set of group by keys in V2/MSQE.
  * Output data will be in the format of [group by key, aggregate result1, ... aggregate resultN]
  * When the list of aggregation calls is empty, this class is used to calculate distinct result based on group by keys.
  */
@@ -85,6 +85,7 @@ public class AggregateOperator extends MultiStageOperator {

   // trimming - related members
   private final int _groupTrimSize;
+  @Nullable
   private final PriorityQueue<Object[]> _priorityQueue;

   public AggregateOperator(OpChainExecutionContext context, MultiStageOperator input, AggregateNode node) {
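The new @Nullable _priorityQueue, together with _groupTrimSize, is what implements the trimming: when a trim size applies, only the best groups are retained in a bounded min-heap whose root is the worst row currently kept. Below is a minimal standalone sketch of that idea, assuming the comparator ranks rows to keep as greater; the operator's actual ordering, null handling, and streaming integration differ.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Standalone sketch of bounded-heap group trimming (not the operator's code).
public final class GroupTrimSketch {
  // Keeps the `trimSize` largest rows under `order`; the heap root is always
  // the smallest retained row, so a better row evicts it in O(log trimSize).
  static List<Object[]> trim(List<Object[]> rows, Comparator<Object[]> order, int trimSize) {
    if (trimSize <= 0 || rows.size() <= trimSize) {
      return rows; // nothing to trim; mirrors the field being @Nullable
    }
    PriorityQueue<Object[]> heap = new PriorityQueue<>(trimSize, order);
    for (Object[] row : rows) {
      if (heap.size() < trimSize) {
        heap.offer(row);
      } else if (order.compare(row, heap.peek()) > 0) {
        heap.poll();     // drop the current worst retained row
        heap.offer(row); // keep the better one
      }
    }
    return new ArrayList<>(heap); // note: unordered; sort afterwards if needed
  }

  public static void main(String[] args) {
    List<Object[]> rows = List.of(new Object[]{"a", 3L}, new Object[]{"b", 9L}, new Object[]{"c", 5L});
    Comparator<Object[]> byCount = Comparator.comparingLong(r -> (Long) r[1]);
    // Keeps the two rows with the highest counts: b=9 and c=5.
    trim(new ArrayList<>(rows), byCount, 2).forEach(r -> System.out.println(r[0] + "=" + r[1]));
  }
}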
@@ -70,16 +70,15 @@ public class MultistageGroupByExecutor {
   // because they use the zero based integer indexes to store results.
   private final GroupIdGenerator _groupIdGenerator;

-  public MultistageGroupByExecutor(
-      int[] groupKeyIds,
-      AggregationFunction[] aggFunctions,
-      int[] filterArgIds,
-      int maxFilterArgId,
-      AggType aggType,
-      boolean leafReturnFinalResult,
-      DataSchema resultSchema,
-      Map<String, String> opChainMetadata,
-      @Nullable PlanNode.NodeHint nodeHint) {
+  public MultistageGroupByExecutor(int[] groupKeyIds,
+      AggregationFunction[] aggFunctions,
+      int[] filterArgIds,
+      int maxFilterArgId,
+      AggType aggType,
+      boolean leafReturnFinalResult,
+      DataSchema resultSchema,
+      Map<String, String> opChainMetadata,
+      @Nullable PlanNode.NodeHint nodeHint) {
     _groupKeyIds = groupKeyIds;
     _aggFunctions = aggFunctions;
     _filterArgIds = filterArgIds;