Skip to content

Commit

Permalink
bounded checks
Browse files Browse the repository at this point in the history
  • Loading branch information
praveenc7 committed Oct 9, 2024
1 parent 51cc735 commit 5191b1e
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public DefaultGroupByExecutor(QueryContext queryContext, AggregationFunction[] a
}
} else {
_groupKeyGenerator = new DictionaryBasedGroupKeyGenerator(projectOperator, groupByExpressions, numGroupsLimit,
maxInitialResultHolderCapacity);
maxInitialResultHolderCapacity, queryContext.getFilter(), queryContext.getQueryOptions());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,18 @@
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.predicate.InPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.BaseProjectOperator;
import org.apache.pinot.core.operator.ColumnContext;
Expand Down Expand Up @@ -97,9 +107,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {

private final int _globalGroupIdUpperBound;
private final RawKeyHolder _rawKeyHolder;
private final Map<ExpressionContext, Integer> _cardinalityMap;

public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
ExpressionContext[] groupByExpressions, int numGroupsLimit, int arrayBasedThreshold) {
ExpressionContext[] groupByExpressions, int numGroupsLimit, int arrayBasedThreshold,
@Nullable FilterContext filterContext, @Nullable Map<String, String> queryOptions) {
assert numGroupsLimit >= arrayBasedThreshold;

_groupByExpressions = groupByExpressions;
Expand All @@ -113,7 +125,7 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
// no need to intern dictionary values when there is only one group by expression because
// only one call will be made to the dictionary to extract each raw value.
_internedDictionaryValues = _numGroupByExpressions > 1 ? new Object[_numGroupByExpressions][] : null;

_cardinalityMap = new HashMap<>(_numGroupByExpressions);
long cardinalityProduct = 1L;
boolean longOverflow = false;
for (int i = 0; i < _numGroupByExpressions; i++) {
Expand All @@ -123,6 +135,7 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
assert _dictionaries[i] != null;
int cardinality = _dictionaries[i].length();
_cardinalities[i] = cardinality;
_cardinalityMap.put(groupByExpression, cardinality);
if (_internedDictionaryValues != null && cardinality < MAX_DICTIONARY_INTERN_TABLE_SIZE) {
_internedDictionaryValues[i] = new Object[cardinality];
}
Expand All @@ -135,6 +148,13 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
}
_isSingleValueColumn[i] = columnContext.isSingleValue();
}
if (queryOptions != null && QueryOptionsUtils.optimizeMaxInitialResultHolderCapacityEnabled(queryOptions)) {
Pair<Boolean, Long> optimizedResult = getOptimizedMaxInitialResultHolderCapacity(filterContext);
if (optimizedResult.getLeft() && optimizedResult.getRight() != null) {
longOverflow = false;
cardinalityProduct = Math.min(optimizedResult.getRight(), cardinalityProduct);
}
}
// TODO: Clear the holder after processing the query instead of before
if (longOverflow) {
// ArrayMapBasedHolder
Expand Down Expand Up @@ -171,6 +191,49 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
}
}

// Calculate the estimated result set size for a group-by query based on filter predicates and column cardinalities.
// If the size exceeds Long.MAX_VALUE, return an early overflow signal. Otherwise, return the product of cardinalities.
// Filters are considered by collecting IN and EQ predicates to refine the cardinality estimate.
// Returns a pair of boolean and long. The boolean indicates if the optimization is enabled, and
// the long is the estimated result set size if the optimization is enabled.
private Pair<Boolean, Long> getOptimizedMaxInitialResultHolderCapacity(FilterContext filterContext) {
if (filterContext == null) {
return Pair.of(false, null);
}

Set<Predicate> predicateColumns = new HashSet<>();
filterContext.getPredicateColumns(predicateColumns);

// Map to store the size of the predicates
Map<ExpressionContext, Integer> predicateSizeMap = new HashMap<>();
// Collect IN and EQ predicates and store their sizes
for (Predicate predicate : predicateColumns) {
if (predicate.getType() == Predicate.Type.IN || predicate.getType() == Predicate.Type.EQ) {
ExpressionContext lhs = predicate.getLhs();
int size = (predicate.getType() == Predicate.Type.IN)
? ((InPredicate) predicate).getValues().size()
: 1;
predicateSizeMap.merge(lhs, size, Integer::sum);
}
}

if (predicateSizeMap.isEmpty()) {
return Pair.of(false, null);
}

long cardinalityProduct = 1;
for (ExpressionContext expression : _groupByExpressions) {
Integer predicateLength = predicateSizeMap.get(expression);
Integer columnCardinalityLength = _cardinalityMap.get(expression);
int cardinality = Math.min(predicateLength != null ? predicateLength : columnCardinalityLength, columnCardinalityLength);
if (cardinalityProduct > Long.MAX_VALUE / cardinality) {
return Pair.of(false, null);
}
cardinalityProduct *= cardinality;
}
return Pair.of(true, cardinalityProduct);
}

@Override
public int getGlobalGroupKeyUpperBound() {
return _globalGroupIdUpperBound;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.common.request.context.TimeSeriesContext;
import org.apache.pinot.common.request.context.predicate.InPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
Expand Down Expand Up @@ -353,56 +351,9 @@ public void setMaxExecutionThreads(int maxExecutionThreads) {
}

public int getMaxInitialResultHolderCapacity() {
if (QueryOptionsUtils.optimizeMaxInitialResultHolderCapacityEnabled(_queryOptions)) {
return getOptimizedMaxInitialResultHolderCapacity();
}
return _maxInitialResultHolderCapacity;
}

// TODO: Improve this to use segment level info to optimize the capacity
// Optimization to right-size the initial result holder capacity for group-by queries if they exist in the filter
// If any one group-by expression is not in the filter, we return the _maxInitialResultHolderCapacity.
public int getOptimizedMaxInitialResultHolderCapacity() {
if (getFilter() == null) {
return _maxInitialResultHolderCapacity;
}

assert getGroupByExpressions() != null;

Set<Predicate> predicateColumns = new HashSet<>();
getFilter().getPredicateColumns(predicateColumns);

// Map to store the size of the predicates
Map<ExpressionContext, Integer> predicateSizeMap = new HashMap<>();

// Collect IN and EQ predicates and store their sizes
for (Predicate predicate : predicateColumns) {
if (predicate.getType() == Predicate.Type.IN || predicate.getType() == Predicate.Type.EQ) {
ExpressionContext lhs = predicate.getLhs();
int size = (predicate.getType() == Predicate.Type.IN)
? ((InPredicate) predicate).getValues().size()
: 1;
predicateSizeMap.merge(lhs, size, Integer::sum);
}
}

int crossProductCapacity = 1;
for (ExpressionContext expression : getGroupByExpressions()) {
Integer size = predicateSizeMap.get(expression);

if (size == null) {
// No matching predicate for a group-by expression, return the default capacity
return _maxInitialResultHolderCapacity;
}
crossProductCapacity *= size;
if (crossProductCapacity > _maxInitialResultHolderCapacity) {
return _maxInitialResultHolderCapacity;
}
}
return crossProductCapacity;
}


public void setMaxInitialResultHolderCapacity(int maxInitialResultHolderCapacity) {
_maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.*;


public class DictionaryBasedGroupKeyGeneratorTest {
Expand Down Expand Up @@ -167,7 +166,7 @@ public void testArrayBasedSingleValue() {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_projectOperator, getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, null, null);
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), UNIQUE_ROWS, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), UNIQUE_ROWS, _errorMessage);

Expand All @@ -187,7 +186,7 @@ public void testIntMapBasedSingleValue() {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_projectOperator, getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, null, null);
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage);
Expand All @@ -208,7 +207,7 @@ public void testLongMapBasedSingleValue() {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_projectOperator, getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, null, null);
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage);
Expand All @@ -229,7 +228,7 @@ public void testArrayMapBasedSingleValue() {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_projectOperator, getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, null, null);
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage);
Expand Down Expand Up @@ -264,7 +263,7 @@ public void testArrayBasedMultiValue() {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_projectOperator, getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, null, null);
int groupKeyUpperBound = dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound();
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), groupKeyUpperBound, _errorMessage);

Expand All @@ -285,7 +284,7 @@ public void tesIntMapBasedMultiValue() {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_projectOperator, getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, null, null);
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage);
Expand All @@ -308,7 +307,7 @@ public void testLongMapBasedMultiValue() {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_projectOperator, getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, null, null);
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage);
Expand All @@ -330,7 +329,7 @@ public void testArrayMapBasedMultiValue() {
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_projectOperator, getExpressions(groupByColumns),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, null, null);
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage);
Expand All @@ -350,7 +349,7 @@ public void testNumGroupsLimit() {
// NOTE: arrayBasedThreshold must be smaller or equal to numGroupsLimit
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_projectOperator, getExpressions(groupByColumns), numGroupsLimit,
numGroupsLimit);
numGroupsLimit, null, null);
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), numGroupsLimit, _errorMessage);
assertEquals(dictionaryBasedGroupKeyGenerator.getCurrentGroupKeyUpperBound(), 0, _errorMessage);

Expand Down Expand Up @@ -431,6 +430,51 @@ public void testMapDefaultValue() {
GroupKeyGenerator.INVALID_ID);
}

@Test(dataProvider = "groupByResultHolderCapacityDataProvider")
public void testGetGroupByResultHolderCapacity(String query, Integer expectedCapacity) {
query = query + "SET optimizeMaxInitialResultHolderCapacity=true";
QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query);
List<ExpressionContext> expressionContextList = queryContext.getGroupByExpressions();
ExpressionContext[] expressions =
expressionContextList.toArray(new ExpressionContext[expressionContextList.size()]);
DictionaryBasedGroupKeyGenerator dictionaryBasedGroupKeyGenerator =
new DictionaryBasedGroupKeyGenerator(_projectOperator, expressions,
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT,
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY, queryContext.getFilter(),
queryContext.getQueryOptions());
assertEquals(dictionaryBasedGroupKeyGenerator.getGlobalGroupKeyUpperBound(), expectedCapacity, _errorMessage);
}

@DataProvider(name = "groupByResultHolderCapacityDataProvider")
public Object[][] groupByResultHolderCapacityDataProvider() {
return new Object[][]{
// Single IN predicate
{"SELECT COUNT(*) FROM testTable WHERE s1 IN (1, 2, 3, 4, 5) GROUP BY s1 LIMIT 10;", 5},
// Multiple IN predicates but only one used in group-by
{"SELECT COUNT(*) FROM testTable WHERE s1 IN (1, 2, 3) AND s2 IN (4, 5) GROUP BY s1 LIMIT 10;", 3},
// Multiple IN predicates used in group-by
{"SELECT COUNT(*) FROM testTable WHERE s1 IN (1, 2, 3) AND s3 IN (4, 5) GROUP BY s1, s3 LIMIT 10;", 6},
// Single EQ predicate
{"SELECT COUNT(*) FROM testTable WHERE s1 = 1 GROUP BY s1 LIMIT 10;", 1},
// Multiple EQ predicates but only one used in group-by
{"SELECT COUNT(*) FROM testTable WHERE s1 = 1 AND s2 = 4 GROUP BY s1 LIMIT 10;", 1},
// Mixed predicates
{"SELECT COUNT(*) FROM testTable WHERE s1 IN (1, 2, 3) AND s3 = 4 GROUP BY s1, s3 LIMIT 10;", 3},
{"SELECT COUNT(*) FROM testTable WHERE s1 = 1 AND s3 IN (4, 5) GROUP BY s1, s3 LIMIT 10;", 2},
// Multiple IN Predicate columns with same column name and different values
{"SELECT COUNT(*) FROM testTable WHERE s1 IN (1, 2, 3) AND s1 IN (4, 5) OR s2 IN (6, 7) GROUP BY s1, s2"
+ " LIMIT 10;", 10},
// No filter -> s1 has cardinality 100
{"SELECT COUNT(*) FROM testTable GROUP BY s1 LIMIT 1000;", 100},
// No matching filter EQ predicate in group-by expression -> s2 has cardinality 100
{"SELECT COUNT(*) FROM testTable WHERE s1 = 1 GROUP BY s2 LIMIT 1000;", 100},
// No matching filter IN predicate in group-by expression -> s2 has cardinality 100
{"SELECT COUNT(*) FROM testTable WHERE s1 IN (1, 2, 3) GROUP BY s2 LIMIT 1000;", 100},
// Only one matching filter predicate in group-by expression -> (3 [s1] * 100 [s2]) = 300
{"SELECT COUNT(*) FROM testTable WHERE s1 IN (1, 2, 3) GROUP BY s1, s2 LIMIT 1000;", 300},
};
}

@AfterClass
public void tearDown() {
FileUtils.deleteQuietly(new File(INDEX_DIR_PATH));
Expand Down
Loading

0 comments on commit 5191b1e

Please sign in to comment.