Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Jan 2, 2025
1 parent b26cad6 commit 51be961
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,15 @@ public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
/**
 * Reads the {@code numThreadsForFinalReduce} query option.
 *
 * <p>The raw string is handed to {@code checkedParseInt} together with the bound {@code 1}, the same bound used for
 * the parallel chunk size option. NOTE(review): the bound is presumably the minimum accepted value - confirm
 * against {@code checkedParseInt}.
 *
 * @param queryOptions query options to read the value from
 * @return the result of {@code checkedParseInt} for this option; may be {@code null} per the {@code @Nullable}
 *         annotation (presumably when the option is not set - confirm)
 */
@Nullable
public static Integer getNumThreadsForFinalReduce(Map<String, String> queryOptions) {
  String numThreadsForFinalReduceString = queryOptions.get(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE);
  // Only one return statement may remain: a second one is unreachable code and does not compile.
  return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE, numThreadsForFinalReduceString, 1);
}

/**
 * Reads the {@code parallelChunkSizeForFinalReduce} query option.
 *
 * @param queryOptions query options to read the value from
 * @return the result of {@code checkedParseInt} for this option, called with the bound {@code 1}; may be
 *         {@code null} per the {@code @Nullable} annotation
 */
@Nullable
public static Integer getParallelChunkSizeForFinalReduce(Map<String, String> queryOptions) {
  String rawChunkSize = queryOptions.get(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE);
  int bound = 1;
  return checkedParseInt(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE, rawChunkSize, bound);
}

public static boolean isNullHandlingEnabled(Map<String, String> queryOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,38 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.util.trace.TraceCallable;


/**
* Base implementation of Map-based Table for indexed lookup
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public abstract class IndexedTable extends BaseTable {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime()
.availableProcessors());
// Size of the shared pool: one thread per available processor, never fewer than one.
private static final int THREAD_POOL_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 1);
// Shared fixed-size pool that runs the multi-threaded final reduce tasks. It is held as a ThreadPoolExecutor so
// that the pending-task queue can be inspected through getQueue() before submitting more work.
// NOTE(review): the cast relies on Executors.newFixedThreadPool returning a ThreadPoolExecutor - confirm this is
// acceptable, or construct the ThreadPoolExecutor directly.
private static final ThreadPoolExecutor EXECUTOR_SERVICE = (ThreadPoolExecutor) Executors.newFixedThreadPool(
THREAD_POOL_SIZE, new ThreadFactory() {
// Counter that gives every pool thread a unique name suffix, starting at 1.
private final AtomicInteger _threadNumber = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "IndexedTable-pool-thread-" + _threadNumber.getAndIncrement());
// Mark the worker as a daemon thread so that it does not prevent the JVM from exiting.
t.setDaemon(true);
return t;
}
});

protected final Map<Key, Record> _lookupMap;
protected final boolean _hasFinalInput;
protected final int _resultSize;
Expand Down Expand Up @@ -92,7 +106,8 @@ protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContex
assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold == Integer.MAX_VALUE);
_trimSize = trimSize;
_trimThreshold = trimThreshold;
_numThreadsForFinalReduce = queryContext.getNumThreadsForFinalReduce();
// NOTE: The user-specified number of threads for final reduce is capped at 2 * the number of available processors
_numThreadsForFinalReduce = inferNumThreadsForFinalReduce(queryContext);
}

@Override
Expand Down Expand Up @@ -166,27 +181,30 @@ public void finish(boolean sort, boolean storeFinalResult) {
for (int i = 0; i < numAggregationFunctions; i++) {
columnDataTypes[i + _numKeyColumns] = _aggregationFunctions[i].getFinalResultColumnType();
}
if (_numThreadsForFinalReduce > 1) {
// Submit task when the EXECUTOR_SERVICE is not overloaded
if ((_numThreadsForFinalReduce > 1) && (EXECUTOR_SERVICE.getQueue().size() < THREAD_POOL_SIZE * 3)) {
// Multi-threaded final reduce
List<Future<Void>> futures = new ArrayList<>();
try {
List<Record> topRecordsList = new ArrayList<>(_topRecords);
int chunkSize = (topRecordsList.size() + _numThreadsForFinalReduce - 1) / _numThreadsForFinalReduce;
List<Future<Void>> futures = new ArrayList<>();
for (int threadId = 0; threadId < _numThreadsForFinalReduce; threadId++) {
int startIdx = threadId * chunkSize;
int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size());

if (startIdx < endIdx) {
// Submit a task for processing a chunk of values
futures.add(EXECUTOR_SERVICE.submit(() -> {
for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) {
Object[] values = topRecordsList.get(recordIdx).getValues();
for (int i = 0; i < numAggregationFunctions; i++) {
int colId = i + _numKeyColumns;
values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
futures.add(EXECUTOR_SERVICE.submit(new TraceCallable<Void>() {
@Override
public Void callJob() {
for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) {
Object[] values = topRecordsList.get(recordIdx).getValues();
for (int i = 0; i < numAggregationFunctions; i++) {
int colId = i + _numKeyColumns;
values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
}
}
return null;
}
return null;
}));
}
}
Expand All @@ -195,6 +213,10 @@ public void finish(boolean sort, boolean storeFinalResult) {
future.get();
}
} catch (InterruptedException | ExecutionException e) {
// Cancel all running tasks
for (Future<Void> future : futures) {
future.cancel(true);
}
throw new RuntimeException("Error during multi-threaded final reduce", e);
}
} else {
Expand All @@ -209,6 +231,38 @@ public void finish(boolean sort, boolean storeFinalResult) {
}
}

/**
 * Determines the number of threads to use for the final reduce.
 *
 * <p>The value configured on the query context wins when it asks for more than one thread. Otherwise, when the
 * table contains an expensive aggregation function and already holds more top records than the configured
 * parallel chunk size, one thread per chunk is requested. In both cases the result is capped at
 * {@code 2 * availableProcessors} (at least 1), so a large record count cannot request an unbounded number of
 * tasks.
 *
 * <p>NOTE(review): this method is invoked from the constructor; if {@code _topRecords} is only populated later
 * (e.g. in {@code finish}), the inference branch can never trigger - confirm the intended call site.
 *
 * @param queryContext query context providing the configured thread count and parallel chunk size
 * @return the number of threads to use, always at least 1
 */
private int inferNumThreadsForFinalReduce(QueryContext queryContext) {
  int maxNumThreads = Math.max(1, 2 * Runtime.getRuntime().availableProcessors());
  int numThreadsForFinalReduce = Math.min(queryContext.getNumThreadsForFinalReduce(), maxNumThreads);
  // An explicit request for parallelism takes precedence over any inference
  if (numThreadsForFinalReduce > 1) {
    return numThreadsForFinalReduce;
  }
  // Infer the parallelism from the number of records, but only for expensive aggregation functions like funnel
  if (containsExpensiveAggregationFunctions()) {
    int parallelChunkSize = queryContext.getParallelChunkSizeForFinalReduce();
    int numRecords = _topRecords != null ? _topRecords.size() : 0;
    // A non-positive chunk size cannot be used to split the records; fall back to a single thread instead of
    // dividing by zero
    if (parallelChunkSize > 0 && numRecords > parallelChunkSize) {
      // Ceiling division, done in long so that the addition cannot overflow
      long numChunks = ((long) numRecords + parallelChunkSize - 1) / parallelChunkSize;
      return (int) Math.min(numChunks, (long) maxNumThreads);
    }
  }
  // Default to 1 thread
  return 1;
}

/**
 * Checks whether this table has at least one aggregation function whose type is one of the funnel types
 * (FUNNELCOMPLETECOUNT, FUNNELCOUNT, FUNNELMATCHSTEP, FUNNELMAXSTEP), which are the types this class treats as
 * expensive.
 *
 * @return {@code true} if any aggregation function has one of those types, {@code false} otherwise
 */
private boolean containsExpensiveAggregationFunctions() {
  for (AggregationFunction function : _aggregationFunctions) {
    boolean expensive;
    switch (function.getType()) {
      case FUNNELCOMPLETECOUNT:
      case FUNNELCOUNT:
      case FUNNELMATCHSTEP:
      case FUNNELMAXSTEP:
        expensive = true;
        break;
      default:
        expensive = false;
        break;
    }
    // Stop at the first expensive function; the remaining ones cannot change the answer
    if (expensive) {
      return true;
    }
  }
  return false;
}

@Override
public int size() {
return _topRecords != null ? _topRecords.size() : _lookupMap.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;

public static final int DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE = 1;
public static final int DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE = 10_000;

private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);

Expand Down Expand Up @@ -272,6 +273,13 @@ private void applyQueryOptions(QueryContext queryContext) {
} else {
queryContext.setNumThreadsForFinalReduce(DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE);
}
// Set parallelChunkSizeForFinalReduce
Integer parallelChunkSizeForFinalReduce = QueryOptionsUtils.getParallelChunkSizeForFinalReduce(queryOptions);
if (parallelChunkSizeForFinalReduce != null) {
queryContext.setParallelChunkSizeForFinalReduce(parallelChunkSizeForFinalReduce);
} else {
queryContext.setParallelChunkSizeForFinalReduce(DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public class QueryContext {
private int _groupTrimThreshold = InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD;
// Number of threads to use for final reduce
private int _numThreadsForFinalReduce = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE;
// Parallel chunk size for final reduce
private int _parallelChunkSizeForFinalReduce = InstancePlanMakerImplV2.DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE;
// Whether null handling is enabled
private boolean _nullHandlingEnabled;
// Whether server returns the final result
Expand Down Expand Up @@ -420,6 +422,14 @@ public void setNumThreadsForFinalReduce(int numThreadsForFinalReduce) {
_numThreadsForFinalReduce = numThreadsForFinalReduce;
}

/**
 * Returns the parallel chunk size for final reduce currently stored on this query context.
 */
public int getParallelChunkSizeForFinalReduce() {
return _parallelChunkSizeForFinalReduce;
}

/**
 * Stores the parallel chunk size for final reduce on this query context. The value is stored as given; no
 * validation is performed here.
 */
public void setParallelChunkSizeForFinalReduce(int parallelChunkSizeForFinalReduce) {
_parallelChunkSizeForFinalReduce = parallelChunkSizeForFinalReduce;
}

/**
 * Returns the null handling flag currently stored on this query context.
 */
public boolean isNullHandlingEnabled() {
return _nullHandlingEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ public static class QueryOptionKey {
public static final String MIN_SERVER_GROUP_TRIM_SIZE = "minServerGroupTrimSize";
public static final String MIN_BROKER_GROUP_TRIM_SIZE = "minBrokerGroupTrimSize";
public static final String NUM_THREADS_FOR_FINAL_REDUCE = "numThreadsForFinalReduce";
public static final String PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE = "parallelChunkSizeForFinalReduce";
public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery";
public static final String USE_FIXED_REPLICA = "useFixedReplica";
public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";
Expand Down

0 comments on commit 51be961

Please sign in to comment.