Skip to content

Commit

Permalink
Configure final reduce phase threads for heavy aggreagtion functions
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Jan 18, 2025
1 parent 44b07b0 commit 44cc404
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,12 @@ public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD, groupByTrimThreshold);
}

@Nullable
public static Integer getNumThreadsForFinalReduce(Map<String, String> queryOptions) {
String numThreadsForFinalReduceString = queryOptions.get(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE);
return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE, numThreadsForFinalReduceString, -1);
}

public static boolean isNullHandlingEnabled(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ENABLE_NULL_HANDLING));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@
package org.apache.pinot.core.data.table;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
Expand All @@ -37,6 +42,8 @@
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public abstract class IndexedTable extends BaseTable {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime()
.availableProcessors());
protected final Map<Key, Record> _lookupMap;
protected final boolean _hasFinalInput;
protected final int _resultSize;
Expand All @@ -46,6 +53,7 @@ public abstract class IndexedTable extends BaseTable {
protected final TableResizer _tableResizer;
protected final int _trimSize;
protected final int _trimThreshold;
protected final int _numThreadsForFinalReduce;

protected Collection<Record> _topRecords;
private int _numResizes;
Expand Down Expand Up @@ -84,6 +92,7 @@ protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContex
assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold == Integer.MAX_VALUE);
_trimSize = trimSize;
_trimThreshold = trimThreshold;
_numThreadsForFinalReduce = queryContext.getNumThreadsForFinalReduce();
}

@Override
Expand Down Expand Up @@ -157,11 +166,44 @@ public void finish(boolean sort, boolean storeFinalResult) {
for (int i = 0; i < numAggregationFunctions; i++) {
columnDataTypes[i + _numKeyColumns] = _aggregationFunctions[i].getFinalResultColumnType();
}
for (Record record : _topRecords) {
Object[] values = record.getValues();
for (int i = 0; i < numAggregationFunctions; i++) {
int colId = i + _numKeyColumns;
values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
if (_numThreadsForFinalReduce > 1) {
// Multi-threaded final reduce
try {
List<Record> topRecordsList = new ArrayList<>(_topRecords);
int chunkSize = (topRecordsList.size() + _numThreadsForFinalReduce - 1) / _numThreadsForFinalReduce;
List<Future<Void>> futures = new ArrayList<>();
for (int threadId = 0; threadId < _numThreadsForFinalReduce; threadId++) {
int startIdx = threadId * chunkSize;
int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size());

if (startIdx < endIdx) {
// Submit a task for processing a chunk of values
futures.add(EXECUTOR_SERVICE.submit(() -> {
for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) {
Object[] values = topRecordsList.get(recordIdx).getValues();
for (int i = 0; i < numAggregationFunctions; i++) {
int colId = i + _numKeyColumns;
values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
}
}
return null;
}));
}
}
// Wait for all tasks to complete
for (Future<Void> future : futures) {
future.get();
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Error during multi-threaded final reduce", e);
}
} else {
for (Record record : _topRecords) {
Object[] values = record.getValues();
for (int i = 0; i < numAggregationFunctions; i++) {
int colId = i + _numKeyColumns;
values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public static final String GROUPBY_TRIM_THRESHOLD_KEY = "groupby.trim.threshold";
public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;

public static final int DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE = 1;

private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);

private final FetchPlanner _fetchPlanner = FetchPlannerRegistry.getPlanner();
Expand Down Expand Up @@ -268,6 +270,13 @@ private void applyQueryOptions(QueryContext queryContext) {
} else {
queryContext.setGroupTrimThreshold(_groupByTrimThreshold);
}
// Set numThreadsForFinalReduce
Integer numThreadsForFinalReduce = QueryOptionsUtils.getNumThreadsForFinalReduce(queryOptions);
if (numThreadsForFinalReduce != null) {
queryContext.setNumThreadsForFinalReduce(numThreadsForFinalReduce);
} else {
queryContext.setNumThreadsForFinalReduce(DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ public class QueryContext {
private int _minServerGroupTrimSize = InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE;
// Trim threshold to use for server combine for SQL GROUP BY
private int _groupTrimThreshold = InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD;
// Number of threads to use for final reduce
private int _numThreadsForFinalReduce = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE;
// Whether null handling is enabled
private boolean _nullHandlingEnabled;
// Whether server returns the final result
Expand Down Expand Up @@ -411,6 +413,14 @@ public void setGroupTrimThreshold(int groupTrimThreshold) {
_groupTrimThreshold = groupTrimThreshold;
}

public int getNumThreadsForFinalReduce() {
return _numThreadsForFinalReduce;
}

public void setNumThreadsForFinalReduce(int numThreadsForFinalReduce) {
_numThreadsForFinalReduce = numThreadsForFinalReduce;
}

public boolean isNullHandlingEnabled() {
return _nullHandlingEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ public static class QueryOptionKey {

/** Max number of groups GroupByDataTableReducer (running at broker) should return. */
public static final String MIN_BROKER_GROUP_TRIM_SIZE = "minBrokerGroupTrimSize";
public static final String NUM_THREADS_FOR_FINAL_REDUCE = "numThreadsForFinalReduce";

public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery";
public static final String USE_FIXED_REPLICA = "useFixedReplica";
Expand Down

0 comments on commit 44cc404

Please sign in to comment.