Skip to content

Commit

Permalink
Add group_trim_size and error_on_num_groups_limit hints/options.
Browse files Browse the repository at this point in the history
  • Loading branch information
bziobrowski committed Dec 30, 2024
1 parent 5bd1606 commit b603ad5
Show file tree
Hide file tree
Showing 24 changed files with 1,110 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ public static Integer getMultiStageLeafLimit(Map<String, String> queryOptions) {
return checkedParseIntNonNegative(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT, maxLeafLimitStr);
}

/**
 * Reads the {@code ERROR_ON_NUM_GROUPS_LIMIT} query option as a boolean flag.
 *
 * @param queryOptions query options to look the key up in
 * @return {@code true} only when the option value equals "true" (ignoring case); {@code false} when the option is
 *         absent or holds any other value
 */
public static boolean getErrorOnNumGroupsLimit(Map<String, String> queryOptions) {
  String errorOnNumGroupsLimit = queryOptions.get(QueryOptionKey.ERROR_ON_NUM_GROUPS_LIMIT);
  // parseBoolean tolerates null, so a missing option simply reads as false
  return Boolean.parseBoolean(errorOnNumGroupsLimit);
}

@Nullable
public static Integer getNumGroupsLimit(Map<String, String> queryOptions) {
String maxNumGroupLimit = queryOptions.get(QueryOptionKey.NUM_GROUPS_LIMIT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -244,6 +246,42 @@ public List<String> listSegments(String tableName, @Nullable String tableType, b
}
}

/**
 * Fetches the server-to-segments assignment of a table from the controller.
 *
 * <p>Only the first element of the controller response is read; its {@code serverToSegmentsMap} field is converted
 * into a map from server name to the list of segment names listed under that server.
 *
 * @param tableName name of the table passed to the controller URL builder
 * @param tableType type of the table; its {@code toString()} form is passed to the controller URL builder
 * @return map from server name to segment names; an empty map when the response has no first element or that
 *         element has no {@code serverToSegmentsMap} field
 * @throws IOException if the controller returns an HTTP error status or the generated URL is not a valid URI
 */
public Map<String, List<String>> getServersToSegmentsMap(String tableName, TableType tableType)
    throws IOException {
  String url = _controllerRequestURLBuilder.forServersToSegmentsMap(tableName, tableType.toString());
  try {
    SimpleHttpResponse resp =
        HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(new URI(url), _headers));
    JsonNode jsonNode = JsonUtils.stringToJsonNode(resp.getResponse());
    if (jsonNode == null || jsonNode.get(0) == null) {
      return Collections.emptyMap();
    }

    JsonNode serversMap = jsonNode.get(0).get("serverToSegmentsMap");
    if (serversMap == null) {
      return Collections.emptyMap();
    }

    Map<String, List<String>> result = new HashMap<>();
    Iterator<Map.Entry<String, JsonNode>> fields = serversMap.fields();
    while (fields.hasNext()) {
      Map.Entry<String, JsonNode> field = fields.next();

      ArrayNode value = (ArrayNode) field.getValue();
      int numSegments = value.size();
      List<String> segments = new ArrayList<>(numSegments);
      for (int i = 0; i < numSegments; i++) {
        // asText() yields the raw string value; toString() would return the JSON-serialized form, i.e. the segment
        // name wrapped in double quotes.
        // NOTE(review): assumes every array element is a JSON string (segment name) - confirm against controller API
        segments.add(value.get(i).asText());
      }

      result.put(field.getKey(), segments);
    }

    return result;
  } catch (HttpErrorStatusException | URISyntaxException e) {
    throw new IOException(e);
  }
}

public void deleteSegment(String tableName, String segmentName)
throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,11 @@ public long getTableSize(String tableName)
return getControllerRequestClient().getTableSize(tableName);
}

/**
 * Returns the server-to-segments map of the given table by delegating to the controller request client.
 *
 * @param tableName name of the table
 * @param tableType type of the table
 * @return whatever the controller request client's {@code getServersToSegmentsMap} returns for this table
 * @throws IOException if the controller request client fails
 */
public Map<String, List<String>> getTableServersToSegmentsMap(String tableName, TableType tableType)
    throws IOException {
  Map<String, List<String>> serversToSegments =
      getControllerRequestClient().getServersToSegmentsMap(tableName, tableType);
  return serversToSegments;
}

public String reloadOfflineTable(String tableName)
throws IOException {
return reloadOfflineTable(tableName, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@


/**
* The <code>AggregationOperator</code> class provides the operator for aggregation only query on a single segment.
* The <code>AggregationOperator</code> class implements keyless aggregation query on a single segment in V1/SSQE.
*/
@SuppressWarnings("rawtypes")
public class AggregationOperator extends BaseOperator<AggregationResultsBlock> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@


/**
* The <code>GroupByOperator</code> class provides the operator for group-by query on a single segment.
* The <code>GroupByOperator</code> class implements keyed aggregation on a single segment in V1/SSQE.
*/
@SuppressWarnings("rawtypes")
public class GroupByOperator extends BaseOperator<GroupByResultsBlock> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@


/**
* The <code>CombinePlanNode</code> class provides the execution plan for combining results from multiple segments.
* The <code>CombinePlanNode</code> class provides the execution plan for combining results from multiple segments in
* V1/SSQE.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class CombinePlanNode implements PlanNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
* integer raw keys and map them onto contiguous group ids. (INT_MAP_BASED)
* </li>
* <li>
* If the maximum number of possible group keys cannot fit into than integer, but still fit into long, generate long
* If the maximum number of possible group keys cannot fit into integer, but still fit into long, generate long
* raw keys and map them onto contiguous group ids. (LONG_MAP_BASED)
* </li>
* <li>
Expand Down Expand Up @@ -105,8 +105,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
ExpressionContext[] groupByExpressions, int numGroupsLimit, int arrayBasedThreshold,
@Nullable Map<ExpressionContext, Integer> groupByExpressionSizesFromPredicates) {
assert numGroupsLimit >= arrayBasedThreshold;

_groupByExpressions = groupByExpressions;
_numGroupByExpressions = groupByExpressions.length;

Expand Down Expand Up @@ -173,7 +171,9 @@ public DictionaryBasedGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
_rawKeyHolder = new LongMapBasedHolder(groupIdMap);
} else {
_globalGroupIdUpperBound = Math.min((int) cardinalityProduct, numGroupsLimit);
if (cardinalityProduct > arrayBasedThreshold) {
// ArrayBasedHolder fails with ArrayIndexOutOfBoundsException if numGroupsLimit < cardinalityProduct,
// because its array cannot fit all (potentially unsorted) values
if (cardinalityProduct > arrayBasedThreshold || numGroupsLimit < cardinalityProduct) {
// IntMapBasedHolder
IntGroupIdMap groupIdMap = THREAD_LOCAL_INT_MAP.get();
groupIdMap.clearAndTrim();
Expand Down Expand Up @@ -281,6 +281,7 @@ private interface RawKeyHolder {
int getNumKeys();
}

// This holder works only if it can fit all results; otherwise it either fails with
// ArrayIndexOutOfBoundsException or produces too many group keys
private class ArrayBasedHolder implements RawKeyHolder {
private final boolean[] _flags = new boolean[_globalGroupIdUpperBound];
private int _numKeys = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ public BrokerReduceService(PinotConfiguration config) {
super(config);
}

public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, BrokerRequest serverBrokerRequest,
Map<ServerRoutingInstance, DataTable> dataTableMap, long reduceTimeOutMs, BrokerMetrics brokerMetrics) {
public BrokerResponseNative reduceOnDataTable(
BrokerRequest brokerRequest,
BrokerRequest serverBrokerRequest,
Map<ServerRoutingInstance, DataTable> dataTableMap,
long reduceTimeOutMs,
BrokerMetrics brokerMetrics) {
if (dataTableMap.isEmpty()) {
// Empty response.
return BrokerResponseNative.empty();
Expand All @@ -70,12 +74,12 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));

ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace);
BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
BrokerResponseNative response = new BrokerResponseNative();

// Cache a data schema from data tables (try to cache one with data rows associated with it).
DataSchema dataSchemaFromEmptyDataTable = null;
DataSchema dataSchemaFromNonEmptyDataTable = null;
List<ServerRoutingInstance> serversWithConflictingDataSchema = new ArrayList<>();
DataSchema schemaOfEmptyTable = null;
DataSchema schemaOfNonEmptyTable = null;
List<ServerRoutingInstance> serversWithConflictingSchema = new ArrayList<>();

// Process server response metadata.
Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
Expand All @@ -93,19 +97,19 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
} else {
// Try to cache a data table with data rows inside, or cache one with data schema inside.
if (dataTable.getNumberOfRows() == 0) {
if (dataSchemaFromEmptyDataTable == null) {
dataSchemaFromEmptyDataTable = dataSchema;
if (schemaOfEmptyTable == null) {
schemaOfEmptyTable = dataSchema;
}
iterator.remove();
} else {
if (dataSchemaFromNonEmptyDataTable == null) {
dataSchemaFromNonEmptyDataTable = dataSchema;
if (schemaOfNonEmptyTable == null) {
schemaOfNonEmptyTable = dataSchema;
} else {
// Remove data tables with conflicting data schema.
// NOTE: Only compare the column data types, since the column names (string representation of expression)
// can change across different versions.
if (!Arrays.equals(dataSchema.getColumnDataTypes(), dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) {
serversWithConflictingDataSchema.add(entry.getKey());
if (!Arrays.equals(dataSchema.getColumnDataTypes(), schemaOfNonEmptyTable.getColumnDataTypes())) {
serversWithConflictingSchema.add(entry.getKey());
iterator.remove();
}
}
Expand All @@ -117,52 +121,51 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
String rawTableName = TableNameBuilder.extractRawTableName(tableName);

// Set execution statistics and Update broker metrics.
aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);
aggregator.setStats(rawTableName, response, brokerMetrics);

// Report the servers with conflicting data schema.
if (!serversWithConflictingDataSchema.isEmpty()) {
if (!serversWithConflictingSchema.isEmpty()) {
String errorMessage = QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName
+ " from servers: " + serversWithConflictingDataSchema + " got dropped due to data schema inconsistency.";
+ " from servers: " + serversWithConflictingSchema + " got dropped due to data schema inconsistency.";
LOGGER.warn(errorMessage);
brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1);
brokerResponseNative.addException(
new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
response.addException(new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
}

// NOTE: When there is no cached data schema, that means all servers encountered exception. In such case, return the
// response with metadata only.
DataSchema cachedDataSchema =
dataSchemaFromNonEmptyDataTable != null ? dataSchemaFromNonEmptyDataTable : dataSchemaFromEmptyDataTable;
schemaOfNonEmptyTable != null ? schemaOfNonEmptyTable : schemaOfEmptyTable;
if (cachedDataSchema == null) {
return brokerResponseNative;
return response;
}

QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(serverQueryContext);
Integer minGroupTrimSizeOpt = null;
Integer groupTrimThresholdOpt = null;
Integer minInitialIndexedTableCapOpt = null;

Integer minGroupTrimSizeQueryOption = null;
Integer groupTrimThresholdQueryOption = null;
Integer minInitialIndexedTableCapacityQueryOption = null;
if (queryOptions != null) {
minGroupTrimSizeQueryOption = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
groupTrimThresholdQueryOption = QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
minInitialIndexedTableCapacityQueryOption = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
minGroupTrimSizeOpt = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
groupTrimThresholdOpt = QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
minInitialIndexedTableCapOpt = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
}
int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? minGroupTrimSizeQueryOption : _minGroupTrimSize;
int groupTrimThreshold =
groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption : _groupByTrimThreshold;

int minGroupTrimSize = minGroupTrimSizeOpt != null ? minGroupTrimSizeOpt : _minGroupTrimSize;
int groupTrimThreshold = groupTrimThresholdOpt != null ? groupTrimThresholdOpt : _groupByTrimThreshold;
int minInitialIndexedTableCapacity =
minInitialIndexedTableCapacityQueryOption != null ? minInitialIndexedTableCapacityQueryOption
: _minInitialIndexedTableCapacity;
minInitialIndexedTableCapOpt != null ? minInitialIndexedTableCapOpt : _minInitialIndexedTableCapacity;

QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
try {
dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
DataTableReducer reducer = ResultReducerFactory.getResultReducer(serverQueryContext);
reducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, response,
new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
groupTrimThreshold, minGroupTrimSize, minInitialIndexedTableCapacity), brokerMetrics);
} catch (EarlyTerminationException e) {
brokerResponseNative.addException(
response.addException(
new QueryProcessingException(QueryException.QUERY_CANCELLATION_ERROR_CODE, e.toString()));
}

QueryContext queryContext;
if (brokerRequest == serverBrokerRequest) {
queryContext = serverQueryContext;
Expand All @@ -173,13 +176,13 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
throw new BadQueryRequestException("Nested query is not supported without gapfill");
}
BaseGapfillProcessor gapfillProcessor = GapfillProcessorFactory.getGapfillProcessor(queryContext, gapfillType);
gapfillProcessor.process(brokerResponseNative);
gapfillProcessor.process(response);
}

if (!serverQueryContext.isExplain()) {
updateAlias(queryContext, brokerResponseNative);
updateAlias(queryContext, response);
}
return brokerResponseNative;
return response;
}

public void shutDown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@

/**
* Helper class to reduce data tables and set group by results into the BrokerResponseNative
* Used for keyed (GROUP BY) aggregations, e.g. select customer_id, sum(quantity) from orders group by customer_id.
*/
@SuppressWarnings("rawtypes")
public class GroupByDataTableReducer implements DataTableReducer {
Expand Down Expand Up @@ -140,9 +141,12 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema,
* @param brokerMetrics broker metrics (meters)
* @throws TimeoutException If unable complete within timeout.
*/
private void reduceResult(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
BrokerMetrics brokerMetrics)
private void reduceResult(BrokerResponseNative brokerResponseNative,
DataSchema dataSchema,
Collection<DataTable> dataTables,
DataTableReducerContext reducerContext,
String rawTableName,
BrokerMetrics brokerMetrics)
throws TimeoutException {
// NOTE: This step will modify the data schema and also return final aggregate results.
IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ public FilterContext getFilter() {
}

/**
* Returns a list of expressions in the GROUP-BY clause, or {@code null} if there is no GROUP-BY clause.
* Returns a list of expressions in the GROUP-BY clause (aggregation keys), or {@code null} if there is no GROUP-BY
* clause.
*/
@Nullable
public List<ExpressionContext> getGroupByExpressions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,22 @@ public static QueryContext getQueryContext(PinotQuery pinotQuery) {
explainMode = ExplainMode.DESCRIPTION;
}

return new QueryContext.Builder().setTableName(tableName).setSubquery(subquery)
.setSelectExpressions(selectExpressions).setDistinct(distinct).setAliasList(aliasList).setFilter(filter)
.setGroupByExpressions(groupByExpressions).setOrderByExpressions(orderByExpressions)
.setHavingFilter(havingFilter).setLimit(pinotQuery.getLimit()).setOffset(pinotQuery.getOffset())
.setQueryOptions(pinotQuery.getQueryOptions()).setExpressionOverrideHints(expressionContextOverrideHints)
.setExplain(explainMode).build();
return new QueryContext.Builder()
.setTableName(tableName)
.setSubquery(subquery)
.setSelectExpressions(selectExpressions)
.setDistinct(distinct)
.setAliasList(aliasList)
.setFilter(filter)
.setGroupByExpressions(groupByExpressions)
.setOrderByExpressions(orderByExpressions)
.setHavingFilter(havingFilter)
.setLimit(pinotQuery.getLimit())
.setOffset(pinotQuery.getOffset())
.setQueryOptions(pinotQuery.getQueryOptions())
.setExpressionOverrideHints(expressionContextOverrideHints)
.setExplain(explainMode)
.build();
}

private static boolean isMultiStage(PinotQuery pinotQuery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public static IndexedTable createIndexedTableForCombineOperator(GroupByResultsBl
int limit = queryContext.getLimit();
boolean hasOrderBy = queryContext.getOrderByExpressions() != null;
boolean hasHaving = queryContext.getHavingFilter() != null;
int minTrimSize = queryContext.getMinServerGroupTrimSize();
int minTrimSize =
queryContext.getMinServerGroupTrimSize(); // it's minBrokerGroupTrimSize in broker
int minInitialIndexedTableCapacity = queryContext.getMinInitialIndexedTableCapacity();

// Disable trim when min trim size is non-positive
Expand Down
Loading

0 comments on commit b603ad5

Please sign in to comment.