Skip to content

Commit

Permalink
Bounded & unbounded solution
Browse files Browse the repository at this point in the history
  • Loading branch information
praveenc7 committed Oct 13, 2024
1 parent 8ba3594 commit 853f33d
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,6 @@ public void getColumns(Set<String> columns) {
}
}

public void getPredicateColumns(Set<Predicate> predicateColumns) {
if (_children != null) {
for (FilterContext child : _children) {
child.getPredicateColumns(predicateColumns);
}
}
else if (_predicate != null) {
predicateColumns.add(_predicate);
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@
package org.apache.pinot.core.query.aggregation.groupby;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.predicate.InPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.TableResizer;
Expand Down Expand Up @@ -88,6 +95,11 @@ public DefaultGroupByExecutor(QueryContext queryContext, AggregationFunction[] a
// Initialize group key generator
int numGroupsLimit = queryContext.getNumGroupsLimit();
int maxInitialResultHolderCapacity = queryContext.getMaxInitialResultHolderCapacity();
Map<ExpressionContext, Integer> groupByExpressionSizesFromPredicates = null;
if (queryContext.getQueryOptions() != null
&& QueryOptionsUtils.optimizeMaxInitialResultHolderCapacityEnabled(queryContext.getQueryOptions())) {
groupByExpressionSizesFromPredicates = getGroupByExpressionSizesFromPredicates(queryContext);
}
if (groupKeyGenerator != null) {
_groupKeyGenerator = groupKeyGenerator;
} else {
Expand All @@ -96,15 +108,15 @@ public DefaultGroupByExecutor(QueryContext queryContext, AggregationFunction[] a
// TODO(nhejazi): support MV and dictionary based when null handling is enabled.
_groupKeyGenerator =
new NoDictionarySingleColumnGroupKeyGenerator(projectOperator, groupByExpressions[0], numGroupsLimit,
_nullHandlingEnabled);
_nullHandlingEnabled, groupByExpressionSizesFromPredicates);
} else {
_groupKeyGenerator =
new NoDictionaryMultiColumnGroupKeyGenerator(projectOperator, groupByExpressions, numGroupsLimit,
_nullHandlingEnabled);
_nullHandlingEnabled, groupByExpressionSizesFromPredicates);
}
} else {
_groupKeyGenerator = new DictionaryBasedGroupKeyGenerator(projectOperator, groupByExpressions, numGroupsLimit,
maxInitialResultHolderCapacity);
maxInitialResultHolderCapacity, groupByExpressionSizesFromPredicates);
}
}

Expand All @@ -127,6 +139,47 @@ public DefaultGroupByExecutor(QueryContext queryContext, AggregationFunction[] a
}
}

private Map<ExpressionContext, Integer> getGroupByExpressionSizesFromPredicates(QueryContext queryContext) {
FilterContext filterContext = queryContext.getFilter();
if (filterContext == null || queryContext.getGroupByExpressions() == null) {
return null;
}

Set<Predicate> predicateColumns = new HashSet<>();
if (filterContext.getType() == FilterContext.Type.AND) {
for (FilterContext child : filterContext.getChildren()) {
FilterContext.Type type = child.getType();
if (type != FilterContext.Type.PREDICATE && type != FilterContext.Type.AND) {
return null;
} else if (child.getPredicate() != null) {
predicateColumns.add(child.getPredicate());
}
}
} else if (filterContext.getPredicate() != null) {
predicateColumns.add(filterContext.getPredicate());
} else {
return null;
}

// Collect IN and EQ predicates and store their sizes
Map<ExpressionContext, Integer> predicateSizeMap = predicateColumns.stream()
.filter(predicate -> predicate.getType() == Predicate.Type.IN || predicate.getType() == Predicate.Type.EQ)
.collect(Collectors.toMap(
Predicate::getLhs,
predicate -> (predicate.getType() == Predicate.Type.IN)
? ((InPredicate) predicate).getValues().size()
: 1,
Integer::sum
));

// Populate the group-by expressions with sizes from the predicate map
return queryContext.getGroupByExpressions().stream()
.collect(Collectors.toMap(
expression -> expression,
expression -> predicateSizeMap.getOrDefault(expression, null)
));
}

@Override
public void process(ValueBlock valueBlock) {
// Generate group keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.BaseProjectOperator;
Expand Down Expand Up @@ -99,7 +103,8 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
private final RawKeyHolder _rawKeyHolder;

public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
ExpressionContext[] groupByExpressions, int numGroupsLimit, int arrayBasedThreshold) {
ExpressionContext[] groupByExpressions, int numGroupsLimit, int arrayBasedThreshold,
@Nullable Map<ExpressionContext, Integer> groupByExpressionSizesFromPredicates) {
assert numGroupsLimit >= arrayBasedThreshold;

_groupByExpressions = groupByExpressions;
Expand All @@ -113,7 +118,7 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
// no need to intern dictionary values when there is only one group by expression because
// only one call will be made to the dictionary to extract each raw value.
_internedDictionaryValues = _numGroupByExpressions > 1 ? new Object[_numGroupByExpressions][] : null;

Map<ExpressionContext, Integer> cardinalityMap = new HashMap<>(_numGroupByExpressions);
long cardinalityProduct = 1L;
boolean longOverflow = false;
for (int i = 0; i < _numGroupByExpressions; i++) {
Expand All @@ -123,6 +128,7 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
assert _dictionaries[i] != null;
int cardinality = _dictionaries[i].length();
_cardinalities[i] = cardinality;
cardinalityMap.put(groupByExpression, cardinality);
if (_internedDictionaryValues != null && cardinality < MAX_DICTIONARY_INTERN_TABLE_SIZE) {
_internedDictionaryValues[i] = new Object[cardinality];
}
Expand All @@ -135,6 +141,14 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
}
_isSingleValueColumn[i] = columnContext.isSingleValue();
}
if (groupByExpressionSizesFromPredicates != null) {
Pair<Boolean, Long> optimizedCardinality = getOptimizedGroupByCardinality(groupByExpressionSizesFromPredicates,
cardinalityMap);
if (optimizedCardinality.getLeft() && optimizedCardinality.getRight() != null) {
longOverflow = false;
cardinalityProduct = Math.min(optimizedCardinality.getRight(), cardinalityProduct);
}
}
// TODO: Clear the holder after processing the query instead of before
if (longOverflow) {
// ArrayMapBasedHolder
Expand Down Expand Up @@ -171,6 +185,22 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
}
}

private Pair<Boolean, Long> getOptimizedGroupByCardinality(Map<ExpressionContext, Integer> groupByExpressionSizes,
Map<ExpressionContext, Integer> columnCardinalityMap) {
long maxInitialResultHolderCapacity = 1L;
for (Map.Entry<ExpressionContext, Integer> entry : columnCardinalityMap.entrySet()) {
Integer cardinality = entry.getValue();
Integer size = groupByExpressionSizes.get(entry.getKey());
int minSize = size != null ? Math.min(size, cardinality) : cardinality;
if (maxInitialResultHolderCapacity > Long.MAX_VALUE / minSize) {
return Pair.of(false, null);
} else {
maxInitialResultHolderCapacity *= minSize;
}
}
return Pair.of(true, maxInitialResultHolderCapacity);
}

@Override
public int getGlobalGroupKeyUpperBound() {
return _globalGroupIdUpperBound;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.BaseProjectOperator;
Expand Down Expand Up @@ -59,17 +60,20 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
private final boolean[] _isSingleValueExpressions;
private final int _numGroupsLimit;
private final boolean _nullHandlingEnabled;
private final int _globalGroupIdUpperBound;

public NoDictionaryMultiColumnGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
ExpressionContext[] groupByExpressions, int numGroupsLimit, boolean nullHandlingEnabled) {
ExpressionContext[] groupByExpressions, int numGroupsLimit, boolean nullHandlingEnabled,
Map<ExpressionContext, Integer> groupByExpressionSizesFromPredicates) {
_groupByExpressions = groupByExpressions;
_numGroupByExpressions = groupByExpressions.length;
_storedTypes = new DataType[_numGroupByExpressions];
_dictionaries = new Dictionary[_numGroupByExpressions];
_onTheFlyDictionaries = new ValueToIdMap[_numGroupByExpressions];
_isSingleValueExpressions = new boolean[_numGroupByExpressions];
_nullHandlingEnabled = nullHandlingEnabled;

int optimizedGroupByUpperBound = 1;
boolean canOptimizeGroupByUpperBound = groupByExpressionSizesFromPredicates != null;
for (int i = 0; i < _numGroupByExpressions; i++) {
ExpressionContext groupByExpression = groupByExpressions[i];
ColumnContext columnContext = projectOperator.getResultColumnContext(groupByExpression);
Expand All @@ -80,17 +84,30 @@ public NoDictionaryMultiColumnGroupKeyGenerator(BaseProjectOperator<?> projectOp
} else {
_onTheFlyDictionaries[i] = ValueToIdMapFactory.get(_storedTypes[i]);
}
if (canOptimizeGroupByUpperBound) {
Integer size = groupByExpressionSizesFromPredicates.get(groupByExpression);
if (size == null) {
canOptimizeGroupByUpperBound = false;
} else {
if (optimizedGroupByUpperBound > Integer.MAX_VALUE / size) {
optimizedGroupByUpperBound = numGroupsLimit;
} else {
optimizedGroupByUpperBound *= size;
}
}
}
_isSingleValueExpressions[i] = columnContext.isSingleValue();
}

_groupKeyMap = new Object2IntOpenHashMap<>();
_groupKeyMap.defaultReturnValue(INVALID_ID);
_numGroupsLimit = numGroupsLimit;
_globalGroupIdUpperBound = canOptimizeGroupByUpperBound ? optimizedGroupByUpperBound : numGroupsLimit;
}

@Override
public int getGlobalGroupKeyUpperBound() {
return _numGroupsLimit;
return _globalGroupIdUpperBound;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.BaseProjectOperator;
Expand Down Expand Up @@ -64,12 +65,18 @@ public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenera
private int _numGroups = 0;

public NoDictionarySingleColumnGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
ExpressionContext groupByExpression, int numGroupsLimit, boolean nullHandlingEnabled) {
ExpressionContext groupByExpression, int numGroupsLimit, boolean nullHandlingEnabled,
@Nullable Map<ExpressionContext, Integer> groupByExpressionSizesFromPredicates) {
_groupByExpression = groupByExpression;
ColumnContext columnContext = projectOperator.getResultColumnContext(groupByExpression);
_storedType = columnContext.getDataType().getStoredType();
_groupKeyMap = createGroupKeyMap(_storedType);
_globalGroupIdUpperBound = numGroupsLimit;
if (groupByExpressionSizesFromPredicates != null) {
Integer size = groupByExpressionSizesFromPredicates.get(groupByExpression);
_globalGroupIdUpperBound = size != null ? Math.min(size, numGroupsLimit) : numGroupsLimit;
} else {
_globalGroupIdUpperBound = numGroupsLimit;
}
_nullHandlingEnabled = nullHandlingEnabled;
_isSingleValueExpression = columnContext.isSingleValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.common.request.context.TimeSeriesContext;
import org.apache.pinot.common.request.context.predicate.InPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
Expand Down Expand Up @@ -353,56 +351,9 @@ public void setMaxExecutionThreads(int maxExecutionThreads) {
}

public int getMaxInitialResultHolderCapacity() {
if (QueryOptionsUtils.optimizeMaxInitialResultHolderCapacityEnabled(_queryOptions)) {
return getOptimizedMaxInitialResultHolderCapacity();
}
return _maxInitialResultHolderCapacity;
}

// TODO: Improve this to use segment level info to optimize the capacity
// Optimization to right-size the initial result holder capacity for group-by queries if they exist in the filter.
// If any one group-by expression is not in the filter, we return the _maxInitialResultHolderCapacity.
public int getOptimizedMaxInitialResultHolderCapacity() {
if (getFilter() == null) {
return _maxInitialResultHolderCapacity;
}

assert getGroupByExpressions() != null;

Set<Predicate> predicateColumns = new HashSet<>();
getFilter().getPredicateColumns(predicateColumns);

// Map to store the size of the predicates
Map<ExpressionContext, Integer> predicateSizeMap = new HashMap<>();

// Collect IN and EQ predicates and store their sizes
for (Predicate predicate : predicateColumns) {
if (predicate.getType() == Predicate.Type.IN || predicate.getType() == Predicate.Type.EQ) {
ExpressionContext lhs = predicate.getLhs();
int size = (predicate.getType() == Predicate.Type.IN)
? ((InPredicate) predicate).getValues().size()
: 1;
predicateSizeMap.merge(lhs, size, Integer::sum);
}
}

int crossProductCapacity = 1;
for (ExpressionContext expression : getGroupByExpressions()) {
Integer size = predicateSizeMap.get(expression);

if (size == null) {
// No matching predicate for a group-by expression, return the default capacity
return _maxInitialResultHolderCapacity;
}
crossProductCapacity *= size;
if (crossProductCapacity > _maxInitialResultHolderCapacity) {
return _maxInitialResultHolderCapacity;
}
}
return crossProductCapacity;
}


public void setMaxInitialResultHolderCapacity(int maxInitialResultHolderCapacity) {
_maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
}
Expand Down
Loading

0 comments on commit 853f33d

Please sign in to comment.