Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic evaluation of GroupBy Initial Capacity #14001

Merged
merged 16 commits into from
Oct 31, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ public static Integer getMaxInitialResultHolderCapacity(Map<String, String> quer
return checkedParseInt(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, maxInitResultCap);
}

/**
 * Tells whether the given query options enable optimizing the max initial result holder capacity.
 *
 * <p>The value stored under {@code QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY} is parsed with
 * {@link Boolean#parseBoolean(String)}, so a missing option (or anything other than "true", ignoring case)
 * yields {@code false}.
 *
 * @param queryOptions query options to read the flag from; must not be null
 * @return {@code true} only when the option is present and parses to true
 */
public static boolean optimizeMaxInitialResultHolderCapacityEnabled(Map<String, String> queryOptions) {
  String optimizeFlag = queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
  return Boolean.parseBoolean(optimizeFlag);
}

@Nullable
public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
String groupByTrimThreshold = queryOptions.get(QueryOptionKey.GROUP_TRIM_THRESHOLD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@
package org.apache.pinot.core.query.aggregation.groupby;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.predicate.InPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.TableResizer;
Expand Down Expand Up @@ -88,6 +95,11 @@ public DefaultGroupByExecutor(QueryContext queryContext, AggregationFunction[] a
// Initialize group key generator
int numGroupsLimit = queryContext.getNumGroupsLimit();
int maxInitialResultHolderCapacity = queryContext.getMaxInitialResultHolderCapacity();
Map<ExpressionContext, Integer> groupByExpressionSizesFromPredicates = null;
if (queryContext.getQueryOptions() != null
&& QueryOptionsUtils.optimizeMaxInitialResultHolderCapacityEnabled(queryContext.getQueryOptions())) {
groupByExpressionSizesFromPredicates = getGroupByExpressionSizesFromPredicates(queryContext);
}
if (groupKeyGenerator != null) {
_groupKeyGenerator = groupKeyGenerator;
} else {
Expand All @@ -96,15 +108,15 @@ public DefaultGroupByExecutor(QueryContext queryContext, AggregationFunction[] a
// TODO(nhejazi): support MV and dictionary based when null handling is enabled.
_groupKeyGenerator =
new NoDictionarySingleColumnGroupKeyGenerator(projectOperator, groupByExpressions[0], numGroupsLimit,
_nullHandlingEnabled);
_nullHandlingEnabled, groupByExpressionSizesFromPredicates);
} else {
_groupKeyGenerator =
new NoDictionaryMultiColumnGroupKeyGenerator(projectOperator, groupByExpressions, numGroupsLimit,
_nullHandlingEnabled);
_nullHandlingEnabled, groupByExpressionSizesFromPredicates);
}
} else {
_groupKeyGenerator = new DictionaryBasedGroupKeyGenerator(projectOperator, groupByExpressions, numGroupsLimit,
maxInitialResultHolderCapacity);
maxInitialResultHolderCapacity, groupByExpressionSizesFromPredicates);
}
}

Expand All @@ -127,6 +139,54 @@ public DefaultGroupByExecutor(QueryContext queryContext, AggregationFunction[] a
}
}

/**
* Retrieve the sizes of GroupBy expressions from IN and EQ predicates found in the filter context, if available.
* 1. If the query has no filter or no GroupBy expressions, return null.
* 2. The top-level filter must be either a single predicate or an AND whose children are all of type PREDICATE or
* AND; for any other type (for example OR) we cannot guarantee deterministic sizes for GroupBy expressions, so
* null is returned.
* 3. An IN predicate contributes the number of its values, an EQ predicate contributes 1; when several such
* predicates share the same left-hand side expression, the smallest size is kept.
*
* @param queryContext query whose filter and GroupBy expressions are inspected
* @return map from each GroupBy expression that is constrained by a collected IN/EQ predicate to its size (possibly
* empty), or null when no sizes can be derived
*/
private Map<ExpressionContext, Integer> getGroupByExpressionSizesFromPredicates(QueryContext queryContext) {
FilterContext filterContext = queryContext.getFilter();
if (filterContext == null || queryContext.getGroupByExpressions() == null) {
return null;
}

// Predicates gathered from the top level of the filter (despite the name, these are predicates, not columns)
Set<Predicate> predicateColumns = new HashSet<>();
// Case 1: top-level AND - every child must hold for a row to match, so each child predicate bounds the result
if (filterContext.getType() == FilterContext.Type.AND) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments to this part of the code to explain the logic to handle AND/PREDICATE/OR? Similarly those parts where cardinalities are processed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment

for (FilterContext child : filterContext.getChildren()) {
FilterContext.Type type = child.getType();
// Any child that is neither PREDICATE nor AND (e.g. OR) aborts the optimization
if (type != FilterContext.Type.PREDICATE && type != FilterContext.Type.AND) {
return null;
} else if (child.getPredicate() != null) {
// Only children that directly carry a predicate are collected; a nested AND child passes the type check above
// but is not descended into here.
// NOTE(review): presumably getPredicate() is null for AND nodes, so nested predicates are ignored - confirm
predicateColumns.add(child.getPredicate());
}
}
// Case 2: the filter is not an AND but directly carries a predicate
} else if (filterContext.getPredicate() != null) {
predicateColumns.add(filterContext.getPredicate());
// Case 3: any other top-level filter without a predicate - give up
} else {
return null;
}

// Collect IN and EQ predicates and store their sizes
// (IN -> number of listed values, EQ -> 1; Integer::min keeps the smallest size when a left-hand side repeats)
Map<ExpressionContext, Integer> predicateSizeMap = predicateColumns.stream()
.filter(predicate -> predicate.getType() == Predicate.Type.IN || predicate.getType() == Predicate.Type.EQ)
.collect(Collectors.toMap(
Predicate::getLhs,
predicate -> (predicate.getType() == Predicate.Type.IN)
? ((InPredicate) predicate).getValues().size()
: 1,
Integer::min
));

// Populate the group-by expressions with sizes from the predicate map
// GroupBy expressions without a matching predicate are filtered out, so the lookup below never yields null.
// NOTE(review): this toMap has no merge function and throws IllegalStateException on duplicate keys, so it
// assumes the GroupBy expressions are distinct - confirm
return queryContext.getGroupByExpressions().stream()
.filter(predicateSizeMap::containsKey)
.collect(Collectors.toMap(
expression -> expression,
expression -> predicateSizeMap.getOrDefault(expression, null)
));
}

@Override
public void process(ValueBlock valueBlock) {
// Generate group keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.BaseProjectOperator;
Expand Down Expand Up @@ -99,7 +103,8 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
private final RawKeyHolder _rawKeyHolder;

public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
ExpressionContext[] groupByExpressions, int numGroupsLimit, int arrayBasedThreshold) {
ExpressionContext[] groupByExpressions, int numGroupsLimit, int arrayBasedThreshold,
@Nullable Map<ExpressionContext, Integer> groupByExpressionSizesFromPredicates) {
assert numGroupsLimit >= arrayBasedThreshold;

_groupByExpressions = groupByExpressions;
Expand All @@ -113,7 +118,7 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
// no need to intern dictionary values when there is only one group by expression because
// only one call will be made to the dictionary to extract each raw value.
_internedDictionaryValues = _numGroupByExpressions > 1 ? new Object[_numGroupByExpressions][] : null;

Map<ExpressionContext, Integer> cardinalityMap = new HashMap<>(_numGroupByExpressions);
long cardinalityProduct = 1L;
boolean longOverflow = false;
for (int i = 0; i < _numGroupByExpressions; i++) {
Expand All @@ -123,6 +128,7 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
assert _dictionaries[i] != null;
int cardinality = _dictionaries[i].length();
_cardinalities[i] = cardinality;
cardinalityMap.put(groupByExpression, cardinality);
if (_internedDictionaryValues != null && cardinality < MAX_DICTIONARY_INTERN_TABLE_SIZE) {
_internedDictionaryValues[i] = new Object[cardinality];
}
Expand All @@ -135,6 +141,14 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
}
_isSingleValueColumn[i] = columnContext.isSingleValue();
}
if (groupByExpressionSizesFromPredicates != null) {
Pair<Boolean, Long> optimizedCardinality = getOptimizedGroupByCardinality(groupByExpressionSizesFromPredicates,
cardinalityMap);
if (optimizedCardinality.getLeft() && optimizedCardinality.getRight() != null) {
longOverflow = false;
cardinalityProduct = Math.min(optimizedCardinality.getRight(), cardinalityProduct);
}
}
// TODO: Clear the holder after processing the query instead of before
if (longOverflow) {
// ArrayMapBasedHolder
Expand Down Expand Up @@ -171,6 +185,22 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
}
}

/**
 * Computes the product of the per-expression group-by bounds, where each bound is the column cardinality, lowered
 * to the predicate-derived size when one is available and smaller.
 *
 * <p>Iterates over every entry of {@code columnCardinalityMap}; expressions missing from
 * {@code groupByExpressionSizes} simply use their cardinality.
 *
 * @param groupByExpressionSizes sizes derived from predicates, keyed by group-by expression
 * @param columnCardinalityMap cardinality of each group-by expression
 * @return {@code (true, product)} when the product fits in a long, {@code (false, null)} when multiplying would
 *         overflow
 */
private Pair<Boolean, Long> getOptimizedGroupByCardinality(Map<ExpressionContext, Integer> groupByExpressionSizes,
    Map<ExpressionContext, Integer> columnCardinalityMap) {
  long product = 1L;
  for (Map.Entry<ExpressionContext, Integer> columnEntry : columnCardinalityMap.entrySet()) {
    // Start from the column cardinality and tighten it with the predicate size if that is smaller
    int bound = columnEntry.getValue();
    Integer predicateSize = groupByExpressionSizes.get(columnEntry.getKey());
    if (predicateSize != null && predicateSize < bound) {
      bound = predicateSize;
    }
    // Overflow guard: bail out before the multiplication can wrap around.
    // Assumes bound is positive; a zero bound would make this division throw ArithmeticException.
    if (product > Long.MAX_VALUE / bound) {
      return Pair.of(false, null);
    }
    product *= bound;
  }
  return Pair.of(true, product);
}

@Override
public int getGlobalGroupKeyUpperBound() {
return _globalGroupIdUpperBound;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.BaseProjectOperator;
Expand Down Expand Up @@ -59,17 +60,20 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
private final boolean[] _isSingleValueExpressions;
private final int _numGroupsLimit;
private final boolean _nullHandlingEnabled;
private final int _globalGroupIdUpperBound;

public NoDictionaryMultiColumnGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
ExpressionContext[] groupByExpressions, int numGroupsLimit, boolean nullHandlingEnabled) {
ExpressionContext[] groupByExpressions, int numGroupsLimit, boolean nullHandlingEnabled,
Map<ExpressionContext, Integer> groupByExpressionSizesFromPredicates) {
_groupByExpressions = groupByExpressions;
_numGroupByExpressions = groupByExpressions.length;
_storedTypes = new DataType[_numGroupByExpressions];
_dictionaries = new Dictionary[_numGroupByExpressions];
_onTheFlyDictionaries = new ValueToIdMap[_numGroupByExpressions];
_isSingleValueExpressions = new boolean[_numGroupByExpressions];
_nullHandlingEnabled = nullHandlingEnabled;

int optimizedGroupByUpperBound = 1;
boolean canOptimizeGroupByUpperBound = groupByExpressionSizesFromPredicates != null;
for (int i = 0; i < _numGroupByExpressions; i++) {
ExpressionContext groupByExpression = groupByExpressions[i];
ColumnContext columnContext = projectOperator.getResultColumnContext(groupByExpression);
Expand All @@ -80,17 +84,30 @@ public NoDictionaryMultiColumnGroupKeyGenerator(BaseProjectOperator<?> projectOp
} else {
_onTheFlyDictionaries[i] = ValueToIdMapFactory.get(_storedTypes[i]);
}
if (canOptimizeGroupByUpperBound) {
Integer size = groupByExpressionSizesFromPredicates.get(groupByExpression);
if (size == null) {
canOptimizeGroupByUpperBound = false;
} else {
if (optimizedGroupByUpperBound > Integer.MAX_VALUE / size) {
optimizedGroupByUpperBound = numGroupsLimit;
} else {
optimizedGroupByUpperBound *= size;
}
}
}
_isSingleValueExpressions[i] = columnContext.isSingleValue();
}

_groupKeyMap = new Object2IntOpenHashMap<>();
_groupKeyMap.defaultReturnValue(INVALID_ID);
_numGroupsLimit = numGroupsLimit;
_globalGroupIdUpperBound = canOptimizeGroupByUpperBound ? optimizedGroupByUpperBound : numGroupsLimit;
}

@Override
public int getGlobalGroupKeyUpperBound() {
return _numGroupsLimit;
return _globalGroupIdUpperBound;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.BaseProjectOperator;
Expand Down Expand Up @@ -64,12 +65,18 @@ public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenera
private int _numGroups = 0;

public NoDictionarySingleColumnGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
ExpressionContext groupByExpression, int numGroupsLimit, boolean nullHandlingEnabled) {
ExpressionContext groupByExpression, int numGroupsLimit, boolean nullHandlingEnabled,
@Nullable Map<ExpressionContext, Integer> groupByExpressionSizesFromPredicates) {
_groupByExpression = groupByExpression;
ColumnContext columnContext = projectOperator.getResultColumnContext(groupByExpression);
_storedType = columnContext.getDataType().getStoredType();
_groupKeyMap = createGroupKeyMap(_storedType);
_globalGroupIdUpperBound = numGroupsLimit;
if (groupByExpressionSizesFromPredicates != null) {
Integer size = groupByExpressionSizesFromPredicates.get(groupByExpression);
_globalGroupIdUpperBound = size != null ? Math.min(size, numGroupsLimit) : numGroupsLimit;
} else {
_globalGroupIdUpperBound = numGroupsLimit;
}
_nullHandlingEnabled = nullHandlingEnabled;
_isSingleValueExpression = columnContext.isSingleValue();
}
Expand Down
Loading
Loading