Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic evaluation of GroupBy Initial Capacity #14001

Merged
merged 16 commits into from
Oct 31, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ public void getColumns(Set<String> columns) {
}
}

public void getPredicateColumns(Set<Predicate> predicateColumns) {
if (_children != null) {
for (FilterContext child : _children) {
child.getPredicateColumns(predicateColumns);
}
}
else if (_predicate != null) {
predicateColumns.add(_predicate);
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ public static Integer getMaxInitialResultHolderCapacity(Map<String, String> quer
return maxInitResultCap != null ? Integer.parseInt(maxInitResultCap) : null;
}

public static boolean optimizeMaxInitialResultHolderCapacityEnabled(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY));
}

@Nullable
public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
String groupByTrimThreshold = queryOptions.get(QueryOptionKey.GROUP_TRIM_THRESHOLD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.common.request.context.TimeSeriesContext;
import org.apache.pinot.common.request.context.predicate.InPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
Expand Down Expand Up @@ -351,9 +353,56 @@ public void setMaxExecutionThreads(int maxExecutionThreads) {
}

public int getMaxInitialResultHolderCapacity() {
if (QueryOptionsUtils.optimizeMaxInitialResultHolderCapacityEnabled(_queryOptions)) {
return getOptimizedMaxInitialResultHolderCapacity();
}
return _maxInitialResultHolderCapacity;
}

// TODO: Improve this to use segment level info to optimize the capacity
// Optimization to right-size the initial result holder capacity for group-by queries if they exist in the filter.
// If any one group-by expression is not in the filter, we return the _maxInitialResultHolderCapacity.
public int getOptimizedMaxInitialResultHolderCapacity() {
if (getFilter() == null) {
return _maxInitialResultHolderCapacity;
}

assert getGroupByExpressions() != null;

Set<Predicate> predicateColumns = new HashSet<>();
getFilter().getPredicateColumns(predicateColumns);

// Map to store the size of the predicates
Map<ExpressionContext, Integer> predicateSizeMap = new HashMap<>();

// Collect IN and EQ predicates and store their sizes
for (Predicate predicate : predicateColumns) {
if (predicate.getType() == Predicate.Type.IN || predicate.getType() == Predicate.Type.EQ) {
ExpressionContext lhs = predicate.getLhs();
int size = (predicate.getType() == Predicate.Type.IN)
? ((InPredicate) predicate).getValues().size()
: 1;
predicateSizeMap.merge(lhs, size, Integer::sum);
}
}

int crossProductCapacity = 1;
for (ExpressionContext expression : getGroupByExpressions()) {
Integer size = predicateSizeMap.get(expression);

if (size == null) {
// No matching predicate for a group-by expression, return the default capacity
return _maxInitialResultHolderCapacity;
}
crossProductCapacity *= size;
if (crossProductCapacity > _maxInitialResultHolderCapacity) {
return _maxInitialResultHolderCapacity;
}
}
return crossProductCapacity;
}


public void setMaxInitialResultHolderCapacity(int maxInitialResultHolderCapacity) {
_maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.aggregation.groupby;

import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


public class GroupByResultHolderTest {

@Test(dataProvider = "groupByResultHolderCapacityDataProvider")
public void testGetGroupByResultHolderCapacity(String query, Integer expectedCapacity,
boolean shouldOptimizeCapacity) {
if (shouldOptimizeCapacity) {
query = query + "SET optimizeMaxInitialResultHolderCapacity=true";
}
QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query);
Assert.assertEquals(queryContext.getMaxInitialResultHolderCapacity(), expectedCapacity);
}

@DataProvider(name = "groupByResultHolderCapacityDataProvider")
public Object[][] groupByResultHolderCapacityDataProvider() {
return new Object[][]{
// Single IN predicate
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 IN (10, 20, 30, 40, 50) GROUP BY column1"
+ " LIMIT 10;", 5, true},
// Multiple IN predicates but only one used in group-by
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 IN (10, 20, 30) AND column2 IN (100, 200)"
+ " GROUP BY column1 LIMIT 10;", 3, true},
// Multiple IN predicates used in group-by
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 IN (10, 20, 30) AND column3 IN (40, 50)"
+ " GROUP BY column1, column3 LIMIT 10;", 6, true},
// Single EQ predicate
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 = 10 GROUP BY column1 LIMIT 10;", 1, true},
// Multiple EQ predicates but only one used in group-by
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 = 10 AND column2 = 100 GROUP BY column1"
+ " LIMIT 10;", 1, true},
// Mixed predicates
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 IN (10, 20, 30) AND column3 = 40"
+ " GROUP BY column1, column3 LIMIT 10;", 3, true},
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 = 10 AND column3 IN (40, 50)"
+ " GROUP BY column1, column3 LIMIT 10;", 2, true},
// Multiple IN Predicate columns with same column name and different values
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 IN (10, 20, 30) AND column1 IN (40, 50)"
+ " OR column2 IN (60, 70) GROUP BY column1, column2 LIMIT 10;", 10, true},
// Multiple EQ Predicate columns with same column name
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 = 10 OR column1 = 20"
+ " GROUP BY column1 LIMIT 10;", 2, true},
// No filter
{"SELECT COUNT(column1), MAX(column1) FROM testTable GROUP BY column1 LIMIT 10;",
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, true},
// No matching filter EQ predicate in group-by expression
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 = 10 GROUP BY column2 LIMIT 10;",
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, true},
// No matching filter IN predicate in group-by expression
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 IN (10, 20, 30) GROUP BY column2 LIMIT 10;",
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, true},
// Only one matching filter predicate in group-by expression
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 IN (10, 20, 30) GROUP BY column1, column2"
+ " LIMIT 10;", InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, true},
// Exceeding max size limit
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 IN (1, 2, 3, 4, 5)"
+ " AND column2 IN (6, 7, 8, 9, 10) AND column3 IN (11, 12, 13, 14, 15)"
+ " AND column4 IN (16, 17, 18, 19, 20)"
+ " AND column5 IN (21, 22, 23, 24, 25)"
+ " AND column6 IN (26, 27, 28, 29, 30)"
+ " GROUP BY column1, column2, column3, column4, column5, column6;",
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, true},
// Disable optimization
{"SELECT COUNT(column1), MAX(column1) FROM testTable WHERE column1 IN (10, 20, 30, 40, 50) GROUP BY column1"
+ " LIMIT 10;", InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, false},
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,12 @@ public static class QueryOptionKey {
// executed in an Unbounded FCFS fashion. However, secondary workloads are executed in a constrainted FCFS
// fashion with limited compute.
public static final String IS_SECONDARY_WORKLOAD = "isSecondaryWorkload";

// When set to true, the max initial result holder capacity will be optimized based on the query. Rather than
// using the default value. This is best-effort for now and returns the default value if the optimization is
// not possible.
public static final String OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"optimizeMaxInitialResultHolderCapacity";
}

public static class QueryOptionValue {
Expand Down
Loading