diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java index aeef763ad5a1..633fb7d5e6f8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -39,6 +39,7 @@ import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.core.util.GroupByUtils; import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; @@ -47,8 +48,6 @@ /** * Combine operator for group-by queries. - * TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using - * all threads */ @SuppressWarnings("rawtypes") public class GroupByCombineOperator extends BaseSingleBlockCombineOperator { @@ -80,12 +79,13 @@ public GroupByCombineOperator(List operators, QueryContext queryContex } /** - * For group-by queries, when maxExecutionThreads is not explicitly configured, create one task per operator. + * For group-by queries, when maxExecutionThreads is not explicitly configured, override it to create as many tasks + * as the default number of query worker threads (or the number of operators / segments if that's lower). */ private static QueryContext overrideMaxExecutionThreads(QueryContext queryContext, int numOperators) { int maxExecutionThreads = queryContext.getMaxExecutionThreads(); if (maxExecutionThreads <= 0) { - queryContext.setMaxExecutionThreads(numOperators); + queryContext.setMaxExecutionThreads(Math.min(numOperators, ResourceManager.DEFAULT_QUERY_WORKER_THREADS)); } return queryContext; }