Skip to content

Commit

Permalink
Add group_trim_size query option and executor setting.
Browse files Browse the repository at this point in the history
  • Loading branch information
bziobrowski committed Dec 31, 2024
1 parent a1e3675 commit c75da16
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@ public static Integer getMaxExecutionThreads(Map<String, String> queryOptions) {
return checkedParseIntPositive(QueryOptionKey.MAX_EXECUTION_THREADS, maxExecutionThreadsString);
}

/**
 * Looks up the {@code GROUP_TRIM_SIZE} query option and hands its raw string value to
 * {@code uncheckedParseInt} for conversion.
 *
 * @param queryOptions query options keyed by option name
 * @return whatever {@code uncheckedParseInt} yields for the option; declared nullable (presumably {@code null} when
 *         the option is not set - confirm against {@code uncheckedParseInt})
 */
@Nullable
public static Integer getGroupTrimSize(Map<String, String> queryOptions) {
  // NOTE: Non-positive value means turning off the intermediate level trim
  // NOTE(review): AggregateOperator guards trimming with `groupTrimSize > -1`, so confirm whether 0 really disables it
  return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_SIZE, queryOptions.get(QueryOptionKey.GROUP_TRIM_SIZE));
}

@Nullable
public static Integer getMinSegmentGroupTrimSize(Map<String, String> queryOptions) {
String minSegmentGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public static final String NUM_GROUPS_LIMIT_KEY = "num.groups.limit";
public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;

// By default, group trimming in AggregateOperator is disabled
public static final int DEFAULT_GROUP_TRIM_SIZE = -1;

// Instance config key for minimum segment-level group trim size
// Set as pinot.server.query.executor.min.segment.group.trim.size
public static final String MIN_SEGMENT_GROUP_TRIM_SIZE_KEY = "min.segment.group.trim.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ public static class AggregateOptions {
public static final String IS_PARTITIONED_BY_GROUP_BY_KEYS = "is_partitioned_by_group_by_keys";
public static final String IS_LEAF_RETURN_FINAL_RESULT = "is_leaf_return_final_result";
public static final String IS_SKIP_LEAF_STAGE_GROUP_BY = "is_skip_leaf_stage_group_by";
public static final String IS_ENABLE_GROUP_TRIM = "is_enable_group_trim";

/** Enables trimming of aggregation intermediate results by pushing down order by and limit to leaf stage. */
public static final String ENABLE_GROUP_TRIM = "is_enable_group_trim";
/** Enables trimming of aggregation intermediate results by pushing down order by and limit,
* down to leaf stage if possible. */
public static final String IS_ENABLE_GROUP_TRIM = "is_enable_group_trim";

/** Throw an exception on reaching num_groups_limit instead of just setting a flag. */
public static final String ERROR_ON_NUM_GROUPS_LIMIT = "error_on_num_groups_limit";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ public class QueryRunner {
// Group-by settings
@Nullable
private Integer _numGroupsLimit;
@Nullable
private Integer _groupTrimSize;

@Nullable
private Integer _maxInitialResultHolderCapacity;
@Nullable
Expand Down Expand Up @@ -141,16 +144,23 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
// TODO: Consider using separate config for intermediate stage and leaf stage
String numGroupsLimitStr = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT);
_numGroupsLimit = numGroupsLimitStr != null ? Integer.parseInt(numGroupsLimitStr) : null;

String groupTrimSizeStr = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_GROUP_TRIM_SIZE);
_groupTrimSize = groupTrimSizeStr != null ? Integer.parseInt(groupTrimSizeStr) : null;

String maxInitialGroupHolderCapacity =
config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
_maxInitialResultHolderCapacity =
maxInitialGroupHolderCapacity != null ? Integer.parseInt(maxInitialGroupHolderCapacity) : null;

String minInitialIndexedTableCapacityStr =
config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
_minInitialIndexedTableCapacity =
minInitialIndexedTableCapacityStr != null ? Integer.parseInt(minInitialIndexedTableCapacityStr) : null;

String maxRowsInJoinStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_JOIN);
_maxRowsInJoin = maxRowsInJoinStr != null ? Integer.parseInt(maxRowsInJoinStr) : null;

String joinOverflowModeStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
_joinOverflowMode = joinOverflowModeStr != null ? JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;

Expand Down Expand Up @@ -337,6 +347,14 @@ private Map<String, String> consolidateMetadata(Map<String, String> customProper
opChainMetadata.put(QueryOptionKey.NUM_GROUPS_LIMIT, Integer.toString(numGroupsLimit));
}

Integer groupTrimSize = QueryOptionsUtils.getGroupTrimSize(opChainMetadata);
if (groupTrimSize == null) {
groupTrimSize = _groupTrimSize;
}
if (groupTrimSize != null) {
opChainMetadata.put(QueryOptionKey.GROUP_TRIM_SIZE, Integer.toString(groupTrimSize));
}

Integer maxInitialResultHolderCapacity = QueryOptionsUtils.getMaxInitialResultHolderCapacity(opChainMetadata);
if (maxInitialResultHolderCapacity == null) {
maxInitialResultHolderCapacity = _maxInitialResultHolderCapacity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime.operator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -39,6 +40,7 @@
import org.apache.pinot.core.operator.docvalsets.FilteredDataBlockValSet;
import org.apache.pinot.core.operator.docvalsets.FilteredRowBasedBlockValSet;
import org.apache.pinot.core.operator.docvalsets.RowBasedBlockValSet;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
import org.apache.pinot.core.query.aggregation.function.CountAggregationFunction;
Expand Down Expand Up @@ -106,7 +108,7 @@ public AggregateOperator(OpChainExecutionContext context, MultiStageOperator inp
List<Integer> groupKeys = node.getGroupKeys();

//process order trimming hint
int groupTrimSize = getGroupTrimSize(node.getNodeHint());
int groupTrimSize = getGroupTrimSize(node.getNodeHint(), context.getOpChainMetadata());

if (groupTrimSize > -1) {
// limit is set to 0 if not pushed
Expand All @@ -118,7 +120,7 @@ public AggregateOperator(OpChainExecutionContext context, MultiStageOperator inp
_priorityQueue = null;
} else {
List<RelFieldCollation> collations = node.getCollations();
if (collations != null && collations.size() > 0) {
if (collations != null && !collations.isEmpty()) {
// order needs to be reversed so that peek() can be used to compare with each output row
_priorityQueue =
new PriorityQueue<>(groupTrimSize, new SortUtils.SortComparator(_resultSchema, collations, true));
Expand Down Expand Up @@ -149,7 +151,7 @@ public AggregateOperator(OpChainExecutionContext context, MultiStageOperator inp
}
}

private int getGroupTrimSize(PlanNode.NodeHint nodeHint) {
private int getGroupTrimSize(PlanNode.NodeHint nodeHint, Map<String, String> opChainMetadata) {
if (nodeHint != null) {
Map<String, String> options = nodeHint.getHintOptions().get(PinotHintOptions.AGGREGATE_HINT_OPTIONS);
if (options != null) {
Expand All @@ -160,7 +162,8 @@ private int getGroupTrimSize(PlanNode.NodeHint nodeHint) {
}
}

return -1;
Integer groupTrimSize = QueryOptionsUtils.getGroupTrimSize(opChainMetadata);
return groupTrimSize != null ? groupTrimSize : InstancePlanMakerImplV2.DEFAULT_GROUP_TRIM_SIZE;
}

@Override
Expand Down Expand Up @@ -467,4 +470,9 @@ private boolean getErrorOnNumGroupsLimit(Map<String, String> opChainMetadata, Pl

return QueryOptionsUtils.getErrorOnNumGroupsLimit(opChainMetadata);
}

/** Returns the current value of {@code _groupTrimSize}; package-private so that unit tests can inspect it. */
@VisibleForTesting
int getGroupTrimSize() {
return _groupTrimSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockTestUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.mockito.Mock;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -265,6 +268,50 @@ public void shouldHandleGroupLimitExceed() {
"num groups limit should be reached");
}

/**
 * Without a node hint or a group trim size in the context metadata, the operator is expected to report
 * {@code Integer.MAX_VALUE} as its group trim size, both with a positive node limit and with a limit of 0.
 */
@Test
public void testGroupTrimSizeIsDisabledByDefault() {
  OpChainExecutionContext defaultContext = OperatorTestUtil.getTracingContext();
  PlanNode.NodeHint noHint = null;

  for (int nodeLimit : new int[]{10, 0}) {
    AggregateOperator operator = getAggregateOperator(defaultContext, noHint, nodeLimit);
    Assert.assertEquals(operator.getGroupTrimSize(), Integer.MAX_VALUE);
  }
}

/**
 * With no node hint, a {@code GROUP_TRIM_SIZE} of 100 supplied through the context metadata is expected to be the
 * trim size reported by the operator (node limit is 5).
 */
@Test
public void testGroupTrimSizeDependsOnContextValue() {
  Map<String, String> queryOptions =
      Map.of(CommonConstants.Broker.Request.QueryOptionKey.GROUP_TRIM_SIZE, "100");
  OpChainExecutionContext contextWithTrimSize = OperatorTestUtil.getContext(queryOptions);
  PlanNode.NodeHint noHint = null;

  int actualTrimSize = getAggregateOperator(contextWithTrimSize, noHint, 5).getGroupTrimSize();

  Assert.assertEquals(actualTrimSize, 100);
}

/**
 * When both the aggregate hint (30) and the context metadata (100) carry a group trim size, the hint value is
 * expected to win.
 */
@Test
public void testGroupTrimHintOverridesContextValue() {
  Map<String, String> queryOptions =
      Map.of(CommonConstants.Broker.Request.QueryOptionKey.GROUP_TRIM_SIZE, "100");
  OpChainExecutionContext contextWithTrimSize = OperatorTestUtil.getContext(queryOptions);

  Map<String, String> aggregateHintOptions = Map.of(PinotHintOptions.AggregateOptions.GROUP_TRIM_SIZE, "30");
  PlanNode.NodeHint hintWithTrimSize =
      new PlanNode.NodeHint(Map.of(PinotHintOptions.AGGREGATE_HINT_OPTIONS, aggregateHintOptions));

  int actualTrimSize = getAggregateOperator(contextWithTrimSize, hintWithTrimSize, 5).getGroupTrimSize();

  Assert.assertEquals(actualTrimSize, 30);
}

/**
 * Builds an {@link AggregateOperator} over {@code _input} for a DIRECT aggregation that groups on column 0 and sums
 * column 1, using the given execution context, node hint and limit.
 *
 * @param context execution context handed to the operator
 * @param nodeHint hint attached to the aggregate node; may be {@code null}
 * @param limit limit value set on the aggregate node
 * @return the newly constructed operator
 */
private AggregateOperator getAggregateOperator(OpChainExecutionContext context, PlanNode.NodeHint nodeHint,
    int limit) {
  DataSchema schema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{INT, DOUBLE});
  List<RexExpression.FunctionCall> sumCalls = List.of(getSum(new RexExpression.InputRef(1)));
  List<Integer> noFilterArgs = List.of(-1);
  List<Integer> groupByKeys = List.of(0);

  AggregateNode aggregateNode =
      new AggregateNode(-1, schema, nodeHint, List.of(), sumCalls, noFilterArgs, groupByKeys, AggType.DIRECT, false,
          null, limit);
  return new AggregateOperator(context, _input, aggregateNode);
}

/**
 * Creates a SUM function call with INT data type over the single given operand.
 *
 * @param arg the expression to sum
 * @return the SUM function call expression
 */
private static RexExpression.FunctionCall getSum(RexExpression arg) {
  String sumFunctionName = SqlKind.SUM.name();
  return new RexExpression.FunctionCall(ColumnDataType.INT, sumFunctionName, List.of(arg));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public static OpChainExecutionContext getTracingContext() {
return getTracingContext(ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"));
}

/**
 * Creates an execution context from the given opchain metadata.
 * Delegates directly to {@code getTracingContext(Map)}; tracing is not forced on here, the metadata is passed through
 * unchanged.
 *
 * @param opChainMetadata metadata entries to pass to the context
 * @return the context built by {@code getTracingContext(Map)}
 */
public static OpChainExecutionContext getContext(Map<String, String> opChainMetadata) {
return getTracingContext(opChainMetadata);
}

/**
 * Creates an execution context from empty opchain metadata, i.e. without the {@code TRACE} entry that
 * {@code getTracingContext()} sets.
 *
 * @return the context built by {@code getTracingContext(Map)} for an empty metadata map
 */
public static OpChainExecutionContext getNoTracingContext() {
  Map<String, String> emptyMetadata = ImmutableMap.of();
  return getTracingContext(emptyMetadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,21 @@ public static class QueryOptionKey {
public static final String ROUTING_OPTIONS = "routingOptions";
public static final String USE_SCAN_REORDER_OPTIMIZATION = "useScanReorderOpt";
public static final String MAX_EXECUTION_THREADS = "maxExecutionThreads";

/** Number of groups AggregateOperator should limit result to after sorting.
* Trimming happens only when (sub)query contains order by and limit clause. */
public static final String GROUP_TRIM_SIZE = "groupTrimSize";

/** Number of groups GroupByOperator should limit result to after sorting.
* Trimming happens only when (sub)query contains order by clause. */
public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = "minSegmentGroupTrimSize";

/** Max number of groups GroupByCombineOperator (running at server) should return. */
public static final String MIN_SERVER_GROUP_TRIM_SIZE = "minServerGroupTrimSize";

/** Max number of groups GroupByDataTableReducer (running at broker) should return. */
public static final String MIN_BROKER_GROUP_TRIM_SIZE = "minBrokerGroupTrimSize";

public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery";
public static final String USE_FIXED_REPLICA = "useFixedReplica";
public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";
Expand Down Expand Up @@ -702,6 +714,8 @@ public static class Server {
public static final String CONFIG_OF_QUERY_EXECUTOR_TIMEOUT = "pinot.server.query.executor.timeout";
public static final String CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT =
"pinot.server.query.executor.num.groups.limit";
public static final String CONFIG_OF_QUERY_EXECUTOR_GROUP_TRIM_SIZE =
"pinot.server.query.executor.group.trim.size";
public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"pinot.server.query.executor.max.init.group.holder.capacity";
public static final String CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY =
Expand Down

0 comments on commit c75da16

Please sign in to comment.