
Add tests with numThreadsForFinalReduce
xiangfu0 committed Jan 18, 2025
1 parent 2fba98b commit 756877c
Showing 30 changed files with 239 additions and 150 deletions.
QueryOptionsUtils.java
```diff
@@ -249,16 +249,17 @@ public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
   }
 
   @Nullable
-  public static Integer getNumThreadsForFinalReduce(Map<String, String> queryOptions) {
-    String numThreadsForFinalReduceString = queryOptions.get(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE);
-    return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE, numThreadsForFinalReduceString, 1);
+  public static Integer getNumThreadsForServerFinalReduce(Map<String, String> queryOptions) {
+    String numThreadsForServerFinalReduceString = queryOptions.get(QueryOptionKey.NUM_THREADS_FOR_SERVER_FINAL_REDUCE);
+    return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_SERVER_FINAL_REDUCE, numThreadsForServerFinalReduceString, 1);
   }
 
   @Nullable
-  public static Integer getParallelChunkSizeForFinalReduce(Map<String, String> queryOptions) {
-    String parallelChunkSizeForFinalReduceString =
-        queryOptions.get(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE);
-    return checkedParseInt(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE, parallelChunkSizeForFinalReduceString,
+  public static Integer getParallelChunkSizeForServerFinalReduce(Map<String, String> queryOptions) {
+    String parallelChunkSizeForServerFinalReduceString =
+        queryOptions.get(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_SERVER_FINAL_REDUCE);
+    return checkedParseInt(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_SERVER_FINAL_REDUCE,
+        parallelChunkSizeForServerFinalReduceString,
         1);
   }
```

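These parsers expose the renamed options to the rest of the engine. A minimal usage sketch follows; the literal key strings are assumptions, since the diff shows the `QueryOptionKey` constants but not their string values:

```java
import java.util.HashMap;
import java.util.Map;

public class QueryOptionsSketch {
  public static void main(String[] args) {
    // Per-query options arrive as a plain string-to-string map.
    Map<String, String> queryOptions = new HashMap<>();
    queryOptions.put("numThreadsForServerFinalReduce", "4");             // assumed key spelling
    queryOptions.put("parallelChunkSizeForServerFinalReduce", "10000");  // assumed key spelling
    // QueryOptionsUtils.getNumThreadsForServerFinalReduce(queryOptions) would parse
    // the first value with checkedParseInt(..., 1), i.e. rejecting values below 1.
    System.out.println(Integer.parseInt(queryOptions.get("numThreadsForServerFinalReduce")));
  }
}
```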
ConcurrentIndexedTable.java
```diff
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.data.table;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.pinot.common.utils.DataSchema;
```
```diff
@@ -33,9 +34,9 @@ public class ConcurrentIndexedTable extends IndexedTable {
   private final ReentrantReadWriteLock _readWriteLock = new ReentrantReadWriteLock();
 
   public ConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold, int initialCapacity) {
+      int trimSize, int trimThreshold, int initialCapacity, ExecutorService executorService) {
     super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold,
-        new ConcurrentHashMap<>(initialCapacity));
+        new ConcurrentHashMap<>(initialCapacity), executorService);
   }
 
   /**
```
IndexedTable.java
```diff
@@ -26,12 +26,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
```
```diff
@@ -46,18 +43,8 @@
 @SuppressWarnings({"rawtypes", "unchecked"})
 public abstract class IndexedTable extends BaseTable {
-  private static final int THREAD_POOL_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 1);
-  private static final ThreadPoolExecutor EXECUTOR_SERVICE = (ThreadPoolExecutor) Executors.newFixedThreadPool(
-      THREAD_POOL_SIZE, new ThreadFactory() {
-        private final AtomicInteger _threadNumber = new AtomicInteger(1);
-
-        @Override
-        public Thread newThread(Runnable r) {
-          Thread t = new Thread(r, "IndexedTable-pool-thread-" + _threadNumber.getAndIncrement());
-          t.setDaemon(true);
-          return t;
-        }
-      });
 
+  private final ExecutorService _executorService;
   protected final Map<Key, Record> _lookupMap;
   protected final boolean _hasFinalInput;
   protected final int _resultSize;
```
```diff
@@ -67,8 +54,8 @@ public Thread newThread(Runnable r) {
   protected final TableResizer _tableResizer;
   protected final int _trimSize;
   protected final int _trimThreshold;
-  protected final int _numThreadsForFinalReduce;
-  protected final int _parallelChunkSizeForFinalReduce;
+  protected final int _numThreadsForServerFinalReduce;
+  protected final int _parallelChunkSizeForServerFinalReduce;
 
   protected Collection<Record> _topRecords;
   private int _numResizes;
```
```diff
@@ -86,13 +73,14 @@ public Thread newThread(Runnable r) {
    * @param lookupMap Map from keys to records
    */
   protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold, Map<Key, Record> lookupMap) {
+      int trimSize, int trimThreshold, Map<Key, Record> lookupMap, ExecutorService executorService) {
     super(dataSchema);
 
     Preconditions.checkArgument(resultSize >= 0, "Result size can't be negative");
     Preconditions.checkArgument(trimSize >= 0, "Trim size can't be negative");
     Preconditions.checkArgument(trimThreshold >= 0, "Trim threshold can't be negative");
 
+    _executorService = executorService;
     _lookupMap = lookupMap;
     _hasFinalInput = hasFinalInput;
     _resultSize = resultSize;
```
```diff
@@ -108,9 +96,9 @@ protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext
     _trimSize = trimSize;
     _trimThreshold = trimThreshold;
     // NOTE: The upper limit of threads number for final reduce is set to 2 * number of available processors by default
-    _numThreadsForFinalReduce = Math.min(queryContext.getNumThreadsForFinalReduce(),
+    _numThreadsForServerFinalReduce = Math.min(queryContext.getNumThreadsForServerFinalReduce(),
         Math.max(1, 2 * Runtime.getRuntime().availableProcessors()));
-    _parallelChunkSizeForFinalReduce = queryContext.getParallelChunkSizeForFinalReduce();
+    _parallelChunkSizeForServerFinalReduce = queryContext.getParallelChunkSizeForServerFinalReduce();
   }
 
   @Override
```
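The `Math.min`/`Math.max` pair above caps any requested thread count at twice the available processors. A worked example of the clamp in isolation:

```java
public class ThreadClampSketch {
  public static void main(String[] args) {
    int requested = 16;  // e.g. numThreadsForServerFinalReduce from the query context
    int cores = Runtime.getRuntime().availableProcessors();
    int effective = Math.min(requested, Math.max(1, 2 * cores));
    // With 4 cores: min(16, max(1, 8)) -> 8 threads.
    System.out.println(effective);
  }
}
```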
```diff
@@ -184,20 +172,20 @@ public void finish(boolean sort, boolean storeFinalResult) {
     for (int i = 0; i < numAggregationFunctions; i++) {
       columnDataTypes[i + _numKeyColumns] = _aggregationFunctions[i].getFinalResultColumnType();
     }
-    int numThreadsForFinalReduce = inferNumThreadsForFinalReduce();
-    // Submit task when the EXECUTOR_SERVICE is not overloaded
-    if ((numThreadsForFinalReduce > 1) && (EXECUTOR_SERVICE.getQueue().size() < THREAD_POOL_SIZE * 3)) {
+    int numThreadsForServerFinalReduce = inferNumThreadsForServerFinalReduce();
+    if (numThreadsForServerFinalReduce > 1) {
       // Multi-threaded final reduce
       List<Future<Void>> futures = new ArrayList<>();
       try {
         List<Record> topRecordsList = new ArrayList<>(_topRecords);
-        int chunkSize = (topRecordsList.size() + numThreadsForFinalReduce - 1) / numThreadsForFinalReduce;
-        for (int threadId = 0; threadId < numThreadsForFinalReduce; threadId++) {
+        int chunkSize = (topRecordsList.size() + numThreadsForServerFinalReduce - 1) / numThreadsForServerFinalReduce;
+        for (int threadId = 0; threadId < numThreadsForServerFinalReduce; threadId++) {
           int startIdx = threadId * chunkSize;
           int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size());
           if (startIdx < endIdx) {
             // Submit a task for processing a chunk of values
-            futures.add(EXECUTOR_SERVICE.submit(new TraceCallable<Void>() {
+            futures.add(_executorService.submit(new TraceCallable<Void>() {
               @Override
               public Void callJob() {
                 for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) {
```
```diff
@@ -235,12 +223,12 @@ public Void callJob() {
     }
   }
 
-  private int inferNumThreadsForFinalReduce() {
-    if (_numThreadsForFinalReduce > 1) {
-      return _numThreadsForFinalReduce;
+  private int inferNumThreadsForServerFinalReduce() {
+    if (_numThreadsForServerFinalReduce > 1) {
+      return _numThreadsForServerFinalReduce;
     }
     if (containsExpensiveAggregationFunctions()) {
-      int parallelChunkSize = _parallelChunkSizeForFinalReduce;
+      int parallelChunkSize = _parallelChunkSizeForServerFinalReduce;
       if (_topRecords != null && _topRecords.size() > parallelChunkSize) {
         int estimatedThreads = (int) Math.ceil((double) _topRecords.size() / parallelChunkSize);
         if (estimatedThreads == 0) {
```
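Two behavioral changes land in this file: the static pool (and the queue-depth guard that skipped parallelism when that pool looked busy) is replaced by an injected `ExecutorService`, and everything else is a rename. A self-contained sketch of the fork-join pattern `finish()` uses; the names here are illustrative, not Pinot APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkedReduceSketch {
  // Split items into numThreads chunks (ceiling division, as in finish())
  // and process each chunk as one task on the injected executor.
  static <T> void reduceInChunks(List<T> items, int numThreads, ExecutorService executor)
      throws InterruptedException, ExecutionException {
    List<Future<Void>> futures = new ArrayList<>();
    int chunkSize = (items.size() + numThreads - 1) / numThreads;
    for (int threadId = 0; threadId < numThreads; threadId++) {
      int startIdx = threadId * chunkSize;
      int endIdx = Math.min(startIdx + chunkSize, items.size());
      if (startIdx < endIdx) {
        futures.add(executor.submit(() -> {
          for (int i = startIdx; i < endIdx; i++) {
            // finalize the aggregation result for items.get(i)
          }
          return null;
        }));
      }
    }
    for (Future<Void> future : futures) {
      future.get();  // wait for all chunks; propagates task failures
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    List<Integer> records = new ArrayList<>();
    for (int i = 0; i < 100_000; i++) {
      records.add(i);
    }
    reduceInChunks(records, 4, executor);
    executor.shutdown();
  }
}
```

Combined with the inference in the last hunk, 250,000 top records at a chunk size of 10,000 would estimate ceil(250000 / 10000) = 25 tasks, still subject to the 2 * availableProcessors cap applied in the constructor.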
SimpleIndexedTable.java
```diff
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.data.table;
 
 import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.request.context.QueryContext;
```
```diff
@@ -31,8 +32,9 @@
 public class SimpleIndexedTable extends IndexedTable {
 
   public SimpleIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold, int initialCapacity) {
-    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new HashMap<>(initialCapacity));
+      int trimSize, int trimThreshold, int initialCapacity, ExecutorService executorService) {
+    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new HashMap<>(initialCapacity),
+        executorService);
   }
 
   /**
```
GroupByCombineOperator.java
```diff
@@ -111,7 +111,8 @@ protected void processSegments() {
     if (_indexedTable == null) {
       synchronized (this) {
         if (_indexedTable == null) {
-          _indexedTable = GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, _numTasks);
+          _indexedTable = GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, _numTasks,
+              _executorService);
         }
       }
     }
```
InstancePlanMakerImplV2.java
```diff
@@ -271,19 +271,20 @@ private void applyQueryOptions(QueryContext queryContext) {
     } else {
       queryContext.setGroupTrimThreshold(_groupByTrimThreshold);
     }
-    // Set numThreadsForFinalReduce
-    Integer numThreadsForFinalReduce = QueryOptionsUtils.getNumThreadsForFinalReduce(queryOptions);
-    if (numThreadsForFinalReduce != null) {
-      queryContext.setNumThreadsForFinalReduce(numThreadsForFinalReduce);
+    // Set numThreadsForServerFinalReduce
+    Integer numThreadsForServerFinalReduce = QueryOptionsUtils.getNumThreadsForServerFinalReduce(queryOptions);
+    if (numThreadsForServerFinalReduce != null) {
+      queryContext.setNumThreadsForServerFinalReduce(numThreadsForServerFinalReduce);
     } else {
-      queryContext.setNumThreadsForFinalReduce(DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE);
+      queryContext.setNumThreadsForServerFinalReduce(DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE);
     }
-    // Set parallelChunkSizeForFinalReduce
-    Integer parallelChunkSizeForFinalReduce = QueryOptionsUtils.getParallelChunkSizeForFinalReduce(queryOptions);
-    if (parallelChunkSizeForFinalReduce != null) {
-      queryContext.setParallelChunkSizeForFinalReduce(parallelChunkSizeForFinalReduce);
+    // Set parallelChunkSizeForServerFinalReduce
+    Integer parallelChunkSizeForServerFinalReduce =
+        QueryOptionsUtils.getParallelChunkSizeForServerFinalReduce(queryOptions);
+    if (parallelChunkSizeForServerFinalReduce != null) {
+      queryContext.setParallelChunkSizeForServerFinalReduce(parallelChunkSizeForServerFinalReduce);
     } else {
-      queryContext.setParallelChunkSizeForFinalReduce(DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE);
+      queryContext.setParallelChunkSizeForServerFinalReduce(DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE);
     }
   }
 }
```
QueryContext.java
```diff
@@ -125,9 +125,10 @@ public class QueryContext {
   // Trim threshold to use for server combine for SQL GROUP BY
   private int _groupTrimThreshold = InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD;
   // Number of threads to use for final reduce
-  private int _numThreadsForFinalReduce = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE;
+  private int _numThreadsForServerFinalReduce = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE;
   // Parallel chunk size for final reduce
-  private int _parallelChunkSizeForFinalReduce = InstancePlanMakerImplV2.DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE;
+  private int _parallelChunkSizeForServerFinalReduce =
+      InstancePlanMakerImplV2.DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE;
   // Whether null handling is enabled
   private boolean _nullHandlingEnabled;
   // Whether server returns the final result
```
```diff
@@ -415,20 +416,20 @@ public void setGroupTrimThreshold(int groupTrimThreshold) {
     _groupTrimThreshold = groupTrimThreshold;
   }
 
-  public int getNumThreadsForFinalReduce() {
-    return _numThreadsForFinalReduce;
+  public int getNumThreadsForServerFinalReduce() {
+    return _numThreadsForServerFinalReduce;
   }
 
-  public void setNumThreadsForFinalReduce(int numThreadsForFinalReduce) {
-    _numThreadsForFinalReduce = numThreadsForFinalReduce;
+  public void setNumThreadsForServerFinalReduce(int numThreadsForServerFinalReduce) {
+    _numThreadsForServerFinalReduce = numThreadsForServerFinalReduce;
   }
 
-  public int getParallelChunkSizeForFinalReduce() {
-    return _parallelChunkSizeForFinalReduce;
+  public int getParallelChunkSizeForServerFinalReduce() {
+    return _parallelChunkSizeForServerFinalReduce;
   }
 
-  public void setParallelChunkSizeForFinalReduce(int parallelChunkSizeForFinalReduce) {
-    _parallelChunkSizeForFinalReduce = parallelChunkSizeForFinalReduce;
+  public void setParallelChunkSizeForServerFinalReduce(int parallelChunkSizeForServerFinalReduce) {
+    _parallelChunkSizeForServerFinalReduce = parallelChunkSizeForServerFinalReduce;
   }
 
   public boolean isNullHandlingEnabled() {
```
GroupByUtils.java
```diff
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.util;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.HashUtil;
```
```diff
@@ -49,7 +50,7 @@ public static int getTableCapacity(int limit) {
   /**
    * Returns the capacity of the table required by the given query.
    * NOTE: It returns {@code max(limit * 5, minNumGroups)} where minNumGroups is configurable to tune the table size and
-   *       result accuracy.
+   * result accuracy.
    */
   public static int getTableCapacity(int limit, int minNumGroups) {
     long capacityByLimit = limit * 5L;
```
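The javadoc formula is easy to sanity-check with concrete numbers. A sketch of `max(limit * 5, minNumGroups)`; note the hunk shows only the method's first line, so the final clamp to `Integer.MAX_VALUE` below is an assumption:

```java
public class TableCapacitySketch {
  // max(limit * 5, minNumGroups), computed in long to avoid int overflow;
  // the clamp to Integer.MAX_VALUE is assumed, not shown in the hunk.
  static int tableCapacity(int limit, int minNumGroups) {
    long capacityByLimit = limit * 5L;
    return (int) Math.min(Math.max(capacityByLimit, minNumGroups), Integer.MAX_VALUE);
  }

  public static void main(String[] args) {
    System.out.println(tableCapacity(10, 0));        // 50: the LIMIT term dominates
    System.out.println(tableCapacity(10, 100_000));  // 100000: minNumGroups dominates
    System.out.println(tableCapacity(Integer.MAX_VALUE, 0));  // stays at 2147483647
  }
}
```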
```diff
@@ -93,7 +94,7 @@ static int getIndexedTableInitialCapacity(int maxRowsToKeep, int minNumGroups, int
   /**
    * Creates an indexed table for the combine operator given a sample results block.
    */
   public static IndexedTable createIndexedTableForCombineOperator(GroupByResultsBlock resultsBlock,
-      QueryContext queryContext, int numThreads) {
+      QueryContext queryContext, int numThreads, ExecutorService executorService) {
     DataSchema dataSchema = resultsBlock.getDataSchema();
     int numGroups = resultsBlock.getNumGroups();
     int limit = queryContext.getLimit();
```
```diff
@@ -119,7 +120,8 @@ public static IndexedTable createIndexedTableForCombineOperator(GroupByResultsBlock
       resultSize = limit;
     }
     int initialCapacity = getIndexedTableInitialCapacity(resultSize, numGroups, minInitialIndexedTableCapacity);
-    return getTrimDisabledIndexedTable(dataSchema, false, queryContext, resultSize, initialCapacity, numThreads);
+    return getTrimDisabledIndexedTable(dataSchema, false, queryContext, resultSize, initialCapacity, numThreads,
+        executorService);
   }
 
   int resultSize;
```
```diff
@@ -132,10 +134,11 @@ public static IndexedTable createIndexedTableForCombineOperator(GroupByResultsBlock
     int trimThreshold = getIndexedTableTrimThreshold(trimSize, queryContext.getGroupTrimThreshold());
     int initialCapacity = getIndexedTableInitialCapacity(trimThreshold, numGroups, minInitialIndexedTableCapacity);
     if (trimThreshold == Integer.MAX_VALUE) {
-      return getTrimDisabledIndexedTable(dataSchema, false, queryContext, resultSize, initialCapacity, numThreads);
+      return getTrimDisabledIndexedTable(dataSchema, false, queryContext, resultSize, initialCapacity, numThreads,
+          executorService);
     } else {
       return getTrimEnabledIndexedTable(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold,
-          initialCapacity, numThreads);
+          initialCapacity, numThreads, executorService);
     }
   }
```
```diff
@@ -181,24 +184,26 @@ public static IndexedTable createIndexedTableForDataTableReducer(DataTable dataTable
   }
 
   private static IndexedTable getTrimDisabledIndexedTable(DataSchema dataSchema, boolean hasFinalInput,
-      QueryContext queryContext, int resultSize, int initialCapacity, int numThreads) {
+      QueryContext queryContext, int resultSize, int initialCapacity, int numThreads, ExecutorService executorService) {
     if (numThreads == 1) {
       return new SimpleIndexedTable(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE,
-          Integer.MAX_VALUE, initialCapacity);
+          Integer.MAX_VALUE, initialCapacity, executorService);
     } else {
-      return new UnboundedConcurrentIndexedTable(dataSchema, hasFinalInput, queryContext, resultSize, initialCapacity);
+      return new UnboundedConcurrentIndexedTable(dataSchema, hasFinalInput, queryContext, resultSize, initialCapacity,
+          executorService);
     }
   }
 
   private static IndexedTable getTrimEnabledIndexedTable(DataSchema dataSchema, boolean hasFinalInput,
-      QueryContext queryContext, int resultSize, int trimSize, int trimThreshold, int initialCapacity, int numThreads) {
+      QueryContext queryContext, int resultSize, int trimSize, int trimThreshold, int initialCapacity, int numThreads,
+      ExecutorService executorService) {
     assert trimThreshold != Integer.MAX_VALUE;
     if (numThreads == 1) {
       return new SimpleIndexedTable(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold,
-          initialCapacity);
+          initialCapacity, executorService);
     } else {
       return new ConcurrentIndexedTable(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold,
-          initialCapacity);
+          initialCapacity, executorService);
     }
   }
 }
```
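Taken together, these helpers now thread a caller-owned executor down to whichever table implementation they pick. A toy model of that dispatch (the types are stand-ins, not Pinot classes):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TableDispatchSketch {
  interface Table { }
  record SimpleTable(ExecutorService reduceExecutor) implements Table { }      // single writer
  record ConcurrentTable(ExecutorService reduceExecutor) implements Table { }  // many writers

  // Mirrors the numThreads == 1 branch: a combine with one task can use the
  // cheaper non-thread-safe table; either way the reduce executor is injected.
  static Table createTable(int numThreads, ExecutorService executorService) {
    return numThreads == 1 ? new SimpleTable(executorService) : new ConcurrentTable(executorService);
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    System.out.println(createTable(1, pool));
    System.out.println(createTable(8, pool));
    pool.shutdown();
  }
}
```

Injecting the executor, rather than keeping the per-class static pool this commit deletes from IndexedTable, lets the server own a single pool and bound total final-reduce parallelism across concurrent queries.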