diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 1aacb5fdc03c..9af0e44c5545 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -244,7 +244,15 @@ public static Integer getGroupTrimThreshold(Map queryOptions) { @Nullable public static Integer getNumThreadsForFinalReduce(Map queryOptions) { String numThreadsForFinalReduceString = queryOptions.get(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE); - return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE, numThreadsForFinalReduceString, -1); + return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE, numThreadsForFinalReduceString, 1); + } + + @Nullable + public static Integer getParallelChunkSizeForFinalReduce(Map queryOptions) { + String parallelChunkSizeForFinalReduceString = + queryOptions.get(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE); + return checkedParseInt(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE, parallelChunkSizeForFinalReduceString, + 1); } public static boolean isNullHandlingEnabled(Map queryOptions) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java index 30a60b7b1f43..1cba3120eb59 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java @@ -29,12 +29,15 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import 
org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.util.trace.TraceCallable; /** @@ -42,8 +45,18 @@ */ @SuppressWarnings({"rawtypes", "unchecked"}) public abstract class IndexedTable extends BaseTable { - private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime() - .availableProcessors()); + private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), new ThreadFactory() { + private final AtomicInteger _threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "IndexedTable-pool-thread-" + _threadNumber.getAndIncrement()); + t.setDaemon(true); + return t; + } + }); + protected final Map _lookupMap; protected final boolean _hasFinalInput; protected final int _resultSize; @@ -92,7 +105,8 @@ protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContex assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold == Integer.MAX_VALUE); _trimSize = trimSize; _trimThreshold = trimThreshold; - _numThreadsForFinalReduce = queryContext.getNumThreadsForFinalReduce(); + // NOTE: The thread count requested for final reduce is capped at 2 * the number of available processors + _numThreadsForFinalReduce = inferNumThreadsForFinalReduce(queryContext); } @Override @@ -168,25 +182,28 @@ public void finish(boolean sort, boolean storeFinalResult) { } if (_numThreadsForFinalReduce > 1) { // Multi-threaded final reduce + List> futures = new ArrayList<>(); try { List topRecordsList = new ArrayList<>(_topRecords); int chunkSize = (topRecordsList.size() + _numThreadsForFinalReduce - 1) / 
_numThreadsForFinalReduce; - List> futures = new ArrayList<>(); for (int threadId = 0; threadId < _numThreadsForFinalReduce; threadId++) { int startIdx = threadId * chunkSize; int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size()); - if (startIdx < endIdx) { // Submit a task for processing a chunk of values - futures.add(EXECUTOR_SERVICE.submit(() -> { - for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) { - Object[] values = topRecordsList.get(recordIdx).getValues(); - for (int i = 0; i < numAggregationFunctions; i++) { - int colId = i + _numKeyColumns; - values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]); + + futures.add(EXECUTOR_SERVICE.submit(new TraceCallable() { + @Override + public Void callJob() { + for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) { + Object[] values = topRecordsList.get(recordIdx).getValues(); + for (int i = 0; i < numAggregationFunctions; i++) { + int colId = i + _numKeyColumns; + values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]); + } } + return null; } - return null; })); } } @@ -195,6 +212,10 @@ public void finish(boolean sort, boolean storeFinalResult) { future.get(); } } catch (InterruptedException | ExecutionException e) { + // Cancel all running tasks + for (Future future : futures) { + future.cancel(true); + } throw new RuntimeException("Error during multi-threaded final reduce", e); } } else { @@ -209,6 +230,38 @@ public void finish(boolean sort, boolean storeFinalResult) { } } + private int inferNumThreadsForFinalReduce(QueryContext queryContext) { + int numThreadsForFinalReduce = Math.min(queryContext.getNumThreadsForFinalReduce(), + Math.max(1, 2 * Runtime.getRuntime().availableProcessors())); + // Honor an explicit setting; otherwise infer parallelism for expensive aggregations (e.g. funnel) by record count + if (numThreadsForFinalReduce > 1) { + return numThreadsForFinalReduce; + } + if (containsExpensiveAggregationFunctions()) { + int 
parallelChunkSize = queryContext.getParallelChunkSizeForFinalReduce(); + if (_topRecords.size() > parallelChunkSize) { + return (int) Math.ceil((double) _topRecords.size() / parallelChunkSize); + } + } + // Default to 1 thread. NOTE(review): _topRecords is read above; it may be null if called from the ctor - confirm + return 1; + } + + private boolean containsExpensiveAggregationFunctions() { + for (AggregationFunction aggregationFunction : _aggregationFunctions) { + switch (aggregationFunction.getType()) { + case FUNNELCOMPLETECOUNT: + case FUNNELCOUNT: + case FUNNELMATCHSTEP: + case FUNNELMAXSTEP: + return true; + default: + break; + } + } + return false; + } + @Override public int size() { return _topRecords != null ? _topRecords.size() : _lookupMap.size(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index 85ec039da1f9..3e6262fffd8f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -91,6 +91,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker { public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000; public static final int DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE = 1; + public static final int DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE = 10_000; private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class); @@ -272,6 +273,13 @@ private void applyQueryOptions(QueryContext queryContext) { } else { queryContext.setNumThreadsForFinalReduce(DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE); } + // Set parallelChunkSizeForFinalReduce + Integer parallelChunkSizeForFinalReduce = QueryOptionsUtils.getParallelChunkSizeForFinalReduce(queryOptions); + if (parallelChunkSizeForFinalReduce != null) { + 
queryContext.setParallelChunkSizeForFinalReduce(parallelChunkSizeForFinalReduce); + } else { + 
queryContext.setParallelChunkSizeForFinalReduce(DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE); + } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java index 922189c7fa04..5fa3e7183651 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java @@ -126,6 +126,8 @@ public class QueryContext { private int _groupTrimThreshold = InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD; // Number of threads to use for final reduce private int _numThreadsForFinalReduce = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE; + // Max records per thread when final reduce parallelism is inferred (expensive aggregations only) + private int _parallelChunkSizeForFinalReduce = InstancePlanMakerImplV2.DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE; // Whether null handling is enabled private boolean _nullHandlingEnabled; // Whether server returns the final result @@ -420,6 +422,14 @@ public void setNumThreadsForFinalReduce(int numThreadsForFinalReduce) { _numThreadsForFinalReduce = numThreadsForFinalReduce; } + public int getParallelChunkSizeForFinalReduce() { + return _parallelChunkSizeForFinalReduce; + } + + public void setParallelChunkSizeForFinalReduce(int parallelChunkSizeForFinalReduce) { + _parallelChunkSizeForFinalReduce = parallelChunkSizeForFinalReduce; + } + public boolean isNullHandlingEnabled() { return _nullHandlingEnabled; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 79ee78ba50c9..0f16528d9dd5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -413,6 +413,7 @@ public static class QueryOptionKey { public static final 
String MIN_SERVER_GROUP_TRIM_SIZE = "minServerGroupTrimSize"; public static final String MIN_BROKER_GROUP_TRIM_SIZE = "minBrokerGroupTrimSize"; public static final String NUM_THREADS_FOR_FINAL_REDUCE = "numThreadsForFinalReduce"; + public static final String PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE = "parallelChunkSizeForFinalReduce"; public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery"; public static final String USE_FIXED_REPLICA = "useFixedReplica"; public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";