diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index e29da5cc6d08..5f88a9691c0b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -213,6 +213,13 @@ public static Integer getMaxExecutionThreads(Map queryOptions) { return checkedParseIntPositive(QueryOptionKey.MAX_EXECUTION_THREADS, maxExecutionThreadsString); } + @Nullable + public static Integer getGroupTrimSize(Map queryOptions) { + String groupTrimSize = queryOptions.get(QueryOptionKey.GROUP_TRIM_SIZE); + // NOTE: Negative value means turning off the intermediate level trim + return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_SIZE, groupTrimSize); + } + @Nullable public static Integer getMinSegmentGroupTrimSize(Map queryOptions) { String minSegmentGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index cadce4bcf6d0..0475c62c1711 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -76,6 +76,9 @@ public class InstancePlanMakerImplV2 implements PlanMaker { public static final String NUM_GROUPS_LIMIT_KEY = "num.groups.limit"; public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000; + // By default, group trimming in AggregateOperator is disabled + public static final int DEFAULT_GROUP_TRIM_SIZE = -1; + // Instance config key for minimum segment-level group trim size // Set as pinot.server.query.executor.min.segment.group.trim.size public static final String 
MIN_SEGMENT_GROUP_TRIM_SIZE_KEY = "min.segment.group.trim.size"; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java index 1ea364179574..d98b86a0f760 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java @@ -43,10 +43,10 @@ public static class AggregateOptions { public static final String IS_PARTITIONED_BY_GROUP_BY_KEYS = "is_partitioned_by_group_by_keys"; public static final String IS_LEAF_RETURN_FINAL_RESULT = "is_leaf_return_final_result"; public static final String IS_SKIP_LEAF_STAGE_GROUP_BY = "is_skip_leaf_stage_group_by"; - public static final String IS_ENABLE_GROUP_TRIM = "is_enable_group_trim"; - /** Enables trimming of aggregation intermediate results by pushing down order by and limit to leaf stage. */ - public static final String ENABLE_GROUP_TRIM = "is_enable_group_trim"; + /** Enables trimming of aggregation intermediate results by pushing down order by and limit, + * down to leaf stage if possible. */ + public static final String IS_ENABLE_GROUP_TRIM = "is_enable_group_trim"; /** Throw an exception on reaching num_groups_limit instead of just setting a flag. 
*/ public static final String ERROR_ON_NUM_GROUPS_LIMIT = "error_on_num_groups_limit"; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index ac335a1674c4..1bb47a8f009d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -108,6 +108,9 @@ public class QueryRunner { // Group-by settings @Nullable private Integer _numGroupsLimit; + @Nullable + private Integer _groupTrimSize; + @Nullable private Integer _maxInitialResultHolderCapacity; @Nullable @@ -141,16 +144,23 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana // TODO: Consider using separate config for intermediate stage and leaf stage String numGroupsLimitStr = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT); _numGroupsLimit = numGroupsLimitStr != null ? Integer.parseInt(numGroupsLimitStr) : null; + + String groupTrimSizeStr = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_GROUP_TRIM_SIZE); + _groupTrimSize = groupTrimSizeStr != null ? Integer.parseInt(groupTrimSizeStr) : null; + String maxInitialGroupHolderCapacity = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY); _maxInitialResultHolderCapacity = maxInitialGroupHolderCapacity != null ? Integer.parseInt(maxInitialGroupHolderCapacity) : null; + String minInitialIndexedTableCapacityStr = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY); _minInitialIndexedTableCapacity = minInitialIndexedTableCapacityStr != null ? 
Integer.parseInt(minInitialIndexedTableCapacityStr) : null; + String maxRowsInJoinStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_JOIN); _maxRowsInJoin = maxRowsInJoinStr != null ? Integer.parseInt(maxRowsInJoinStr) : null; + String joinOverflowModeStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE); _joinOverflowMode = joinOverflowModeStr != null ? JoinOverFlowMode.valueOf(joinOverflowModeStr) : null; @@ -337,6 +347,14 @@ private Map consolidateMetadata(Map customProper opChainMetadata.put(QueryOptionKey.NUM_GROUPS_LIMIT, Integer.toString(numGroupsLimit)); } + Integer groupTrimSize = QueryOptionsUtils.getGroupTrimSize(opChainMetadata); + if (groupTrimSize == null) { + groupTrimSize = _groupTrimSize; + } + if (groupTrimSize != null) { + opChainMetadata.put(QueryOptionKey.GROUP_TRIM_SIZE, Integer.toString(groupTrimSize)); + } + Integer maxInitialResultHolderCapacity = QueryOptionsUtils.getMaxInitialResultHolderCapacity(opChainMetadata); if (maxInitialResultHolderCapacity == null) { maxInitialResultHolderCapacity = _maxInitialResultHolderCapacity; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java index c431e18de9ae..e8173534b200 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.runtime.operator; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; @@ -39,6 +40,7 @@ import org.apache.pinot.core.operator.docvalsets.FilteredDataBlockValSet; import org.apache.pinot.core.operator.docvalsets.FilteredRowBasedBlockValSet; import 
org.apache.pinot.core.operator.docvalsets.RowBasedBlockValSet; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory; import org.apache.pinot.core.query.aggregation.function.CountAggregationFunction; @@ -106,7 +108,7 @@ public AggregateOperator(OpChainExecutionContext context, MultiStageOperator inp List groupKeys = node.getGroupKeys(); //process order trimming hint - int groupTrimSize = getGroupTrimSize(node.getNodeHint()); + int groupTrimSize = getGroupTrimSize(node.getNodeHint(), context.getOpChainMetadata()); if (groupTrimSize > -1) { // limit is set to 0 if not pushed @@ -118,7 +120,7 @@ public AggregateOperator(OpChainExecutionContext context, MultiStageOperator inp _priorityQueue = null; } else { List collations = node.getCollations(); - if (collations != null && collations.size() > 0) { + if (collations != null && !collations.isEmpty()) { // order needs to be reversed so that peek() can be used to compare with each output row _priorityQueue = new PriorityQueue<>(groupTrimSize, new SortUtils.SortComparator(_resultSchema, collations, true)); @@ -149,7 +151,7 @@ public AggregateOperator(OpChainExecutionContext context, MultiStageOperator inp } } - private int getGroupTrimSize(PlanNode.NodeHint nodeHint) { + private int getGroupTrimSize(PlanNode.NodeHint nodeHint, Map opChainMetadata) { if (nodeHint != null) { Map options = nodeHint.getHintOptions().get(PinotHintOptions.AGGREGATE_HINT_OPTIONS); if (options != null) { @@ -160,7 +162,8 @@ private int getGroupTrimSize(PlanNode.NodeHint nodeHint) { } } - return -1; + Integer groupTrimSize = QueryOptionsUtils.getGroupTrimSize(opChainMetadata); + return groupTrimSize != null ? 
groupTrimSize : InstancePlanMakerImplV2.DEFAULT_GROUP_TRIM_SIZE; } @Override @@ -467,4 +470,9 @@ private boolean getErrorOnNumGroupsLimit(Map opChainMetadata, Pl return QueryOptionsUtils.getErrorOnNumGroupsLimit(opChainMetadata); } + + @VisibleForTesting + int getGroupTrimSize() { + return _groupTrimSize; + } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java index b2e73f226a3a..56a83cb36e8b 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java @@ -33,7 +33,10 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockTestUtils; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.spi.utils.CommonConstants; import org.mockito.Mock; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -265,6 +268,50 @@ public void shouldHandleGroupLimitExceed() { "num groups limit should be reached"); } + @Test + public void testGroupTrimSizeIsDisabledByDefault() { + PlanNode.NodeHint nodeHint = null; + OpChainExecutionContext context = OperatorTestUtil.getTracingContext(); + + Assert.assertEquals(getAggregateOperator(context, nodeHint, 10).getGroupTrimSize(), Integer.MAX_VALUE); + Assert.assertEquals(getAggregateOperator(context, nodeHint, 0).getGroupTrimSize(), Integer.MAX_VALUE); + } + + @Test + public void testGroupTrimSizeDependsOnContextValue() { + PlanNode.NodeHint nodeHint = null; + OpChainExecutionContext context = + 
OperatorTestUtil.getContext(Map.of(CommonConstants.Broker.Request.QueryOptionKey.GROUP_TRIM_SIZE, "100")); + + AggregateOperator operator = getAggregateOperator(context, nodeHint, 5); + + Assert.assertEquals(operator.getGroupTrimSize(), 100); + } + + @Test + public void testGroupTrimHintOverridesContextValue() { + PlanNode.NodeHint nodeHint = new PlanNode.NodeHint(Map.of(PinotHintOptions.AGGREGATE_HINT_OPTIONS, + Map.of(PinotHintOptions.AggregateOptions.GROUP_TRIM_SIZE, "30"))); + + OpChainExecutionContext context = + OperatorTestUtil.getContext(Map.of(CommonConstants.Broker.Request.QueryOptionKey.GROUP_TRIM_SIZE, "100")); + + AggregateOperator operator = getAggregateOperator(context, nodeHint, 5); + + Assert.assertEquals(operator.getGroupTrimSize(), 30); + } + + private AggregateOperator getAggregateOperator(OpChainExecutionContext context, PlanNode.NodeHint nodeHint, + int limit) { + List aggCalls = List.of(getSum(new RexExpression.InputRef(1))); + List filterArgs = List.of(-1); + List groupKeys = List.of(0); + DataSchema resultSchema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{INT, DOUBLE}); + return new AggregateOperator(context, _input, + new AggregateNode(-1, resultSchema, nodeHint, List.of(), aggCalls, filterArgs, groupKeys, AggType.DIRECT, + false, null, limit)); + } + private static RexExpression.FunctionCall getSum(RexExpression arg) { return new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.SUM.name(), List.of(arg)); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java index f279e5992b14..0d6317ab2d53 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java @@ -90,6 +90,10 @@ public static OpChainExecutionContext 
getTracingContext() { return getTracingContext(ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true")); } + public static OpChainExecutionContext getContext(Map opChainMetadata) { + return getTracingContext(opChainMetadata); + } + public static OpChainExecutionContext getNoTracingContext() { return getTracingContext(ImmutableMap.of()); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 76fd040e627e..93f5b03a0ceb 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -409,9 +409,21 @@ public static class QueryOptionKey { public static final String ROUTING_OPTIONS = "routingOptions"; public static final String USE_SCAN_REORDER_OPTIMIZATION = "useScanReorderOpt"; public static final String MAX_EXECUTION_THREADS = "maxExecutionThreads"; + + /** Number of groups AggregateOperator should limit result to after sorting. + * Trimming happens only when (sub)query contains order by and limit clause. */ + public static final String GROUP_TRIM_SIZE = "groupTrimSize"; + + /** Number of groups GroupByOperator should limit result to after sorting. + * Trimming happens only when (sub)query contains order by clause. */ public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = "minSegmentGroupTrimSize"; + + /** Max number of groups GroupByCombineOperator (running at server) should return. */ public static final String MIN_SERVER_GROUP_TRIM_SIZE = "minServerGroupTrimSize"; + + /** Max number of groups GroupByDataTableReducer (running at broker) should return. 
*/ public static final String MIN_BROKER_GROUP_TRIM_SIZE = "minBrokerGroupTrimSize"; + public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery"; public static final String USE_FIXED_REPLICA = "useFixedReplica"; public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose"; @@ -702,6 +714,8 @@ public static class Server { public static final String CONFIG_OF_QUERY_EXECUTOR_TIMEOUT = "pinot.server.query.executor.timeout"; public static final String CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT = "pinot.server.query.executor.num.groups.limit"; + public static final String CONFIG_OF_QUERY_EXECUTOR_GROUP_TRIM_SIZE = + "pinot.server.query.executor.group.trim.size"; public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY = "pinot.server.query.executor.max.init.group.holder.capacity"; public static final String CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY =