Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add group by trimming to MSQE/V2 query engine #14727

Merged
merged 10 commits into from
Jan 14, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@ public static Integer getMaxExecutionThreads(Map<String, String> queryOptions) {
return checkedParseIntPositive(QueryOptionKey.MAX_EXECUTION_THREADS, maxExecutionThreadsString);
}

@Nullable
public static Integer getGroupTrimSize(Map<String, String> queryOptions) {
String groupTrimSize = queryOptions.get(QueryOptionKey.GROUP_TRIM_SIZE);
// NOTE: Non-positive value means turning off the intermediate level trim
return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_SIZE, groupTrimSize);
}

@Nullable
public static Integer getMinSegmentGroupTrimSize(Map<String, String> queryOptions) {
String minSegmentGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
Expand Down Expand Up @@ -268,6 +275,10 @@ public static Integer getMultiStageLeafLimit(Map<String, String> queryOptions) {
return checkedParseIntNonNegative(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT, maxLeafLimitStr);
}

public static boolean getErrorOnNumGroupsLimit(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ERROR_ON_NUM_GROUPS_LIMIT));
}

@Nullable
public static Integer getNumGroupsLimit(Map<String, String> queryOptions) {
String maxNumGroupLimit = queryOptions.get(QueryOptionKey.NUM_GROUPS_LIMIT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -244,6 +246,42 @@ public List<String> listSegments(String tableName, @Nullable String tableType, b
}
}

public Map<String, List<String>> getServersToSegmentsMap(String tableName, TableType tableType)
gortiz marked this conversation as resolved.
Show resolved Hide resolved
throws IOException {
String url = _controllerRequestURLBuilder.forServersToSegmentsMap(tableName, tableType.toString());
try {
SimpleHttpResponse resp =
HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(new URI(url), _headers));
JsonNode jsonNode = JsonUtils.stringToJsonNode(resp.getResponse());
if (jsonNode == null || jsonNode.get(0) == null) {
return Collections.emptyMap();
}

JsonNode serversMap = jsonNode.get(0).get("serverToSegmentsMap");
if (serversMap == null) {
return Collections.emptyMap();
}

HashMap<String, List<String>> result = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> fields = serversMap.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
List<String> segments = new ArrayList<>();

ArrayNode value = (ArrayNode) field.getValue();
for (int i = 0, len = value.size(); i < len; i++) {
segments.add(value.get(i).toString());
}

result.put(field.getKey(), segments);
}

return result;
} catch (HttpErrorStatusException | URISyntaxException e) {
throw new IOException(e);
}
}

public void deleteSegment(String tableName, String segmentName)
throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,11 @@ public long getTableSize(String tableName)
return getControllerRequestClient().getTableSize(tableName);
}

public Map<String, List<String>> getTableServersToSegmentsMap(String tableName, TableType tableType)
throws IOException {
return getControllerRequestClient().getServersToSegmentsMap(tableName, tableType);
}

public String reloadOfflineTable(String tableName)
throws IOException {
return reloadOfflineTable(tableName, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@


/**
* The <code>AggregationOperator</code> class provides the operator for aggregation only query on a single segment.
* The <code>AggregationOperator</code> class implements keyless aggregation query on a single segment in V1/SSQE.
*/
@SuppressWarnings("rawtypes")
public class AggregationOperator extends BaseOperator<AggregationResultsBlock> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@


/**
* The <code>GroupByOperator</code> class provides the operator for group-by query on a single segment.
* The <code>GroupByOperator</code> class implements keyed aggregation on a single segment in V1/SSQE.
*/
@SuppressWarnings("rawtypes")
public class GroupByOperator extends BaseOperator<GroupByResultsBlock> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@


/**
* The <code>CombinePlanNode</code> class provides the execution plan for combining results from multiple segments.
* The <code>CombinePlanNode</code> class provides the execution plan for combining results from multiple segments in
* V1/SSQE.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class CombinePlanNode implements PlanNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public static final String NUM_GROUPS_LIMIT_KEY = "num.groups.limit";
public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;

// By default, group trimming in AggregateOperator is disabled
public static final int DEFAULT_GROUP_TRIM_SIZE = -1;

// Instance config key for minimum segment-level group trim size
// Set as pinot.server.query.executor.min.segment.group.trim.size
public static final String MIN_SEGMENT_GROUP_TRIM_SIZE_KEY = "min.segment.group.trim.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
* integer raw keys and map them onto contiguous group ids. (INT_MAP_BASED)
* </li>
* <li>
* If the maximum number of possible group keys cannot fit into than integer, but still fit into long, generate long
* If the maximum number of possible group keys cannot fit into integer, but still fit into long, generate long
* raw keys and map them onto contiguous group ids. (LONG_MAP_BASED)
* </li>
* <li>
Expand Down Expand Up @@ -105,8 +105,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
ExpressionContext[] groupByExpressions, int numGroupsLimit, int arrayBasedThreshold,
@Nullable Map<ExpressionContext, Integer> groupByExpressionSizesFromPredicates) {
assert numGroupsLimit >= arrayBasedThreshold;

_groupByExpressions = groupByExpressions;
_numGroupByExpressions = groupByExpressions.length;

Expand Down Expand Up @@ -173,7 +171,9 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
_rawKeyHolder = new LongMapBasedHolder(groupIdMap);
} else {
_globalGroupIdUpperBound = Math.min((int) cardinalityProduct, numGroupsLimit);
if (cardinalityProduct > arrayBasedThreshold) {
// arrayBaseHolder fails with ArrayIndexOutOfBoundsException if numGroupsLimit < cardinalityProduct
// because array doesn't fit all (potentially unsorted) values
if (cardinalityProduct > arrayBasedThreshold || numGroupsLimit < cardinalityProduct) {
// IntMapBasedHolder
IntGroupIdMap groupIdMap = THREAD_LOCAL_INT_MAP.get();
groupIdMap.clearAndTrim();
Expand Down Expand Up @@ -281,6 +281,7 @@ private interface RawKeyHolder {
int getNumKeys();
}

// This holder works only if it can fit all results, otherwise it fails on AIOOBE or produces too many group keys
private class ArrayBasedHolder implements RawKeyHolder {
private final boolean[] _flags = new boolean[_globalGroupIdUpperBound];
private int _numKeys = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public BrokerReduceService(PinotConfiguration config) {
}

public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, BrokerRequest serverBrokerRequest,
Map<ServerRoutingInstance, DataTable> dataTableMap, long reduceTimeOutMs, BrokerMetrics brokerMetrics) {
Map<ServerRoutingInstance, DataTable> dataTableMap,
long reduceTimeOutMs, BrokerMetrics brokerMetrics) {
bziobrowski marked this conversation as resolved.
Show resolved Hide resolved
if (dataTableMap.isEmpty()) {
// Empty response.
return BrokerResponseNative.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@

/**
* Helper class to reduce data tables and set group by results into the BrokerResponseNative
* Used for key-less aggregations, e.g. select max(id), sum(quantity) from orders .
*/
@SuppressWarnings("rawtypes")
public class GroupByDataTableReducer implements DataTableReducer {
Expand Down Expand Up @@ -140,9 +141,12 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema,
* @param brokerMetrics broker metrics (meters)
* @throws TimeoutException If unable complete within timeout.
*/
private void reduceResult(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
BrokerMetrics brokerMetrics)
private void reduceResult(BrokerResponseNative brokerResponseNative,
DataSchema dataSchema,
Collection<DataTable> dataTables,
DataTableReducerContext reducerContext,
String rawTableName,
bziobrowski marked this conversation as resolved.
Show resolved Hide resolved
BrokerMetrics brokerMetrics)
throws TimeoutException {
// NOTE: This step will modify the data schema and also return final aggregate results.
IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ public FilterContext getFilter() {
}

/**
* Returns a list of expressions in the GROUP-BY clause, or {@code null} if there is no GROUP-BY clause.
* Returns a list of expressions in the GROUP-BY clause (aggregation keys), or {@code null} if there is no GROUP-BY
* clause.
*/
@Nullable
public List<ExpressionContext> getGroupByExpressions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,22 @@ public static QueryContext getQueryContext(PinotQuery pinotQuery) {
explainMode = ExplainMode.DESCRIPTION;
}

return new QueryContext.Builder().setTableName(tableName).setSubquery(subquery)
.setSelectExpressions(selectExpressions).setDistinct(distinct).setAliasList(aliasList).setFilter(filter)
.setGroupByExpressions(groupByExpressions).setOrderByExpressions(orderByExpressions)
.setHavingFilter(havingFilter).setLimit(pinotQuery.getLimit()).setOffset(pinotQuery.getOffset())
.setQueryOptions(pinotQuery.getQueryOptions()).setExpressionOverrideHints(expressionContextOverrideHints)
.setExplain(explainMode).build();
return new QueryContext.Builder()
.setTableName(tableName)
.setSubquery(subquery)
.setSelectExpressions(selectExpressions)
.setDistinct(distinct)
.setAliasList(aliasList)
.setFilter(filter)
.setGroupByExpressions(groupByExpressions)
.setOrderByExpressions(orderByExpressions)
.setHavingFilter(havingFilter)
.setLimit(pinotQuery.getLimit())
.setOffset(pinotQuery.getOffset())
.setQueryOptions(pinotQuery.getQueryOptions())
.setExpressionOverrideHints(expressionContextOverrideHints)
.setExplain(explainMode)
.build();
}

private static boolean isMultiStage(PinotQuery pinotQuery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public static IndexedTable createIndexedTableForCombineOperator(GroupByResultsBl
int limit = queryContext.getLimit();
boolean hasOrderBy = queryContext.getOrderByExpressions() != null;
boolean hasHaving = queryContext.getHavingFilter() != null;
int minTrimSize = queryContext.getMinServerGroupTrimSize();
int minTrimSize =
queryContext.getMinServerGroupTrimSize(); // it's minBrokerGroupTrimSize in broker
int minInitialIndexedTableCapacity = queryContext.getMinInitialIndexedTableCapacity();

// Disable trim when min trim size is non-positive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@
package org.apache.pinot.core.query.aggregation.function;

import org.apache.pinot.queries.FluentQueryTest;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec.PASS_THROUGH;


public class AvgAggregationFunctionTest extends AbstractAggregationFunctionTest {

Expand Down Expand Up @@ -177,4 +182,74 @@ void aggregationGroupByMV(DataTypeScenario scenario) {
"tag3 | null"
);
}

@Test(dataProvider = "encodingTypes")
void singleKeyAggregationWithSmallNumGroupsLimitDoesntThrowAIOOBE(FieldConfig.EncodingType encoding) {
FluentQueryTest.withBaseDir(_baseDir)
.givenTable(
new Schema.SchemaBuilder()
.setSchemaName("testTable")
.setEnableColumnBasedNullHandling(true)
.addMetricField("key", FieldSpec.DataType.INT)
.addMetricField("value", FieldSpec.DataType.INT)
.build(),
new TableConfigBuilder(TableType.OFFLINE)
.setTableName("testTable")
.addFieldConfig(
new FieldConfig("key", encoding, (FieldConfig.IndexType) null, PASS_THROUGH, null))
.build())
.onFirstInstance(new Object[]{7, 1}, new Object[]{6, 2}, new Object[]{5, 3}, new Object[]{4, 4})
.andOnSecondInstance(new Object[]{7, 1}, new Object[]{6, 2}, new Object[]{5, 3}, new Object[]{4, 4})
.whenQuery(
"set numGroupsLimit=3; set maxInitialResultHolderCapacity=1000; "
+ "select key, avg(value) "
+ "from testTable "
+ "group by key "
+ "order by key")
.thenResultIs(
"INTEGER | DOUBLE",
"5 | 3",
"6 | 2",
"7 | 1"
);
}

@Test(dataProvider = "encodingTypes")
void multiKeyAggregationWithSmallNumGroupsLimitDoesntThrowAIOOBE(FieldConfig.EncodingType encoding) {
FluentQueryTest.withBaseDir(_baseDir)
.givenTable(
new Schema.SchemaBuilder()
.setSchemaName("testTable")
.setEnableColumnBasedNullHandling(true)
.addMetricField("key1", FieldSpec.DataType.INT)
.addMetricField("key2", FieldSpec.DataType.INT)
.addMetricField("value", FieldSpec.DataType.INT)
.build(),
new TableConfigBuilder(TableType.OFFLINE)
.setTableName("testTable")
.addFieldConfig(
new FieldConfig("key1", encoding, (FieldConfig.IndexType) null, PASS_THROUGH, null))
.addFieldConfig(
new FieldConfig("key2", encoding, (FieldConfig.IndexType) null, PASS_THROUGH, null))
.build())
.onFirstInstance(new Object[]{7, 1}, new Object[]{6, 2}, new Object[]{5, 3}, new Object[]{4, 4})
.andOnSecondInstance(new Object[]{7, 1}, new Object[]{6, 2}, new Object[]{5, 3}, new Object[]{4, 4})
.whenQuery(
"set numGroupsLimit=3; set maxInitialResultHolderCapacity=1000; "
+ "select key1, key2, count(*) "
+ "from testTable "
+ "group by key1, key2 "
+ "order by key1, key2")
.thenResultIs(
"INTEGER | INTEGER | LONG",
"5 | 3 | 2",
"6 | 2 | 2",
"7 | 1 | 2"
);
}

@DataProvider(name = "encodingTypes")
FieldConfig.EncodingType[] encodingTypes() {
return FieldConfig.EncodingType.values();
}
}
Loading
Loading