From 2fba98bea8cef6a7add93153e8f37ebbc7e1697c Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Wed, 1 Jan 2025 07:09:59 -0800 Subject: [PATCH] Address comments --- .../utils/config/QueryOptionsUtils.java | 10 +- .../pinot/core/data/table/IndexedTable.java | 91 +++++++++++++++---- .../plan/maker/InstancePlanMakerImplV2.java | 8 ++ .../query/request/context/QueryContext.java | 10 ++ .../pinot/spi/utils/CommonConstants.java | 1 + 5 files changed, 103 insertions(+), 17 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 7f5acb3859c7..674064e513cf 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -251,7 +251,15 @@ public static Integer getGroupTrimThreshold(Map queryOptions) { @Nullable public static Integer getNumThreadsForFinalReduce(Map queryOptions) { String numThreadsForFinalReduceString = queryOptions.get(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE); - return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE, numThreadsForFinalReduceString, -1); + return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE, numThreadsForFinalReduceString, 1); + } + + @Nullable + public static Integer getParallelChunkSizeForFinalReduce(Map queryOptions) { + String parallelChunkSizeForFinalReduceString = + queryOptions.get(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE); + return checkedParseInt(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE, parallelChunkSizeForFinalReduceString, + 1); } public static boolean isNullHandlingEnabled(Map queryOptions) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java index 30a60b7b1f43..2d719ef8724a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java @@ -26,15 +26,18 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.util.trace.TraceCallable; /** @@ -42,8 +45,19 @@ */ @SuppressWarnings({"rawtypes", "unchecked"}) public abstract class IndexedTable extends BaseTable { - private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime() - .availableProcessors()); + private static final int THREAD_POOL_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 1); + private static final ThreadPoolExecutor EXECUTOR_SERVICE = (ThreadPoolExecutor) Executors.newFixedThreadPool( + THREAD_POOL_SIZE, new ThreadFactory() { + private final AtomicInteger _threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "IndexedTable-pool-thread-" + _threadNumber.getAndIncrement()); + t.setDaemon(true); + return t; + } + }); + protected final Map _lookupMap; protected final boolean _hasFinalInput; protected final int _resultSize; @@ -54,6 +68,7 @@ public abstract class IndexedTable extends BaseTable { protected final int _trimSize; protected final int _trimThreshold; protected final int _numThreadsForFinalReduce; + protected final int _parallelChunkSizeForFinalReduce; protected Collection _topRecords; private int _numResizes; @@ -92,7 +107,10 @@ protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContex assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold == Integer.MAX_VALUE); _trimSize = trimSize; _trimThreshold = trimThreshold; - _numThreadsForFinalReduce = queryContext.getNumThreadsForFinalReduce(); + // NOTE: The upper limit of threads number for final reduce is set to 2 * number of available processors by default + _numThreadsForFinalReduce = Math.min(queryContext.getNumThreadsForFinalReduce(), + Math.max(1, 2 * Runtime.getRuntime().availableProcessors())); + _parallelChunkSizeForFinalReduce = queryContext.getParallelChunkSizeForFinalReduce(); } @Override @@ -166,27 +184,31 @@ public void finish(boolean sort, boolean storeFinalResult) { for (int i = 0; i < numAggregationFunctions; i++) { columnDataTypes[i + _numKeyColumns] = _aggregationFunctions[i].getFinalResultColumnType(); } - if (_numThreadsForFinalReduce > 1) { + int numThreadsForFinalReduce = inferNumThreadsForFinalReduce(); + // Submit task when the EXECUTOR_SERVICE is not overloaded + if ((numThreadsForFinalReduce > 1) && (EXECUTOR_SERVICE.getQueue().size() < THREAD_POOL_SIZE * 3)) { // Multi-threaded final reduce + List> futures = new ArrayList<>(); try { List topRecordsList = new ArrayList<>(_topRecords); - int chunkSize = (topRecordsList.size() + _numThreadsForFinalReduce - 1) / _numThreadsForFinalReduce; - List> futures = new ArrayList<>(); - for (int threadId = 0; threadId < _numThreadsForFinalReduce; threadId++) { + int chunkSize = (topRecordsList.size() + numThreadsForFinalReduce - 1) / numThreadsForFinalReduce; + for (int threadId = 0; threadId < numThreadsForFinalReduce; threadId++) { int startIdx = threadId * chunkSize; int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size()); - if (startIdx < endIdx) { // Submit a task for processing a chunk of values - futures.add(EXECUTOR_SERVICE.submit(() -> { - for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) { - Object[] values = topRecordsList.get(recordIdx).getValues(); - for (int i = 0; i < numAggregationFunctions; i++) { - int colId = i + _numKeyColumns; - values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]); + futures.add(EXECUTOR_SERVICE.submit(new TraceCallable() { + @Override + public Void callJob() { + for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) { + Object[] values = topRecordsList.get(recordIdx).getValues(); + for (int i = 0; i < numAggregationFunctions; i++) { + int colId = i + _numKeyColumns; + values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]); + } } + return null; } - return null; })); } } @@ -195,6 +217,10 @@ public void finish(boolean sort, boolean storeFinalResult) { future.get(); } } catch (InterruptedException | ExecutionException e) { + // Cancel all running tasks + for (Future future : futures) { + future.cancel(true); + } throw new RuntimeException("Error during multi-threaded final reduce", e); } } else { @@ -209,6 +235,39 @@ public void finish(boolean sort, boolean storeFinalResult) { } } + private int inferNumThreadsForFinalReduce() { + if (_numThreadsForFinalReduce > 1) { + return _numThreadsForFinalReduce; + } + if (containsExpensiveAggregationFunctions()) { + int parallelChunkSize = _parallelChunkSizeForFinalReduce; + if (_topRecords != null && _topRecords.size() > parallelChunkSize) { + int estimatedThreads = (int) Math.ceil((double) _topRecords.size() / parallelChunkSize); + if (estimatedThreads == 0) { + return 1; + } + return Math.min(estimatedThreads, THREAD_POOL_SIZE); + } + } + // Default to 1 thread + return 1; + } + + private boolean containsExpensiveAggregationFunctions() { + for (AggregationFunction aggregationFunction : _aggregationFunctions) { + switch (aggregationFunction.getType()) { + case FUNNELCOMPLETECOUNT: + case FUNNELCOUNT: + case FUNNELMATCHSTEP: + case FUNNELMAXSTEP: + return true; + default: + break; + } + } + return false; + } + @Override public int size() { return _topRecords != null ? _topRecords.size() : _lookupMap.size(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index 96a33b80fab8..987e936e816f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -96,6 +96,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker { public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000; public static final int DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE = 1; + public static final int DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE = 10_000; private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class); @@ -277,6 +278,13 @@ private void applyQueryOptions(QueryContext queryContext) { } else { queryContext.setNumThreadsForFinalReduce(DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE); } + // Set parallelChunkSizeForFinalReduce + Integer parallelChunkSizeForFinalReduce = QueryOptionsUtils.getParallelChunkSizeForFinalReduce(queryOptions); + if (parallelChunkSizeForFinalReduce != null) { + queryContext.setParallelChunkSizeForFinalReduce(parallelChunkSizeForFinalReduce); + } else { + queryContext.setParallelChunkSizeForFinalReduce(DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE); + } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java index a2e1843356db..37209e40f15a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java @@ -126,6 +126,8 @@ public class QueryContext { private int _groupTrimThreshold = InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD; // Number of threads to use for final reduce private int _numThreadsForFinalReduce = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE; + // Parallel chunk size for final reduce + private int _parallelChunkSizeForFinalReduce = InstancePlanMakerImplV2.DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE; // Whether null handling is enabled private boolean _nullHandlingEnabled; // Whether server returns the final result @@ -421,6 +423,14 @@ public void setNumThreadsForFinalReduce(int numThreadsForFinalReduce) { _numThreadsForFinalReduce = numThreadsForFinalReduce; } + public int getParallelChunkSizeForFinalReduce() { + return _parallelChunkSizeForFinalReduce; + } + + public void setParallelChunkSizeForFinalReduce(int parallelChunkSizeForFinalReduce) { + _parallelChunkSizeForFinalReduce = parallelChunkSizeForFinalReduce; + } + public boolean isNullHandlingEnabled() { return _nullHandlingEnabled; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 8f24496c0eaa..c6071571f6ee 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -435,6 +435,7 @@ public static class QueryOptionKey { /** Max number of groups GroupByDataTableReducer (running at broker) should return. */ public static final String MIN_BROKER_GROUP_TRIM_SIZE = "minBrokerGroupTrimSize"; public static final String NUM_THREADS_FOR_FINAL_REDUCE = "numThreadsForFinalReduce"; + public static final String PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE = "parallelChunkSizeForFinalReduce"; public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery"; public static final String USE_FIXED_REPLICA = "useFixedReplica";