Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Address comments
Browse files Browse the repository at this point in the history
xiangfu0 committed Jan 18, 2025
1 parent 44cc404 commit 2fba98b
Showing 5 changed files with 103 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -251,7 +251,15 @@ public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
/**
 * Returns the parsed {@code numThreadsForFinalReduce} query option, or {@code null} when the
 * option is not set.
 */
@Nullable
public static Integer getNumThreadsForFinalReduce(Map<String, String> queryOptions) {
  String numThreadsForFinalReduceString = queryOptions.get(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE);
  // Third argument is presumably the minimum accepted value (thread count must be >= 1) —
  // TODO(review): confirm against checkedParseInt's contract.
  return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE, numThreadsForFinalReduceString, 1);
}

/**
 * Returns the parsed {@code parallelChunkSizeForFinalReduce} query option, or {@code null} when
 * the option is not set.
 */
@Nullable
public static Integer getParallelChunkSizeForFinalReduce(Map<String, String> queryOptions) {
  String rawValue = queryOptions.get(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE);
  return checkedParseInt(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE, rawValue, 1);
}

public static boolean isNullHandlingEnabled(Map<String, String> queryOptions) {
Original file line number Diff line number Diff line change
@@ -26,24 +26,38 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.util.trace.TraceCallable;


/**
* Base implementation of Map-based Table for indexed lookup
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public abstract class IndexedTable extends BaseTable {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime()
.availableProcessors());
private static final int THREAD_POOL_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 1);
private static final ThreadPoolExecutor EXECUTOR_SERVICE = (ThreadPoolExecutor) Executors.newFixedThreadPool(
THREAD_POOL_SIZE, new ThreadFactory() {
private final AtomicInteger _threadNumber = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "IndexedTable-pool-thread-" + _threadNumber.getAndIncrement());
t.setDaemon(true);
return t;
}
});

protected final Map<Key, Record> _lookupMap;
protected final boolean _hasFinalInput;
protected final int _resultSize;
@@ -54,6 +68,7 @@ public abstract class IndexedTable extends BaseTable {
protected final int _trimSize;
protected final int _trimThreshold;
protected final int _numThreadsForFinalReduce;
protected final int _parallelChunkSizeForFinalReduce;

protected Collection<Record> _topRecords;
private int _numResizes;
@@ -92,7 +107,10 @@ protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContex
assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold == Integer.MAX_VALUE);
_trimSize = trimSize;
_trimThreshold = trimThreshold;
_numThreadsForFinalReduce = queryContext.getNumThreadsForFinalReduce();
// NOTE: The upper limit of threads number for final reduce is set to 2 * number of available processors by default
_numThreadsForFinalReduce = Math.min(queryContext.getNumThreadsForFinalReduce(),
Math.max(1, 2 * Runtime.getRuntime().availableProcessors()));
_parallelChunkSizeForFinalReduce = queryContext.getParallelChunkSizeForFinalReduce();
}

@Override
@@ -166,27 +184,31 @@ public void finish(boolean sort, boolean storeFinalResult) {
for (int i = 0; i < numAggregationFunctions; i++) {
columnDataTypes[i + _numKeyColumns] = _aggregationFunctions[i].getFinalResultColumnType();
}
if (_numThreadsForFinalReduce > 1) {
int numThreadsForFinalReduce = inferNumThreadsForFinalReduce();
// Submit task when the EXECUTOR_SERVICE is not overloaded
if ((numThreadsForFinalReduce > 1) && (EXECUTOR_SERVICE.getQueue().size() < THREAD_POOL_SIZE * 3)) {
// Multi-threaded final reduce
List<Future<Void>> futures = new ArrayList<>();
try {
List<Record> topRecordsList = new ArrayList<>(_topRecords);
int chunkSize = (topRecordsList.size() + _numThreadsForFinalReduce - 1) / _numThreadsForFinalReduce;
List<Future<Void>> futures = new ArrayList<>();
for (int threadId = 0; threadId < _numThreadsForFinalReduce; threadId++) {
int chunkSize = (topRecordsList.size() + numThreadsForFinalReduce - 1) / numThreadsForFinalReduce;
for (int threadId = 0; threadId < numThreadsForFinalReduce; threadId++) {
int startIdx = threadId * chunkSize;
int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size());

if (startIdx < endIdx) {
// Submit a task for processing a chunk of values
futures.add(EXECUTOR_SERVICE.submit(() -> {
for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) {
Object[] values = topRecordsList.get(recordIdx).getValues();
for (int i = 0; i < numAggregationFunctions; i++) {
int colId = i + _numKeyColumns;
values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
futures.add(EXECUTOR_SERVICE.submit(new TraceCallable<Void>() {
@Override
public Void callJob() {
for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) {
Object[] values = topRecordsList.get(recordIdx).getValues();
for (int i = 0; i < numAggregationFunctions; i++) {
int colId = i + _numKeyColumns;
values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
}
}
return null;
}
return null;
}));
}
}
@@ -195,6 +217,10 @@ public void finish(boolean sort, boolean storeFinalResult) {
future.get();
}
} catch (InterruptedException | ExecutionException e) {
// Cancel all running tasks
for (Future<Void> future : futures) {
future.cancel(true);
}
throw new RuntimeException("Error during multi-threaded final reduce", e);
}
} else {
@@ -209,6 +235,39 @@ public void finish(boolean sort, boolean storeFinalResult) {
}
}

/**
 * Infers the number of threads to use for the final reduce.
 * <p>An explicitly configured value (> 1) wins. Otherwise, parallelism is enabled only when the
 * query contains expensive (funnel) aggregation functions and there are more top records than the
 * configured parallel chunk size, using one thread per chunk capped at the shared pool size.
 */
private int inferNumThreadsForFinalReduce() {
  // Honor an explicit setting from the query options first.
  if (_numThreadsForFinalReduce > 1) {
    return _numThreadsForFinalReduce;
  }
  if (containsExpensiveAggregationFunctions() && _topRecords != null) {
    // Guard against a non-positive chunk size coming from the query option: a zero or negative
    // value would otherwise produce a nonsensical (possibly negative) thread count.
    int parallelChunkSize = Math.max(1, _parallelChunkSizeForFinalReduce);
    int numRecords = _topRecords.size();
    if (numRecords > parallelChunkSize) {
      // One thread per chunk. Since numRecords > parallelChunkSize >= 1, the estimate is >= 2,
      // so no separate zero-check is needed.
      int estimatedThreads = (int) Math.ceil((double) numRecords / parallelChunkSize);
      return Math.min(estimatedThreads, THREAD_POOL_SIZE);
    }
  }
  // Default to single-threaded final reduce.
  return 1;
}

/**
 * Returns {@code true} if any aggregation function in this table is one of the funnel
 * aggregations, whose final results are expensive enough to extract that a parallel final
 * reduce is worthwhile.
 */
private boolean containsExpensiveAggregationFunctions() {
  for (AggregationFunction function : _aggregationFunctions) {
    switch (function.getType()) {
      case FUNNELCOUNT:
      case FUNNELCOMPLETECOUNT:
      case FUNNELMAXSTEP:
      case FUNNELMATCHSTEP:
        // Funnel aggregations are costly to finalize.
        return true;
      default:
        // Cheap to finalize; keep scanning.
        break;
    }
  }
  return false;
}

@Override
public int size() {
return _topRecords != null ? _topRecords.size() : _lookupMap.size();
Original file line number Diff line number Diff line change
@@ -96,6 +96,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;

public static final int DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE = 1;
public static final int DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE = 10_000;

private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);

@@ -277,6 +278,13 @@ private void applyQueryOptions(QueryContext queryContext) {
} else {
queryContext.setNumThreadsForFinalReduce(DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE);
}
// Set parallelChunkSizeForFinalReduce
Integer parallelChunkSizeForFinalReduce = QueryOptionsUtils.getParallelChunkSizeForFinalReduce(queryOptions);
if (parallelChunkSizeForFinalReduce != null) {
queryContext.setParallelChunkSizeForFinalReduce(parallelChunkSizeForFinalReduce);
} else {
queryContext.setParallelChunkSizeForFinalReduce(DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE);
}
}
}

Original file line number Diff line number Diff line change
@@ -126,6 +126,8 @@ public class QueryContext {
private int _groupTrimThreshold = InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD;
// Number of threads to use for final reduce
private int _numThreadsForFinalReduce = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE;
// Parallel chunk size for final reduce
private int _parallelChunkSizeForFinalReduce = InstancePlanMakerImplV2.DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE;
// Whether null handling is enabled
private boolean _nullHandlingEnabled;
// Whether server returns the final result
@@ -421,6 +423,14 @@ public void setNumThreadsForFinalReduce(int numThreadsForFinalReduce) {
_numThreadsForFinalReduce = numThreadsForFinalReduce;
}

/**
 * Returns the parallel chunk size (records per task) used to size the multi-threaded final
 * reduce.
 */
public int getParallelChunkSizeForFinalReduce() {
  return _parallelChunkSizeForFinalReduce;
}

/**
 * Sets the parallel chunk size (records per task) used to size the multi-threaded final reduce.
 */
public void setParallelChunkSizeForFinalReduce(int parallelChunkSizeForFinalReduce) {
  _parallelChunkSizeForFinalReduce = parallelChunkSizeForFinalReduce;
}

/**
 * Returns whether null handling is enabled for this query.
 */
public boolean isNullHandlingEnabled() {
  return _nullHandlingEnabled;
}
Original file line number Diff line number Diff line change
@@ -435,6 +435,7 @@ public static class QueryOptionKey {
/** Max number of groups GroupByDataTableReducer (running at broker) should return. */
public static final String MIN_BROKER_GROUP_TRIM_SIZE = "minBrokerGroupTrimSize";
public static final String NUM_THREADS_FOR_FINAL_REDUCE = "numThreadsForFinalReduce";
public static final String PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE = "parallelChunkSizeForFinalReduce";

public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery";
public static final String USE_FIXED_REPLICA = "useFixedReplica";

0 comments on commit 2fba98b

Please sign in to comment.