From d0d8b666accea5e5d94f22418f2a7c2e2a9a6c9d Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Wed, 22 Jan 2025 01:25:23 +0700 Subject: [PATCH] Limit GroupByCombineOperator to use 2 * numCores threads instead of creating one task per operator (#14843) --- .../combine/GroupByCombineOperator.java | 8 ++--- .../apache/pinot/perf/BenchmarkQueries.java | 29 +++++++++---------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java index aeef763ad5a1..633fb7d5e6f8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -39,6 +39,7 @@ import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.core.util.GroupByUtils; import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; @@ -47,8 +48,6 @@ /** * Combine operator for group-by queries. - * TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using - * all threads */ @SuppressWarnings("rawtypes") public class GroupByCombineOperator extends BaseSingleBlockCombineOperator { @@ -80,12 +79,13 @@ public GroupByCombineOperator(List operators, QueryContext queryContex } /** - * For group-by queries, when maxExecutionThreads is not explicitly configured, create one task per operator. + * For group-by queries, when maxExecutionThreads is not explicitly configured, override it to create as many tasks + * as the default number of query worker threads (or the number of operators / segments if that's lower). */ private static QueryContext overrideMaxExecutionThreads(QueryContext queryContext, int numOperators) { int maxExecutionThreads = queryContext.getMaxExecutionThreads(); if (maxExecutionThreads <= 0) { - queryContext.setMaxExecutionThreads(numOperators); + queryContext.setMaxExecutionThreads(Math.min(numOperators, ResourceManager.DEFAULT_QUERY_WORKER_THREADS)); } return queryContext; } diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java index 9fb36668877b..a249a176353a 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java @@ -20,7 +20,6 @@ import java.io.File; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -35,7 +34,6 @@ import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.AggregationFunctionType; -import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; @@ -81,8 +79,7 @@ public static void main(String[] args) private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "FilteredAggregationsTest"); private static final String TABLE_NAME = "MyTable"; - private static final String FIRST_SEGMENT_NAME = "firstTestSegment"; - private static final String SECOND_SEGMENT_NAME = "secondTestSegment"; + private static final String SEGMENT_NAME_TEMPLATE = "testSegment%d"; private static final String INT_COL_NAME = "INT_COL"; private static final String SORTED_COL_NAME = "SORTED_COL"; private static final String RAW_INT_COL_NAME = "RAW_INT_COL"; @@ -103,9 +100,9 @@ public static void main(String[] args) .setStarTreeIndexConfigs( Collections.singletonList( new StarTreeIndexConfig(List.of(SORTED_COL_NAME, INT_COL_NAME), null, - Collections.singletonList( - new AggregationFunctionColumnPair(AggregationFunctionType.SUM, RAW_INT_COL_NAME).toColumnName()), - null, Integer.MAX_VALUE))).build(); + Collections.singletonList( + new AggregationFunctionColumnPair(AggregationFunctionType.SUM, RAW_INT_COL_NAME).toColumnName()), + null, Integer.MAX_VALUE))).build(); //@formatter:off private static final Schema SCHEMA = new Schema.SchemaBuilder() @@ -199,6 +196,8 @@ public static void main(String[] args) + "FromDateTime(dateTimeConvert(TSTMP_COL, '1:MILLISECONDS:EPOCH', '1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss" + ".SSSZ tz(CET)', '1:DAYS'), 'yyyy-MM-dd HH:mm:ss.SSSZ') = 120000000"; + @Param({"1", "2", "10", "50"}) + private int _numSegments; @Param("1500000") private int _numRows; @Param({"EXP(0.001)", "EXP(0.5)", "EXP(0.999)"}) @@ -222,16 +221,14 @@ public void setUp() _supplier = Distribution.createSupplier(42, _scenario); FileUtils.deleteQuietly(INDEX_DIR); - buildSegment(FIRST_SEGMENT_NAME); - buildSegment(SECOND_SEGMENT_NAME); - + _indexSegments = new ArrayList<>(); IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(TABLE_CONFIG, SCHEMA); - ImmutableSegment firstImmutableSegment = - ImmutableSegmentLoader.load(new File(INDEX_DIR, FIRST_SEGMENT_NAME), indexLoadingConfig); - ImmutableSegment secondImmutableSegment = - ImmutableSegmentLoader.load(new File(INDEX_DIR, SECOND_SEGMENT_NAME), indexLoadingConfig); - _indexSegment = firstImmutableSegment; - _indexSegments = Arrays.asList(firstImmutableSegment, secondImmutableSegment); + for (int i = 0; i < _numSegments; i++) { + buildSegment(String.format(SEGMENT_NAME_TEMPLATE, i)); + _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, String.format(SEGMENT_NAME_TEMPLATE, i)), + indexLoadingConfig)); + } + _indexSegment = _indexSegments.get(0); } @TearDown