Add group by trimming to MSQE/V2 query engine #14727

Merged: 10 commits, Jan 14, 2025
@@ -213,6 +213,13 @@ public static Integer getMaxExecutionThreads(Map<String, String> queryOptions) {
return checkedParseIntPositive(QueryOptionKey.MAX_EXECUTION_THREADS, maxExecutionThreadsString);
}

@Nullable
public static Integer getGroupTrimSize(Map<String, String> queryOptions) {
String groupTrimSize = queryOptions.get(QueryOptionKey.GROUP_TRIM_SIZE);
// NOTE: A non-positive value turns off the intermediate-level trim
return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_SIZE, groupTrimSize);
}
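A minimal, self-contained sketch of these semantics (illustrative only; the option-key literal "groupTrimSize" and the null-means-unset contract are assumptions read off this diff, not a documented API):

import java.util.HashMap;
import java.util.Map;

public class GroupTrimOptionSketch {
  // Mirrors the unchecked parse above: null means the option was not set.
  static Integer getGroupTrimSize(Map<String, String> queryOptions) {
    String value = queryOptions.get("groupTrimSize");
    return value != null ? Integer.valueOf(value) : null;
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("groupTrimSize", "-1");
    Integer trimSize = getGroupTrimSize(options);
    // Non-positive (or absent) => intermediate-level trimming stays off.
    boolean trimEnabled = trimSize != null && trimSize > 0;
    System.out.println("trim enabled: " + trimEnabled); // prints: trim enabled: false
  }
}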

@Nullable
public static Integer getMinSegmentGroupTrimSize(Map<String, String> queryOptions) {
String minSegmentGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
@@ -268,6 +275,10 @@ public static Integer getMultiStageLeafLimit(Map<String, String> queryOptions) {
return checkedParseIntNonNegative(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT, maxLeafLimitStr);
}

public static boolean getErrorOnNumGroupsLimit(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ERROR_ON_NUM_GROUPS_LIMIT));
}
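Boolean.parseBoolean is null-safe, so an absent errorOnNumGroupsLimit option silently defaults to false. A quick standalone check (plain Java, not Pinot code):

public class ParseBooleanCheck {
  public static void main(String[] args) {
    System.out.println(Boolean.parseBoolean(null));   // false: absent option keeps the lenient behavior
    System.out.println(Boolean.parseBoolean("true")); // true: fail queries that hit the group limit
  }
}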

@Nullable
public static Integer getNumGroupsLimit(Map<String, String> queryOptions) {
String maxNumGroupLimit = queryOptions.get(QueryOptionKey.NUM_GROUPS_LIMIT);
---
@@ -25,6 +25,8 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -244,6 +246,42 @@ public List<String> listSegments(String tableName, @Nullable String tableType, b
}
}

public Map<String, List<String>> getServersToSegmentsMap(String tableName, TableType tableType)
throws IOException {
String url = _controllerRequestURLBuilder.forServersToSegmentsMap(tableName, tableType.toString());
try {
SimpleHttpResponse resp =
HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(new URI(url), _headers));
JsonNode jsonNode = JsonUtils.stringToJsonNode(resp.getResponse());
if (jsonNode == null || jsonNode.get(0) == null) {
return Collections.emptyMap();
}

JsonNode serversMap = jsonNode.get(0).get("serverToSegmentsMap");
if (serversMap == null) {
return Collections.emptyMap();
}

HashMap<String, List<String>> result = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> fields = serversMap.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
List<String> segments = new ArrayList<>();

ArrayNode value = (ArrayNode) field.getValue();
for (int i = 0, len = value.size(); i < len; i++) {
segments.add(value.get(i).toString());
}

result.put(field.getKey(), segments);
}

return result;
} catch (HttpErrorStatusException | URISyntaxException e) {
throw new IOException(e);
}
}
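For reference, a sketch of the payload shape this parser expects: a single-element array wrapping a "serverToSegmentsMap" object. The shape is inferred from the parsing code above, not from a documented contract. Note the subtlety that JsonNode.toString() on a text node keeps the JSON quotes, while asText() returns the bare segment name; the loop above inherits the toString() behavior.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ServersToSegmentsParseSketch {
  public static void main(String[] args) throws Exception {
    // Assumed response shape, based on the code above.
    String json = "[{\"serverToSegmentsMap\":{\"Server_1\":[\"seg_0\",\"seg_1\"]}}]";
    JsonNode root = new ObjectMapper().readTree(json);
    JsonNode serversMap = root.get(0).get("serverToSegmentsMap");
    serversMap.fields().forEachRemaining(entry -> {
      JsonNode segs = entry.getValue();
      // toString() keeps JSON quoting ("seg_0"); asText() yields seg_0.
      System.out.println(entry.getKey() + " -> " + segs.get(0).toString() + " vs " + segs.get(0).asText());
    });
  }
}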

public void deleteSegment(String tableName, String segmentName)
throws IOException {
try {
---
@@ -728,6 +728,11 @@ public long getTableSize(String tableName)
return getControllerRequestClient().getTableSize(tableName);
}

public Map<String, List<String>> getTableServersToSegmentsMap(String tableName, TableType tableType)
throws IOException {
return getControllerRequestClient().getServersToSegmentsMap(tableName, tableType);
}

public String reloadOfflineTable(String tableName)
throws IOException {
return reloadOfflineTable(tableName, false);
---
@@ -38,7 +38,7 @@


/**
* The <code>AggregationOperator</code> class provides the operator for aggregation only query on a single segment.
* The <code>AggregationOperator</code> class implements keyless aggregation query on a single segment in V1/SSQE.
*/
@SuppressWarnings("rawtypes")
public class AggregationOperator extends BaseOperator<AggregationResultsBlock> {
---
@@ -46,7 +46,7 @@


/**
* The <code>GroupByOperator</code> class provides the operator for group-by query on a single segment.
* The <code>GroupByOperator</code> class implements keyed aggregation on a single segment in V1/SSQE.
*/
@SuppressWarnings("rawtypes")
public class GroupByOperator extends BaseOperator<GroupByResultsBlock> {
---
@@ -48,7 +48,8 @@


/**
* The <code>CombinePlanNode</code> class provides the execution plan for combining results from multiple segments.
* The <code>CombinePlanNode</code> class provides the execution plan for combining results from multiple segments in
* V1/SSQE.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class CombinePlanNode implements PlanNode {
---
@@ -76,6 +76,9 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public static final String NUM_GROUPS_LIMIT_KEY = "num.groups.limit";
public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;

// By default, group trimming in AggregateOperator is disabled
public static final int DEFAULT_GROUP_TRIM_SIZE = -1;

// Instance config key for minimum segment-level group trim size
// Set as pinot.server.query.executor.min.segment.group.trim.size
public static final String MIN_SEGMENT_GROUP_TRIM_SIZE_KEY = "min.segment.group.trim.size";
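These constants follow the usual precedence in this code base: a per-query option, when present, overrides the instance-level default, and the -1 default keeps trimming off. A hedged sketch of that resolution (helper names are illustrative, not Pinot API):

public class TrimSizeResolutionSketch {
  static final int DEFAULT_GROUP_TRIM_SIZE = -1;

  // Query option wins when present; otherwise fall back to the instance default.
  static int resolveGroupTrimSize(Integer queryOption, int instanceDefault) {
    return queryOption != null ? queryOption : instanceDefault;
  }

  public static void main(String[] args) {
    System.out.println(resolveGroupTrimSize(null, DEFAULT_GROUP_TRIM_SIZE)); // -1: trimming disabled
    System.out.println(resolveGroupTrimSize(1000, DEFAULT_GROUP_TRIM_SIZE)); // 1000: trim intermediate results
  }
}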
---
@@ -53,7 +53,7 @@
* integer raw keys and map them onto contiguous group ids. (INT_MAP_BASED)
* </li>
* <li>
* If the maximum number of possible group keys cannot fit into than integer, but still fit into long, generate long
* If the maximum number of possible group keys cannot fit into integer, but still fit into long, generate long
* raw keys and map them onto contiguous group ids. (LONG_MAP_BASED)
* </li>
* <li>
@@ -105,8 +105,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
ExpressionContext[] groupByExpressions, int numGroupsLimit, int arrayBasedThreshold,
@Nullable Map<ExpressionContext, Integer> groupByExpressionSizesFromPredicates) {
assert numGroupsLimit >= arrayBasedThreshold;

_groupByExpressions = groupByExpressions;
_numGroupByExpressions = groupByExpressions.length;

@@ -173,7 +171,9 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
_rawKeyHolder = new LongMapBasedHolder(groupIdMap);
} else {
_globalGroupIdUpperBound = Math.min((int) cardinalityProduct, numGroupsLimit);
if (cardinalityProduct > arrayBasedThreshold) {
// arrayBaseHolder fails with ArrayIndexOutOfBoundsException if numGroupsLimit < cardinalityProduct
// because array doesn't fit all (potentially unsorted) values
if (cardinalityProduct > arrayBasedThreshold || numGroupsLimit < cardinalityProduct) {
// IntMapBasedHolder
IntGroupIdMap groupIdMap = THREAD_LOCAL_INT_MAP.get();
groupIdMap.clearAndTrim();
@@ -281,6 +281,7 @@ private interface RawKeyHolder {
int getNumKeys();
}

// This holder works only if it can fit all results, otherwise it fails on AIOOBE or produces too many group keys
private class ArrayBasedHolder implements RawKeyHolder {
private final boolean[] _flags = new boolean[_globalGroupIdUpperBound];
private int _numKeys = 0;
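A toy reproduction of the failure mode the new comment and the widened condition guard against (hypothetical numbers, not Pinot code): when the flag array is sized by numGroupsLimit but raw keys still range over the full cardinality product, an out-of-range key blows up.

public class ArrayHolderOverflowSketch {
  public static void main(String[] args) {
    int cardinalityProduct = 10; // raw keys 0..9 are all possible
    int numGroupsLimit = 5;      // but the array is capped at the limit
    boolean[] flags = new boolean[Math.min(cardinalityProduct, numGroupsLimit)];
    int rawKey = 7;              // keys arrive unsorted, so 7 can show up before 0..4
    flags[rawKey] = true;        // throws ArrayIndexOutOfBoundsException
  }
}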
---
@@ -58,8 +58,12 @@ public BrokerReduceService(PinotConfiguration config) {
super(config);
}

public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, BrokerRequest serverBrokerRequest,
Map<ServerRoutingInstance, DataTable> dataTableMap, long reduceTimeOutMs, BrokerMetrics brokerMetrics) {
public BrokerResponseNative reduceOnDataTable(
BrokerRequest brokerRequest,
BrokerRequest serverBrokerRequest,
Map<ServerRoutingInstance, DataTable> dataTableMap,
long reduceTimeOutMs,
BrokerMetrics brokerMetrics) {
if (dataTableMap.isEmpty()) {
// Empty response.
return BrokerResponseNative.empty();
@@ -70,12 +74,12 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));

ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace);
BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
BrokerResponseNative response = new BrokerResponseNative();

// Cache a data schema from data tables (try to cache one with data rows associated with it).
DataSchema dataSchemaFromEmptyDataTable = null;
DataSchema dataSchemaFromNonEmptyDataTable = null;
List<ServerRoutingInstance> serversWithConflictingDataSchema = new ArrayList<>();
DataSchema schemaOfEmptyTable = null;
DataSchema schemaOfNonEmptyTable = null;
List<ServerRoutingInstance> serversWithConflictingSchema = new ArrayList<>();

// Process server response metadata.
Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
@@ -93,19 +97,19 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
} else {
// Try to cache a data table with data rows inside, or cache one with data schema inside.
if (dataTable.getNumberOfRows() == 0) {
if (dataSchemaFromEmptyDataTable == null) {
dataSchemaFromEmptyDataTable = dataSchema;
if (schemaOfEmptyTable == null) {
schemaOfEmptyTable = dataSchema;
}
iterator.remove();
} else {
if (dataSchemaFromNonEmptyDataTable == null) {
dataSchemaFromNonEmptyDataTable = dataSchema;
if (schemaOfNonEmptyTable == null) {
schemaOfNonEmptyTable = dataSchema;
} else {
// Remove data tables with conflicting data schema.
// NOTE: Only compare the column data types, since the column names (string representation of expression)
// can change across different versions.
if (!Arrays.equals(dataSchema.getColumnDataTypes(), dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) {
serversWithConflictingDataSchema.add(entry.getKey());
if (!Arrays.equals(dataSchema.getColumnDataTypes(), schemaOfNonEmptyTable.getColumnDataTypes())) {
serversWithConflictingSchema.add(entry.getKey());
iterator.remove();
}
}
@@ -117,52 +121,51 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
String rawTableName = TableNameBuilder.extractRawTableName(tableName);

// Set execution statistics and Update broker metrics.
aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);
aggregator.setStats(rawTableName, response, brokerMetrics);

// Report the servers with conflicting data schema.
if (!serversWithConflictingDataSchema.isEmpty()) {
if (!serversWithConflictingSchema.isEmpty()) {
String errorMessage = QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName
+ " from servers: " + serversWithConflictingDataSchema + " got dropped due to data schema inconsistency.";
+ " from servers: " + serversWithConflictingSchema + " got dropped due to data schema inconsistency.";
LOGGER.warn(errorMessage);
brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1);
brokerResponseNative.addException(
new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
response.addException(new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
}

// NOTE: When there is no cached data schema, that means all servers encountered exception. In such case, return the
// response with metadata only.
DataSchema cachedDataSchema =
dataSchemaFromNonEmptyDataTable != null ? dataSchemaFromNonEmptyDataTable : dataSchemaFromEmptyDataTable;
schemaOfNonEmptyTable != null ? schemaOfNonEmptyTable : schemaOfEmptyTable;
if (cachedDataSchema == null) {
return brokerResponseNative;
return response;
}

QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(serverQueryContext);
Integer minGroupTrimSizeOpt = null;
Integer groupTrimThresholdOpt = null;
Integer minInitialIndexedTableCapOpt = null;

Integer minGroupTrimSizeQueryOption = null;
Integer groupTrimThresholdQueryOption = null;
Integer minInitialIndexedTableCapacityQueryOption = null;
if (queryOptions != null) {
minGroupTrimSizeQueryOption = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
groupTrimThresholdQueryOption = QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
minInitialIndexedTableCapacityQueryOption = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
minGroupTrimSizeOpt = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
groupTrimThresholdOpt = QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
minInitialIndexedTableCapOpt = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
}
int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? minGroupTrimSizeQueryOption : _minGroupTrimSize;
int groupTrimThreshold =
groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption : _groupByTrimThreshold;

int minGroupTrimSize = minGroupTrimSizeOpt != null ? minGroupTrimSizeOpt : _minGroupTrimSize;
int groupTrimThreshold = groupTrimThresholdOpt != null ? groupTrimThresholdOpt : _groupByTrimThreshold;
int minInitialIndexedTableCapacity =
minInitialIndexedTableCapacityQueryOption != null ? minInitialIndexedTableCapacityQueryOption
: _minInitialIndexedTableCapacity;
minInitialIndexedTableCapOpt != null ? minInitialIndexedTableCapOpt : _minInitialIndexedTableCapacity;

QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
try {
dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
DataTableReducer reducer = ResultReducerFactory.getResultReducer(serverQueryContext);
reducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, response,
new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
groupTrimThreshold, minGroupTrimSize, minInitialIndexedTableCapacity), brokerMetrics);
} catch (EarlyTerminationException e) {
brokerResponseNative.addException(
response.addException(
new QueryProcessingException(QueryException.QUERY_CANCELLATION_ERROR_CODE, e.toString()));
}

QueryContext queryContext;
if (brokerRequest == serverBrokerRequest) {
queryContext = serverQueryContext;
Expand All @@ -173,13 +176,13 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
throw new BadQueryRequestException("Nested query is not supported without gapfill");
}
BaseGapfillProcessor gapfillProcessor = GapfillProcessorFactory.getGapfillProcessor(queryContext, gapfillType);
gapfillProcessor.process(brokerResponseNative);
gapfillProcessor.process(response);
}

if (!serverQueryContext.isExplain()) {
updateAlias(queryContext, brokerResponseNative);
updateAlias(queryContext, response);
}
return brokerResponseNative;
return response;
}

public void shutDown() {
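The conflict check in this file intentionally compares only column data types, because column names (the string form of expressions) can differ across server versions. A minimal sketch of that comparison (the enum values here are illustrative, not Pinot's full type list):

import java.util.Arrays;

public class SchemaConflictSketch {
  enum ColumnDataType { INT, LONG, STRING }

  public static void main(String[] args) {
    ColumnDataType[] cached = {ColumnDataType.INT, ColumnDataType.STRING};
    ColumnDataType[] incoming = {ColumnDataType.INT, ColumnDataType.LONG};
    // A mismatch drops the offending server's data table from the reduce.
    System.out.println("conflicting: " + !Arrays.equals(cached, incoming)); // conflicting: true
  }
}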
---
@@ -70,6 +70,7 @@

/**
* Helper class to reduce data tables and set group by results into the BrokerResponseNative
* Used for keyed aggregations (GROUP BY queries), e.g. select max(id), sum(quantity) from orders group by customer_id.
*/
@SuppressWarnings("rawtypes")
public class GroupByDataTableReducer implements DataTableReducer {
@@ -140,9 +141,12 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema,
* @param brokerMetrics broker metrics (meters)
* @throws TimeoutException If unable to complete within the timeout.
*/
private void reduceResult(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
BrokerMetrics brokerMetrics)
private void reduceResult(BrokerResponseNative brokerResponseNative,
DataSchema dataSchema,
Collection<DataTable> dataTables,
DataTableReducerContext reducerContext,
String rawTableName,
BrokerMetrics brokerMetrics)
throws TimeoutException {
// NOTE: This step will modify the data schema and also return final aggregate results.
IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
---
@@ -207,7 +207,8 @@ public FilterContext getFilter() {
}

/**
* Returns a list of expressions in the GROUP-BY clause, or {@code null} if there is no GROUP-BY clause.
* Returns a list of expressions in the GROUP-BY clause (aggregation keys), or {@code null} if there is no GROUP-BY
* clause.
*/
@Nullable
public List<ExpressionContext> getGroupByExpressions() {
---
@@ -166,12 +166,22 @@ public static QueryContext getQueryContext(PinotQuery pinotQuery) {
explainMode = ExplainMode.DESCRIPTION;
}

return new QueryContext.Builder().setTableName(tableName).setSubquery(subquery)
.setSelectExpressions(selectExpressions).setDistinct(distinct).setAliasList(aliasList).setFilter(filter)
.setGroupByExpressions(groupByExpressions).setOrderByExpressions(orderByExpressions)
.setHavingFilter(havingFilter).setLimit(pinotQuery.getLimit()).setOffset(pinotQuery.getOffset())
.setQueryOptions(pinotQuery.getQueryOptions()).setExpressionOverrideHints(expressionContextOverrideHints)
.setExplain(explainMode).build();
return new QueryContext.Builder()
.setTableName(tableName)
.setSubquery(subquery)
.setSelectExpressions(selectExpressions)
.setDistinct(distinct)
.setAliasList(aliasList)
.setFilter(filter)
.setGroupByExpressions(groupByExpressions)
.setOrderByExpressions(orderByExpressions)
.setHavingFilter(havingFilter)
.setLimit(pinotQuery.getLimit())
.setOffset(pinotQuery.getOffset())
.setQueryOptions(pinotQuery.getQueryOptions())
.setExpressionOverrideHints(expressionContextOverrideHints)
.setExplain(explainMode)
.build();
}

private static boolean isMultiStage(PinotQuery pinotQuery) {
---
@@ -99,7 +99,8 @@ public static IndexedTable createIndexedTableForCombineOperator(GroupByResultsBl
int limit = queryContext.getLimit();
boolean hasOrderBy = queryContext.getOrderByExpressions() != null;
boolean hasHaving = queryContext.getHavingFilter() != null;
int minTrimSize = queryContext.getMinServerGroupTrimSize();
int minTrimSize =
queryContext.getMinServerGroupTrimSize(); // it's minBrokerGroupTrimSize in broker
int minInitialIndexedTableCapacity = queryContext.getMinInitialIndexedTableCapacity();

// Disable trim when min trim size is non-positive
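Trimming is disabled when the resolved min trim size is non-positive; otherwise the indexed-table capacity is derived from both the query LIMIT and the min trim size. A hedged sketch of that resolution (the 5x LIMIT multiplier mirrors Pinot's usual group-by capacity heuristic, but treat the exact factor as an assumption):

public class TrimCapacitySketch {
  static int resolveTrimSize(int minTrimSize, int limit) {
    if (minTrimSize <= 0) {
      return Integer.MAX_VALUE; // trimming disabled: keep every group
    }
    return Math.max(limit * 5, minTrimSize); // never trim below minTrimSize
  }

  public static void main(String[] args) {
    System.out.println(resolveTrimSize(-1, 10));     // 2147483647 (disabled)
    System.out.println(resolveTrimSize(5000, 10));   // 5000
    System.out.println(resolveTrimSize(5000, 2000)); // 10000
  }
}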