
Commit

Limit GroupByCombineOperator to use 2 * numCores threads instead of creating one task per operator (#14843)
yashmayya authored Jan 21, 2025
1 parent d819d8a commit d0d8b66
Showing 2 changed files with 17 additions and 20 deletions.
GroupByCombineOperator.java

@@ -39,6 +39,7 @@
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.util.GroupByUtils;
 import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.Logger;
@@ -47,8 +48,6 @@
 
 /**
  * Combine operator for group-by queries.
- * TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using
- * all threads
  */
 @SuppressWarnings("rawtypes")
 public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<GroupByResultsBlock> {
@@ -80,12 +79,13 @@ public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext
   }
 
   /**
-   * For group-by queries, when maxExecutionThreads is not explicitly configured, create one task per operator.
+   * For group-by queries, when maxExecutionThreads is not explicitly configured, override it to create as many tasks
+   * as the default number of query worker threads (or the number of operators / segments if that's lower).
    */
   private static QueryContext overrideMaxExecutionThreads(QueryContext queryContext, int numOperators) {
     int maxExecutionThreads = queryContext.getMaxExecutionThreads();
     if (maxExecutionThreads <= 0) {
-      queryContext.setMaxExecutionThreads(numOperators);
+      queryContext.setMaxExecutionThreads(Math.min(numOperators, ResourceManager.DEFAULT_QUERY_WORKER_THREADS));
     }
     return queryContext;
   }
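In effect, a group-by query spanning many segments no longer creates one combine task per segment; the task count is capped at the size of the default query worker pool. A minimal standalone sketch of the new computation, assuming (per the commit title) that ResourceManager.DEFAULT_QUERY_WORKER_THREADS equals 2 * numCores; the operator count is illustrative:

  // Illustrative values; DEFAULT_QUERY_WORKER_THREADS is assumed to be
  // 2 * numCores, as the commit title states.
  int numCores = Runtime.getRuntime().availableProcessors();
  int defaultQueryWorkerThreads = 2 * numCores;
  int numOperators = 500; // e.g. a query fanning out to 500 segments
  int numTasks = Math.min(numOperators, defaultQueryWorkerThreads);
  // Before this change: 500 tasks (one per operator).
  // After this change: 2 * numCores tasks, e.g. 32 on a 16-core host.

The cap applies only when maxExecutionThreads is not explicitly configured (the guard is maxExecutionThreads <= 0); an explicitly set value still takes precedence.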
FilteredAggregations JMH benchmark

@@ -20,7 +20,6 @@
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -35,7 +34,6 @@
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
@@ -81,8 +79,7 @@ public static void main(String[] args)
 
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "FilteredAggregationsTest");
   private static final String TABLE_NAME = "MyTable";
-  private static final String FIRST_SEGMENT_NAME = "firstTestSegment";
-  private static final String SECOND_SEGMENT_NAME = "secondTestSegment";
+  private static final String SEGMENT_NAME_TEMPLATE = "testSegment%d";
   private static final String INT_COL_NAME = "INT_COL";
   private static final String SORTED_COL_NAME = "SORTED_COL";
   private static final String RAW_INT_COL_NAME = "RAW_INT_COL";
@@ -103,9 +100,9 @@ public static void main(String[] args)
       .setStarTreeIndexConfigs(
           Collections.singletonList(
               new StarTreeIndexConfig(List.of(SORTED_COL_NAME, INT_COL_NAME), null,
-                  Collections.singletonList(
-                      new AggregationFunctionColumnPair(AggregationFunctionType.SUM, RAW_INT_COL_NAME).toColumnName()),
-                  null, Integer.MAX_VALUE))).build();
+              Collections.singletonList(
+                  new AggregationFunctionColumnPair(AggregationFunctionType.SUM, RAW_INT_COL_NAME).toColumnName()),
+              null, Integer.MAX_VALUE))).build();
 
   //@formatter:off
   private static final Schema SCHEMA = new Schema.SchemaBuilder()
@@ -199,6 +196,8 @@ public static void main(String[] args)
       + "FromDateTime(dateTimeConvert(TSTMP_COL, '1:MILLISECONDS:EPOCH', '1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss"
       + ".SSSZ tz(CET)', '1:DAYS'), 'yyyy-MM-dd HH:mm:ss.SSSZ') = 120000000";
 
+  @Param({"1", "2", "10", "50"})
+  private int _numSegments;
   @Param("1500000")
   private int _numRows;
   @Param({"EXP(0.001)", "EXP(0.5)", "EXP(0.999)"})
@@ -222,16 +221,14 @@ public void setUp()
     _supplier = Distribution.createSupplier(42, _scenario);
     FileUtils.deleteQuietly(INDEX_DIR);
 
-    buildSegment(FIRST_SEGMENT_NAME);
-    buildSegment(SECOND_SEGMENT_NAME);
-
+    _indexSegments = new ArrayList<>();
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
-    ImmutableSegment firstImmutableSegment =
-        ImmutableSegmentLoader.load(new File(INDEX_DIR, FIRST_SEGMENT_NAME), indexLoadingConfig);
-    ImmutableSegment secondImmutableSegment =
-        ImmutableSegmentLoader.load(new File(INDEX_DIR, SECOND_SEGMENT_NAME), indexLoadingConfig);
-    _indexSegment = firstImmutableSegment;
-    _indexSegments = Arrays.asList(firstImmutableSegment, secondImmutableSegment);
+    for (int i = 0; i < _numSegments; i++) {
+      buildSegment(String.format(SEGMENT_NAME_TEMPLATE, i));
+      _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, String.format(SEGMENT_NAME_TEMPLATE, i)),
+          indexLoadingConfig));
+    }
+    _indexSegment = _indexSegments.get(0);
   }
 
   @TearDown
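The benchmark previously hard-coded two segments; _numSegments now sweeps 1, 2, 10, and 50 segments, and JMH crosses it with the existing _numRows and _scenario parameters. A sketch of running the sweep programmatically via the standard JMH runner API; the benchmark class name BenchmarkFilteredAggregations is assumed here, since the changed file's name does not appear in the diff:

  import org.openjdk.jmh.runner.Runner;
  import org.openjdk.jmh.runner.options.Options;
  import org.openjdk.jmh.runner.options.OptionsBuilder;

  public class RunFilteredAggregationsSweep {
    public static void main(String[] args) throws Exception {
      Options opt = new OptionsBuilder()
          .include("BenchmarkFilteredAggregations") // assumed benchmark class name
          .param("_numSegments", "1", "50")         // narrow the @Param sweep
          .forks(1)
          .build();
      new Runner(opt).run();
    }
  }

The same narrowing works from the JMH command line with -p _numSegments=1,50.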
