Limit GroupByCombineOperator to use 2 * numCores threads instead of creating one task per operator #14843

Merged 2 commits on Jan 21, 2025

Changes from all commits

GroupByCombineOperator.java

@@ -39,6 +39,7 @@
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.util.GroupByUtils;
 import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.Logger;
@@ -47,8 +48,6 @@
 
 /**
  * Combine operator for group-by queries.
- * TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using
- * all threads
  */
 @SuppressWarnings("rawtypes")
 public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<GroupByResultsBlock> {
@@ -80,12 +79,13 @@ public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext) {
   }
 
   /**
-   * For group-by queries, when maxExecutionThreads is not explicitly configured, create one task per operator.
+   * For group-by queries, when maxExecutionThreads is not explicitly configured, override it to create as many tasks
+   * as the default number of query worker threads (or the number of operators / segments if that's lower).
    */
   private static QueryContext overrideMaxExecutionThreads(QueryContext queryContext, int numOperators) {
     int maxExecutionThreads = queryContext.getMaxExecutionThreads();
     if (maxExecutionThreads <= 0) {
-      queryContext.setMaxExecutionThreads(numOperators);
+      queryContext.setMaxExecutionThreads(Math.min(numOperators, ResourceManager.DEFAULT_QUERY_WORKER_THREADS));
     }
     return queryContext;
   }
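
The effect of the change, in isolation: previously every segment-level operator got its own task; now the task count is capped. Below is a minimal, self-contained sketch of the capping logic, assuming (per the PR title) that ResourceManager.DEFAULT_QUERY_WORKER_THREADS is 2 * the number of available cores; the class and method names here are illustrative, not Pinot code.

// TaskCountSketch.java - standalone illustration of the new cap (hypothetical names).
public class TaskCountSketch {
  // Assumption from the PR title: the default worker pool is 2 * numCores.
  static final int DEFAULT_QUERY_WORKER_THREADS = 2 * Runtime.getRuntime().availableProcessors();

  // Mirrors overrideMaxExecutionThreads(): an explicitly configured value (> 0) wins;
  // otherwise cap the per-query task count at min(numOperators, DEFAULT_QUERY_WORKER_THREADS).
  static int resolveMaxExecutionThreads(int configuredMaxExecutionThreads, int numOperators) {
    if (configuredMaxExecutionThreads > 0) {
      return configuredMaxExecutionThreads;
    }
    return Math.min(numOperators, DEFAULT_QUERY_WORKER_THREADS);
  }

  public static void main(String[] args) {
    // On an 8-core machine: a query over 500 segments now schedules 16 tasks, not 500.
    System.out.println(resolveMaxExecutionThreads(0, 500));
  }
}

With many small segments this avoids flooding the worker pool with hundreds of short-lived tasks, while queries over fewer segments than 2 * numCores still get one task per segment.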
JMH benchmark file

@@ -20,7 +20,6 @@
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -35,7 +34,6 @@
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
@@ -81,8 +79,7 @@ public static void main(String[] args)
 
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "FilteredAggregationsTest");
   private static final String TABLE_NAME = "MyTable";
-  private static final String FIRST_SEGMENT_NAME = "firstTestSegment";
-  private static final String SECOND_SEGMENT_NAME = "secondTestSegment";
+  private static final String SEGMENT_NAME_TEMPLATE = "testSegment%d";
   private static final String INT_COL_NAME = "INT_COL";
   private static final String SORTED_COL_NAME = "SORTED_COL";
   private static final String RAW_INT_COL_NAME = "RAW_INT_COL";
@@ -103,9 +100,9 @@ public static void main(String[] args)
       .setStarTreeIndexConfigs(
           Collections.singletonList(
               new StarTreeIndexConfig(List.of(SORTED_COL_NAME, INT_COL_NAME), null,
-              Collections.singletonList(
-                  new AggregationFunctionColumnPair(AggregationFunctionType.SUM, RAW_INT_COL_NAME).toColumnName()),
-              null, Integer.MAX_VALUE))).build();
+                  Collections.singletonList(
+                      new AggregationFunctionColumnPair(AggregationFunctionType.SUM, RAW_INT_COL_NAME).toColumnName()),
+                  null, Integer.MAX_VALUE))).build();
 
   //@formatter:off
   private static final Schema SCHEMA = new Schema.SchemaBuilder()
@@ -199,6 +196,8 @@ public static void main(String[] args)
       + "FromDateTime(dateTimeConvert(TSTMP_COL, '1:MILLISECONDS:EPOCH', '1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss"
       + ".SSSZ tz(CET)', '1:DAYS'), 'yyyy-MM-dd HH:mm:ss.SSSZ') = 120000000";
 
+  @Param({"1", "2", "10", "50"})
+  private int _numSegments;
   @Param("1500000")
   private int _numRows;
   @Param({"EXP(0.001)", "EXP(0.5)", "EXP(0.999)"})
@@ -222,16 +221,14 @@ public void setUp()
     _supplier = Distribution.createSupplier(42, _scenario);
     FileUtils.deleteQuietly(INDEX_DIR);
 
-    buildSegment(FIRST_SEGMENT_NAME);
-    buildSegment(SECOND_SEGMENT_NAME);
-
     _indexSegments = new ArrayList<>();
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
-    ImmutableSegment firstImmutableSegment =
-        ImmutableSegmentLoader.load(new File(INDEX_DIR, FIRST_SEGMENT_NAME), indexLoadingConfig);
-    ImmutableSegment secondImmutableSegment =
-        ImmutableSegmentLoader.load(new File(INDEX_DIR, SECOND_SEGMENT_NAME), indexLoadingConfig);
-    _indexSegment = firstImmutableSegment;
-    _indexSegments = Arrays.asList(firstImmutableSegment, secondImmutableSegment);
+    for (int i = 0; i < _numSegments; i++) {
+      buildSegment(String.format(SEGMENT_NAME_TEMPLATE, i));
+      _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, String.format(SEGMENT_NAME_TEMPLATE, i)),
+          indexLoadingConfig));
+    }
+    _indexSegment = _indexSegments.get(0);
   }
 
   @TearDown
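
The benchmark now sweeps segment counts instead of hard-coding two segments, so the effect of the task cap can be measured as the number of operators grows past 2 * numCores. A sketch of driving that sweep from a JMH runner; the include pattern is an assumption, since the benchmark class name is not shown in this diff, while _numSegments and _scenario are the @Param fields visible in the change above.

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class RunSegmentSweep {
  public static void main(String[] args) throws RunnerException {
    Options opt = new OptionsBuilder()
        .include("FilteredAggregations")   // regex matching the benchmark class; assumed name
        .param("_numSegments", "1", "50")  // compare the smallest and largest segment counts
        .param("_scenario", "EXP(0.5)")    // pin the value distribution to one scenario
        .build();
    new Runner(opt).run();
  }
}

Equivalently, the standard JMH command line accepts -p _numSegments=1,50 to override the @Param values without any code changes.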