Skip to content

Commit

Permalink
Add tests with numThreadsForFinalReduce
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Jan 18, 2025
1 parent 2fba98b commit e18251b
Show file tree
Hide file tree
Showing 32 changed files with 249 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,17 @@ public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
}

@Nullable
public static Integer getNumThreadsForFinalReduce(Map<String, String> queryOptions) {
String numThreadsForFinalReduceString = queryOptions.get(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE);
return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE, numThreadsForFinalReduceString, 1);
public static Integer getNumThreadsForServerFinalReduce(Map<String, String> queryOptions) {
String numThreadsForServerFinalReduceString = queryOptions.get(QueryOptionKey.NUM_THREADS_FOR_SERVER_FINAL_REDUCE);
return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_SERVER_FINAL_REDUCE, numThreadsForServerFinalReduceString, 1);
}

@Nullable
public static Integer getParallelChunkSizeForFinalReduce(Map<String, String> queryOptions) {
String parallelChunkSizeForFinalReduceString =
queryOptions.get(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE);
return checkedParseInt(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE, parallelChunkSizeForFinalReduceString,
public static Integer getParallelChunkSizeForServerFinalReduce(Map<String, String> queryOptions) {
String parallelChunkSizeForServerFinalReduceString =
queryOptions.get(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_SERVER_FINAL_REDUCE);
return checkedParseInt(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_SERVER_FINAL_REDUCE,
parallelChunkSizeForServerFinalReduceString,
1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.data.table;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pinot.common.utils.DataSchema;
Expand All @@ -33,9 +34,9 @@ public class ConcurrentIndexedTable extends IndexedTable {
private final ReentrantReadWriteLock _readWriteLock = new ReentrantReadWriteLock();

public ConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
int trimSize, int trimThreshold, int initialCapacity) {
int trimSize, int trimThreshold, int initialCapacity, ExecutorService executorService) {
super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold,
new ConcurrentHashMap<>(initialCapacity));
new ConcurrentHashMap<>(initialCapacity), executorService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
Expand All @@ -46,18 +43,8 @@
@SuppressWarnings({"rawtypes", "unchecked"})
public abstract class IndexedTable extends BaseTable {
private static final int THREAD_POOL_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 1);
private static final ThreadPoolExecutor EXECUTOR_SERVICE = (ThreadPoolExecutor) Executors.newFixedThreadPool(
THREAD_POOL_SIZE, new ThreadFactory() {
private final AtomicInteger _threadNumber = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "IndexedTable-pool-thread-" + _threadNumber.getAndIncrement());
t.setDaemon(true);
return t;
}
});

private final ExecutorService _executorService;
protected final Map<Key, Record> _lookupMap;
protected final boolean _hasFinalInput;
protected final int _resultSize;
Expand All @@ -67,8 +54,8 @@ public Thread newThread(Runnable r) {
protected final TableResizer _tableResizer;
protected final int _trimSize;
protected final int _trimThreshold;
protected final int _numThreadsForFinalReduce;
protected final int _parallelChunkSizeForFinalReduce;
protected final int _numThreadsForServerFinalReduce;
protected final int _parallelChunkSizeForServerFinalReduce;

protected Collection<Record> _topRecords;
private int _numResizes;
Expand All @@ -86,13 +73,14 @@ public Thread newThread(Runnable r) {
* @param lookupMap Map from keys to records
*/
protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
int trimSize, int trimThreshold, Map<Key, Record> lookupMap) {
int trimSize, int trimThreshold, Map<Key, Record> lookupMap, ExecutorService executorService) {
super(dataSchema);

Preconditions.checkArgument(resultSize >= 0, "Result size can't be negative");
Preconditions.checkArgument(trimSize >= 0, "Trim size can't be negative");
Preconditions.checkArgument(trimThreshold >= 0, "Trim threshold can't be negative");

_executorService = executorService;
_lookupMap = lookupMap;
_hasFinalInput = hasFinalInput;
_resultSize = resultSize;
Expand All @@ -108,9 +96,9 @@ protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContex
_trimSize = trimSize;
_trimThreshold = trimThreshold;
// NOTE: The upper limit of threads number for final reduce is set to 2 * number of available processors by default
_numThreadsForFinalReduce = Math.min(queryContext.getNumThreadsForFinalReduce(),
_numThreadsForServerFinalReduce = Math.min(queryContext.getNumThreadsForServerFinalReduce(),
Math.max(1, 2 * Runtime.getRuntime().availableProcessors()));
_parallelChunkSizeForFinalReduce = queryContext.getParallelChunkSizeForFinalReduce();
_parallelChunkSizeForServerFinalReduce = queryContext.getParallelChunkSizeForServerFinalReduce();
}

@Override
Expand Down Expand Up @@ -184,20 +172,20 @@ public void finish(boolean sort, boolean storeFinalResult) {
for (int i = 0; i < numAggregationFunctions; i++) {
columnDataTypes[i + _numKeyColumns] = _aggregationFunctions[i].getFinalResultColumnType();
}
int numThreadsForFinalReduce = inferNumThreadsForFinalReduce();
int numThreadsForServerFinalReduce = inferNumThreadsForServerFinalReduce();
// Submit task when the EXECUTOR_SERVICE is not overloaded
if ((numThreadsForFinalReduce > 1) && (EXECUTOR_SERVICE.getQueue().size() < THREAD_POOL_SIZE * 3)) {
if (numThreadsForServerFinalReduce > 1) {
// Multi-threaded final reduce
List<Future<Void>> futures = new ArrayList<>();
try {
List<Record> topRecordsList = new ArrayList<>(_topRecords);
int chunkSize = (topRecordsList.size() + numThreadsForFinalReduce - 1) / numThreadsForFinalReduce;
for (int threadId = 0; threadId < numThreadsForFinalReduce; threadId++) {
int chunkSize = (topRecordsList.size() + numThreadsForServerFinalReduce - 1) / numThreadsForServerFinalReduce;
for (int threadId = 0; threadId < numThreadsForServerFinalReduce; threadId++) {
int startIdx = threadId * chunkSize;
int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size());
if (startIdx < endIdx) {
// Submit a task for processing a chunk of values
futures.add(EXECUTOR_SERVICE.submit(new TraceCallable<Void>() {
futures.add(_executorService.submit(new TraceCallable<Void>() {
@Override
public Void callJob() {
for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) {
Expand Down Expand Up @@ -235,12 +223,12 @@ public Void callJob() {
}
}

private int inferNumThreadsForFinalReduce() {
if (_numThreadsForFinalReduce > 1) {
return _numThreadsForFinalReduce;
private int inferNumThreadsForServerFinalReduce() {
if (_numThreadsForServerFinalReduce > 1) {
return _numThreadsForServerFinalReduce;
}
if (containsExpensiveAggregationFunctions()) {
int parallelChunkSize = _parallelChunkSizeForFinalReduce;
int parallelChunkSize = _parallelChunkSizeForServerFinalReduce;
if (_topRecords != null && _topRecords.size() > parallelChunkSize) {
int estimatedThreads = (int) Math.ceil((double) _topRecords.size() / parallelChunkSize);
if (estimatedThreads == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.data.table;

import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.request.context.QueryContext;
Expand All @@ -31,8 +32,9 @@
public class SimpleIndexedTable extends IndexedTable {

public SimpleIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
int trimSize, int trimThreshold, int initialCapacity) {
super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new HashMap<>(initialCapacity));
int trimSize, int trimThreshold, int initialCapacity, ExecutorService executorService) {
super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new HashMap<>(initialCapacity),
executorService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.data.table;

import java.util.concurrent.ExecutorService;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.request.context.QueryContext;

Expand All @@ -36,8 +37,8 @@
public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable {

public UnboundedConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext,
int resultSize, int initialCapacity) {
super(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity);
int resultSize, int initialCapacity, ExecutorService executorService) {
super(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity, executorService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ protected void processSegments() {
if (_indexedTable == null) {
synchronized (this) {
if (_indexedTable == null) {
_indexedTable = GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, _numTasks);
_indexedTable = GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, _numTasks,
_executorService);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,19 +271,20 @@ private void applyQueryOptions(QueryContext queryContext) {
} else {
queryContext.setGroupTrimThreshold(_groupByTrimThreshold);
}
// Set numThreadsForFinalReduce
Integer numThreadsForFinalReduce = QueryOptionsUtils.getNumThreadsForFinalReduce(queryOptions);
if (numThreadsForFinalReduce != null) {
queryContext.setNumThreadsForFinalReduce(numThreadsForFinalReduce);
// Set numThreadsForServerFinalReduce
Integer numThreadsForServerFinalReduce = QueryOptionsUtils.getNumThreadsForServerFinalReduce(queryOptions);
if (numThreadsForServerFinalReduce != null) {
queryContext.setNumThreadsForServerFinalReduce(numThreadsForServerFinalReduce);
} else {
queryContext.setNumThreadsForFinalReduce(DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE);
queryContext.setNumThreadsForServerFinalReduce(DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE);
}
// Set parallelChunkSizeForFinalReduce
Integer parallelChunkSizeForFinalReduce = QueryOptionsUtils.getParallelChunkSizeForFinalReduce(queryOptions);
if (parallelChunkSizeForFinalReduce != null) {
queryContext.setParallelChunkSizeForFinalReduce(parallelChunkSizeForFinalReduce);
// Set parallelChunkSizeForServerFinalReduce
Integer parallelChunkSizeForServerFinalReduce =
QueryOptionsUtils.getParallelChunkSizeForServerFinalReduce(queryOptions);
if (parallelChunkSizeForServerFinalReduce != null) {
queryContext.setParallelChunkSizeForServerFinalReduce(parallelChunkSizeForServerFinalReduce);
} else {
queryContext.setParallelChunkSizeForFinalReduce(DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE);
queryContext.setParallelChunkSizeForServerFinalReduce(DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable
// Create an indexed table to perform the reduce.
IndexedTable indexedTable =
GroupByUtils.createIndexedTableForDataTableReducer(dataTables.get(0), _queryContext, reducerContext,
numReduceThreadsToUse);
numReduceThreadsToUse, reducerContext.getExecutorService());

// Create groups of data tables that each thread can process concurrently.
// Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ public class QueryContext {
// Trim threshold to use for server combine for SQL GROUP BY
private int _groupTrimThreshold = InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD;
// Number of threads to use for final reduce
private int _numThreadsForFinalReduce = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE;
private int _numThreadsForServerFinalReduce = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE;
// Parallel chunk size for final reduce
private int _parallelChunkSizeForFinalReduce = InstancePlanMakerImplV2.DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE;
private int _parallelChunkSizeForServerFinalReduce =
InstancePlanMakerImplV2.DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE;
// Whether null handling is enabled
private boolean _nullHandlingEnabled;
// Whether server returns the final result
Expand Down Expand Up @@ -415,20 +416,20 @@ public void setGroupTrimThreshold(int groupTrimThreshold) {
_groupTrimThreshold = groupTrimThreshold;
}

public int getNumThreadsForFinalReduce() {
return _numThreadsForFinalReduce;
public int getNumThreadsForServerFinalReduce() {
return _numThreadsForServerFinalReduce;
}

public void setNumThreadsForFinalReduce(int numThreadsForFinalReduce) {
_numThreadsForFinalReduce = numThreadsForFinalReduce;
public void setNumThreadsForServerFinalReduce(int numThreadsForServerFinalReduce) {
_numThreadsForServerFinalReduce = numThreadsForServerFinalReduce;
}

public int getParallelChunkSizeForFinalReduce() {
return _parallelChunkSizeForFinalReduce;
public int getParallelChunkSizeForServerFinalReduce() {
return _parallelChunkSizeForServerFinalReduce;
}

public void setParallelChunkSizeForFinalReduce(int parallelChunkSizeForFinalReduce) {
_parallelChunkSizeForFinalReduce = parallelChunkSizeForFinalReduce;
public void setParallelChunkSizeForServerFinalReduce(int parallelChunkSizeForServerFinalReduce) {
_parallelChunkSizeForServerFinalReduce = parallelChunkSizeForServerFinalReduce;
}

public boolean isNullHandlingEnabled() {
Expand Down
Loading

0 comments on commit e18251b

Please sign in to comment.