From e18251b678b7893ef68563bd2fe3e3501edc5adf Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Fri, 17 Jan 2025 19:49:40 -0800 Subject: [PATCH] Add tests with numThreadsForFinalReduce --- .../utils/config/QueryOptionsUtils.java | 15 ++-- .../data/table/ConcurrentIndexedTable.java | 5 +- .../pinot/core/data/table/IndexedTable.java | 46 +++++------- .../core/data/table/SimpleIndexedTable.java | 6 +- .../UnboundedConcurrentIndexedTable.java | 5 +- .../combine/GroupByCombineOperator.java | 3 +- .../plan/maker/InstancePlanMakerImplV2.java | 21 +++--- .../query/reduce/GroupByDataTableReducer.java | 2 +- .../query/request/context/QueryContext.java | 21 +++--- .../apache/pinot/core/util/GroupByUtils.java | 35 +++++---- .../core/data/table/IndexedTableTest.java | 19 +++-- .../integration/tests/custom/ArrayTest.java | 4 +- .../tests/custom/BytesTypeTest.java | 6 +- .../tests/custom/CpcSketchTest.java | 5 +- ...CustomDataQueryClusterIntegrationTest.java | 16 +++-- .../custom/FloatingPointDataTypeTest.java | 4 +- .../tests/custom/GeoSpatialTest.java | 5 +- .../tests/custom/JsonPathTest.java | 4 +- .../custom/MapFieldTypeRealtimeTest.java | 4 +- .../tests/custom/MapFieldTypeTest.java | 4 +- .../integration/tests/custom/MapTypeTest.java | 4 +- .../tests/custom/SumPrecisionTest.java | 5 +- .../tests/custom/TextIndicesTest.java | 4 +- .../tests/custom/ThetaSketchTest.java | 4 +- .../tests/custom/TimestampTest.java | 5 +- .../tests/custom/TupleSketchTest.java | 5 +- .../integration/tests/custom/ULLTest.java | 5 +- .../integration/tests/custom/VectorTest.java | 4 +- .../tests/custom/WindowFunnelTest.java | 65 +++++++++++++++-- .../pinot/perf/BenchmarkCombineGroupBy.java | 2 +- .../pinot/perf/BenchmarkIndexedTable.java | 4 +- .../pinot/spi/utils/CommonConstants.java | 71 ++++++++++++------- 32 files changed, 249 insertions(+), 159 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 674064e513cf..cc4ded534d51 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -249,16 +249,17 @@ public static Integer getGroupTrimThreshold(Map queryOptions) { } @Nullable - public static Integer getNumThreadsForFinalReduce(Map queryOptions) { - String numThreadsForFinalReduceString = queryOptions.get(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE); - return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE, numThreadsForFinalReduceString, 1); + public static Integer getNumThreadsForServerFinalReduce(Map queryOptions) { + String numThreadsForServerFinalReduceString = queryOptions.get(QueryOptionKey.NUM_THREADS_FOR_SERVER_FINAL_REDUCE); + return checkedParseInt(QueryOptionKey.NUM_THREADS_FOR_SERVER_FINAL_REDUCE, numThreadsForServerFinalReduceString, 1); } @Nullable - public static Integer getParallelChunkSizeForFinalReduce(Map queryOptions) { - String parallelChunkSizeForFinalReduceString = - queryOptions.get(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE); - return checkedParseInt(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE, parallelChunkSizeForFinalReduceString, + public static Integer getParallelChunkSizeForServerFinalReduce(Map queryOptions) { + String parallelChunkSizeForServerFinalReduceString = + queryOptions.get(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_SERVER_FINAL_REDUCE); + return checkedParseInt(QueryOptionKey.PARALLEL_CHUNK_SIZE_FOR_SERVER_FINAL_REDUCE, + parallelChunkSizeForServerFinalReduceString, 1); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java index 871eea7c261f..fd75284324af 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.data.table; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.pinot.common.utils.DataSchema; @@ -33,9 +34,9 @@ public class ConcurrentIndexedTable extends IndexedTable { private final ReentrantReadWriteLock _readWriteLock = new ReentrantReadWriteLock(); public ConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize, - int trimSize, int trimThreshold, int initialCapacity) { + int trimSize, int trimThreshold, int initialCapacity, ExecutorService executorService) { super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, - new ConcurrentHashMap<>(initialCapacity)); + new ConcurrentHashMap<>(initialCapacity), executorService); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java index 2d719ef8724a..6038d78f7301 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java @@ -26,12 +26,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; @@ -46,18 +43,8 @@ @SuppressWarnings({"rawtypes", "unchecked"}) public abstract class IndexedTable extends BaseTable { private static final int THREAD_POOL_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 1); - private static final ThreadPoolExecutor EXECUTOR_SERVICE = (ThreadPoolExecutor) Executors.newFixedThreadPool( - THREAD_POOL_SIZE, new ThreadFactory() { - private final AtomicInteger _threadNumber = new AtomicInteger(1); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "IndexedTable-pool-thread-" + _threadNumber.getAndIncrement()); - t.setDaemon(true); - return t; - } - }); + private final ExecutorService _executorService; protected final Map _lookupMap; protected final boolean _hasFinalInput; protected final int _resultSize; @@ -67,8 +54,8 @@ public Thread newThread(Runnable r) { protected final TableResizer _tableResizer; protected final int _trimSize; protected final int _trimThreshold; - protected final int _numThreadsForFinalReduce; - protected final int _parallelChunkSizeForFinalReduce; + protected final int _numThreadsForServerFinalReduce; + protected final int _parallelChunkSizeForServerFinalReduce; protected Collection _topRecords; private int _numResizes; @@ -86,13 +73,14 @@ public Thread newThread(Runnable r) { * @param lookupMap Map from keys to records */ protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize, - int trimSize, int trimThreshold, Map lookupMap) { + int trimSize, int trimThreshold, Map lookupMap, ExecutorService executorService) { super(dataSchema); Preconditions.checkArgument(resultSize >= 0, "Result size can't be negative"); Preconditions.checkArgument(trimSize >= 0, "Trim size can't be negative"); Preconditions.checkArgument(trimThreshold >= 0, "Trim threshold can't be negative"); + _executorService = executorService; _lookupMap = lookupMap; _hasFinalInput = hasFinalInput; _resultSize = resultSize; @@ -108,9 +96,9 @@ protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContex _trimSize = trimSize; _trimThreshold = trimThreshold; // NOTE: The upper limit of threads number for final reduce is set to 2 * number of available processors by default - _numThreadsForFinalReduce = Math.min(queryContext.getNumThreadsForFinalReduce(), + _numThreadsForServerFinalReduce = Math.min(queryContext.getNumThreadsForServerFinalReduce(), Math.max(1, 2 * Runtime.getRuntime().availableProcessors())); - _parallelChunkSizeForFinalReduce = queryContext.getParallelChunkSizeForFinalReduce(); + _parallelChunkSizeForServerFinalReduce = queryContext.getParallelChunkSizeForServerFinalReduce(); } @Override @@ -184,20 +172,20 @@ public void finish(boolean sort, boolean storeFinalResult) { for (int i = 0; i < numAggregationFunctions; i++) { columnDataTypes[i + _numKeyColumns] = _aggregationFunctions[i].getFinalResultColumnType(); } - int numThreadsForFinalReduce = inferNumThreadsForFinalReduce(); + int numThreadsForServerFinalReduce = inferNumThreadsForServerFinalReduce(); // Submit task when the EXECUTOR_SERVICE is not overloaded - if ((numThreadsForFinalReduce > 1) && (EXECUTOR_SERVICE.getQueue().size() < THREAD_POOL_SIZE * 3)) { + if (numThreadsForServerFinalReduce > 1) { // Multi-threaded final reduce List> futures = new ArrayList<>(); try { List topRecordsList = new ArrayList<>(_topRecords); - int chunkSize = (topRecordsList.size() + numThreadsForFinalReduce - 1) / numThreadsForFinalReduce; - for (int threadId = 0; threadId < numThreadsForFinalReduce; threadId++) { + int chunkSize = (topRecordsList.size() + numThreadsForServerFinalReduce - 1) / numThreadsForServerFinalReduce; + for (int threadId = 0; threadId < numThreadsForServerFinalReduce; threadId++) { int startIdx = threadId * chunkSize; int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size()); if (startIdx < endIdx) { // Submit a task for processing a chunk of values - futures.add(EXECUTOR_SERVICE.submit(new TraceCallable() { + futures.add(_executorService.submit(new TraceCallable() { @Override public Void callJob() { for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) { @@ -235,12 +223,12 @@ public Void callJob() { } } - private int inferNumThreadsForFinalReduce() { - if (_numThreadsForFinalReduce > 1) { - return _numThreadsForFinalReduce; + private int inferNumThreadsForServerFinalReduce() { + if (_numThreadsForServerFinalReduce > 1) { + return _numThreadsForServerFinalReduce; } if (containsExpensiveAggregationFunctions()) { - int parallelChunkSize = _parallelChunkSizeForFinalReduce; + int parallelChunkSize = _parallelChunkSizeForServerFinalReduce; if (_topRecords != null && _topRecords.size() > parallelChunkSize) { int estimatedThreads = (int) Math.ceil((double) _topRecords.size() / parallelChunkSize); if (estimatedThreads == 0) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java index df89c3a8e1f7..e05f8dea9ca1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.data.table; import java.util.HashMap; +import java.util.concurrent.ExecutorService; import javax.annotation.concurrent.NotThreadSafe; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.query.request.context.QueryContext; @@ -31,8 +32,9 @@ public class SimpleIndexedTable extends IndexedTable { public SimpleIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize, - int trimSize, int trimThreshold, int initialCapacity) { - super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new HashMap<>(initialCapacity)); + int trimSize, int trimThreshold, int initialCapacity, ExecutorService executorService) { + super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new HashMap<>(initialCapacity), + executorService); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java index f397ac0e8cab..3d87ebc03c93 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.data.table; +import java.util.concurrent.ExecutorService; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.query.request.context.QueryContext; @@ -36,8 +37,8 @@ public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable { public UnboundedConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, - int resultSize, int initialCapacity) { - super(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity); + int resultSize, int initialCapacity, ExecutorService executorService) { + super(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity, executorService); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java index aeef763ad5a1..e1f0baa60201 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -111,7 +111,8 @@ protected void processSegments() { if (_indexedTable == null) { synchronized (this) { if (_indexedTable == null) { - _indexedTable = GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, _numTasks); + _indexedTable = GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, _numTasks, + _executorService); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index 987e936e816f..c8d69ef66161 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -271,19 +271,20 @@ private void applyQueryOptions(QueryContext queryContext) { } else { queryContext.setGroupTrimThreshold(_groupByTrimThreshold); } - // Set numThreadsForFinalReduce - Integer numThreadsForFinalReduce = QueryOptionsUtils.getNumThreadsForFinalReduce(queryOptions); - if (numThreadsForFinalReduce != null) { - queryContext.setNumThreadsForFinalReduce(numThreadsForFinalReduce); + // Set numThreadsForServerFinalReduce + Integer numThreadsForServerFinalReduce = QueryOptionsUtils.getNumThreadsForServerFinalReduce(queryOptions); + if (numThreadsForServerFinalReduce != null) { + queryContext.setNumThreadsForServerFinalReduce(numThreadsForServerFinalReduce); } else { - queryContext.setNumThreadsForFinalReduce(DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE); + queryContext.setNumThreadsForServerFinalReduce(DEFAULT_NUM_THREADS_FOR_FINAL_REDUCE); } - // Set parallelChunkSizeForFinalReduce - Integer parallelChunkSizeForFinalReduce = QueryOptionsUtils.getParallelChunkSizeForFinalReduce(queryOptions); - if (parallelChunkSizeForFinalReduce != null) { - queryContext.setParallelChunkSizeForFinalReduce(parallelChunkSizeForFinalReduce); + // Set parallelChunkSizeForServerFinalReduce + Integer parallelChunkSizeForServerFinalReduce = + QueryOptionsUtils.getParallelChunkSizeForServerFinalReduce(queryOptions); + if (parallelChunkSizeForServerFinalReduce != null) { + queryContext.setParallelChunkSizeForServerFinalReduce(parallelChunkSizeForServerFinalReduce); } else { - queryContext.setParallelChunkSizeForFinalReduce(DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE); + queryContext.setParallelChunkSizeForServerFinalReduce(DEFAULT_PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index c53be31ed518..e1db966f1b3b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -240,7 +240,7 @@ private IndexedTable getIndexedTable(DataSchema dataSchema, Collection survivors // Test SimpleIndexedTable IndexedTable indexedTable = - new SimpleIndexedTable(dataSchema, false, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY); + new SimpleIndexedTable(dataSchema, false, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY, + Executors.newCachedThreadPool()); IndexedTable mergeTable = - new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY); + new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY, + Executors.newCachedThreadPool()); testNonConcurrent(indexedTable, mergeTable); indexedTable.finish(true); checkSurvivors(indexedTable, survivors); // Test ConcurrentIndexedTable indexedTable = - new ConcurrentIndexedTable(dataSchema, false, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY); + new ConcurrentIndexedTable(dataSchema, false, queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY, + Executors.newCachedThreadPool()); mergeTable = - new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY); + new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY, + Executors.newCachedThreadPool()); testNonConcurrent(indexedTable, mergeTable); indexedTable.finish(true); checkSurvivors(indexedTable, survivors); @@ -260,11 +265,11 @@ public void testNoMoreNewRecords() { IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, false, queryContext, 5, Integer.MAX_VALUE, Integer.MAX_VALUE, - INITIAL_CAPACITY); + INITIAL_CAPACITY, Executors.newCachedThreadPool()); testNoMoreNewRecordsInTable(indexedTable); indexedTable = new ConcurrentIndexedTable(dataSchema, false, queryContext, 5, Integer.MAX_VALUE, Integer.MAX_VALUE, - INITIAL_CAPACITY); + INITIAL_CAPACITY, Executors.newCachedThreadPool()); testNoMoreNewRecordsInTable(indexedTable); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java index 78fda6266e12..84598ab45bc2 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java @@ -873,7 +873,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -953,6 +953,6 @@ public File createAvroFile() )); } } - return avroFile; + return List.of(avroFile); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java index 8e3f18c30dfb..6c798d1e68db 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java @@ -23,6 +23,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.List; import java.util.UUID; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -88,7 +89,7 @@ protected long getCountStarResult() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -153,8 +154,7 @@ public File createAvroFile() fileWriter.append(record); } } - - return avroFile; + return List.of(avroFile); } private static String newRandomBase64String() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java index 85bcb59e8e49..bf42a3ec5a6e 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java @@ -23,6 +23,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.List; import java.util.Random; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -165,7 +166,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -188,7 +189,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } private byte[] getRandomRawValue() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java index 6f15e3be17db..2d4569f012c4 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java @@ -94,22 +94,26 @@ public void setUp() Schema schema = createSchema(); addSchema(schema); - File avroFile = createAvroFile(); + List avroFiles = createAvroFiles(); if (isRealtimeTable()) { // create realtime table - TableConfig tableConfig = createRealtimeTableConfig(avroFile); + TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0)); addTableConfig(tableConfig); // Push data into Kafka - pushAvroIntoKafka(List.of(avroFile)); + pushAvroIntoKafka(avroFiles); } else { // create offline table TableConfig tableConfig = createOfflineTableConfig(); addTableConfig(tableConfig); // create & upload segments - ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, 0, _segmentDir, _tarDir); - uploadSegments(getTableName(), _tarDir); + int segmentIndex = 0; + for (File avroFile : avroFiles) { + ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex++, _segmentDir, + _tarDir); + uploadSegments(getTableName(), _tarDir); + } } waitForAllDocsLoaded(60_000); @@ -247,7 +251,7 @@ public String getKafkaTopic() { @Override public abstract Schema createSchema(); - public abstract File createAvroFile() + public abstract List createAvroFiles() throws Exception; public boolean isRealtimeTable() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java index 6c761270211b..7adc80628c4e 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java @@ -72,7 +72,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws IOException { // create avro schema @@ -124,7 +124,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java index b5cf20019bfe..6b11a8da3ade 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import java.io.File; import java.nio.ByteBuffer; +import java.util.List; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; @@ -130,7 +131,7 @@ protected long getCountStarResult() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -185,7 +186,7 @@ public File createAvroFile() } } - return avroFile; + return List.of(avroFile); } @Test(dataProvider = "useBothQueryEngines") diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java index 7dd460d1f576..d865d7defd20 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java @@ -96,7 +96,7 @@ public TableConfig createOfflineTableConfig() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); List fields = @@ -130,7 +130,7 @@ public File createAvroFile() } Collections.sort(_sortedSequenceIds); - return avroFile; + return List.of(avroFile); } @Test(dataProvider = "useBothQueryEngines") diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java index a9cee052b1e9..8d54850f23ed 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java @@ -92,7 +92,7 @@ public TableConfig createOfflineTableConfig() { .build(); } - public File createAvroFile() + public List createAvroFiles() throws Exception { org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); org.apache.avro.Schema stringMapAvroSchema = @@ -126,7 +126,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } protected int getSelectionDefaultDocCount() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java index 74ce24e3d784..e906c5b86580 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java @@ -89,7 +89,7 @@ public TableConfig createOfflineTableConfig() { .build(); } - public File createAvroFile() + public List createAvroFiles() throws Exception { org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); org.apache.avro.Schema stringMapAvroSchema = @@ -119,7 +119,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } protected int getSelectionDefaultDocCount() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java index 4c5571c9a10a..578ee5e5ea1f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java @@ -86,7 +86,7 @@ public TableConfig createOfflineTableConfig() { .build(); } - public File createAvroFile() + public List createAvroFiles() throws Exception { org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); org.apache.avro.Schema stringKeyMapAvroSchema = @@ -116,7 +116,7 @@ public File createAvroFile() } } - return avroFile; + return List.of(avroFile); } protected int getSelectionDefaultDocCount() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java index b087913c7ce6..2677211430f7 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.util.List; import java.util.Random; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -64,7 +65,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws IOException { // create avro schema @@ -103,7 +104,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java index 9d963eab7537..353cd00396d0 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java @@ -132,7 +132,7 @@ protected long getCountStarResult() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // Read all skills from the skill file InputStream inputStream = getClass().getClassLoader().getResourceAsStream("data/text_search_data/skills.txt"); @@ -164,7 +164,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java index e9b577d977e5..e63e0ad99c0d 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java @@ -87,7 +87,7 @@ protected long getCountStarResult() { } @Override - public File createAvroFile() + public List createAvroFiles() throws IOException { // create avro schema @@ -171,7 +171,7 @@ public File createAvroFile() } } - return avroFile; + return List.of(avroFile); } @Test(dataProvider = "useV1QueryEngine") diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java index 60e63898f401..483370b24736 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import java.io.File; import java.sql.Timestamp; +import java.util.List; import java.util.TimeZone; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; @@ -461,7 +462,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -533,6 +534,6 @@ public File createAvroFile() tsBaseLong += 86400000; } } - return avroFile; + return List.of(avroFile); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java index e4cd62c30272..d39c64db5767 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java @@ -23,6 +23,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.List; import java.util.Random; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -279,7 +280,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -303,7 +304,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } private byte[] getRandomRawValue() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java index ccf82c21816d..d017a71db8af 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java @@ -24,6 +24,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.List; import java.util.Random; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -123,7 +124,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -146,7 +147,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } private byte[] getRandomRawValue() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java index 78a078d3d960..da2e03c9fe39 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java @@ -252,7 +252,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -320,7 +320,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } private float[] createZeroVector(int vectorDimSize) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java index d185837c22da..c47765d72a89 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java @@ -21,6 +21,9 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; @@ -212,7 +215,6 @@ public void testFunnelMaxStepGroupByQueriesWithMode(boolean useMultiStageQueryEn } } - @Test(dataProvider = "useBothQueryEngines") public void testFunnelMaxStepGroupByQueriesWithModeKeepAll(boolean useMultiStageQueryEngine) throws Exception { @@ -476,6 +478,53 @@ public void testFunnelMatchStepGroupByQueriesWithMode(boolean useMultiStageQuery } } + @Test(dataProvider = "useV2QueryEngine", invocationCount = 10, threadPoolSize = 5) + public void testFunnelMatchStepWithMultiThreadsReduce(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + int numThreadsForServerFinalReduce = 2 + new Random().nextInt(10); + LOGGER.info("Running testFunnelMatchStepWithMultiThreadsReduce with numThreadsForServerFinalReduce: {}", + numThreadsForServerFinalReduce); + String query = + String.format("SET numThreadsForServerFinalReduce=" + numThreadsForServerFinalReduce + "; " + + "SELECT " + + "userId, funnelMatchStep(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_increase' ) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d ", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + int sumSteps = 0; + for (JsonNode step : row.get(1)) { + sumSteps += step.intValue(); + } + switch (i / 10) { + case 0: + assertEquals(sumSteps, 4); + break; + case 1: + assertEquals(sumSteps, 2); + break; + case 2: + assertEquals(sumSteps, 3); + break; + case 3: + assertEquals(sumSteps, 1); + break; + default: + throw new IllegalStateException(); + } + } + } + @Test(dataProvider = "useBothQueryEngines") public void testFunnelMatchStepGroupByQueriesWithModeSkipLeaf(boolean useMultiStageQueryEngine) throws Exception { @@ -860,7 +909,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -895,10 +944,11 @@ public File createAvroFile() } _countStarResult = totalRows * repeats; // create avro file - File avroFile = new File(_tempDir, "data.avro"); - try (DataFileWriter fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { - fileWriter.create(avroSchema, avroFile); - for (int repeat = 0; repeat < repeats; repeat++) { + List avroFiles = new ArrayList<>(); + for (int repeat = 0; repeat < repeats; repeat++) { + File avroFile = new File(_tempDir, "data" + repeat + ".avro"); + try (DataFileWriter fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { + fileWriter.create(avroSchema, avroFile); for (int i = 0; i < userUrlValues.length; i++) { for (int j = 0; j < userUrlValues[i].length; j++) { GenericData.Record record = new GenericData.Record(avroSchema); @@ -909,7 +959,8 @@ public File createAvroFile() } } } + avroFiles.add(avroFile); } - return avroFile; + return avroFiles; } } diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java index 579bd5b227f8..a8fd8cf98db0 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java @@ -121,7 +121,7 @@ public void concurrentIndexedTableForCombineGroupBy() IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(_dataSchema, false, _queryContext, trimSize, trimSize, InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD, - InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY); + InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY, _executorService); List> innerSegmentCallables = new ArrayList<>(NUM_SEGMENTS); diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java index 6c9667533b73..7f9be99ea950 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java @@ -119,7 +119,7 @@ public void concurrentIndexedTable() // make 1 concurrent table IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(_dataSchema, false, _queryContext, TRIM_SIZE, TRIM_SIZE, TRIM_THRESHOLD, - TRIM_THRESHOLD); + TRIM_THRESHOLD, _executorService); // 10 parallel threads putting 10k records into the table @@ -169,7 +169,7 @@ public void simpleIndexedTable() // make 10 indexed tables IndexedTable simpleIndexedTable = new SimpleIndexedTable(_dataSchema, false, _queryContext, TRIM_SIZE, TRIM_SIZE, TRIM_THRESHOLD, - TRIM_THRESHOLD); + TRIM_THRESHOLD, _executorService); simpleIndexedTables.add(simpleIndexedTable); // put 10k records in each indexed table, in parallel diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index c6071571f6ee..bcbe1d3bba29 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -421,21 +421,42 @@ public static class QueryOptionKey { public static final String USE_SCAN_REORDER_OPTIMIZATION = "useScanReorderOpt"; public static final String MAX_EXECUTION_THREADS = "maxExecutionThreads"; - /** Number of groups AggregateOperator should limit result to after sorting. - * Trimming happens only when (sub)query contains order by and limit clause. */ + /** + * Number of groups AggregateOperator should limit result to after sorting. + * Trimming happens only when (sub)query contains order by and limit clause. + */ public static final String GROUP_TRIM_SIZE = "groupTrimSize"; - /** Number of groups GroupByOperator should limit result to after sorting. - * Trimming happens only when (sub)query contains order by clause. */ + /** + * Number of groups GroupByOperator should limit result to after sorting. + * Trimming happens only when (sub)query contains order by clause. + */ public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = "minSegmentGroupTrimSize"; - /** Max number of groups GroupByCombineOperator (running at server) should return .*/ + /** + * Max number of groups GroupByCombineOperator (running at server) should return . + */ public static final String MIN_SERVER_GROUP_TRIM_SIZE = "minServerGroupTrimSize"; - /** Max number of groups GroupByDataTableReducer (running at broker) should return. */ + /** + * Max number of groups GroupByDataTableReducer (running at broker) should return. + */ public static final String MIN_BROKER_GROUP_TRIM_SIZE = "minBrokerGroupTrimSize"; - public static final String NUM_THREADS_FOR_FINAL_REDUCE = "numThreadsForFinalReduce"; - public static final String PARALLEL_CHUNK_SIZE_FOR_FINAL_REDUCE = "parallelChunkSizeForFinalReduce"; + + /** + * Number of threads used in the server level final reduce, this is used for expensive aggregation functions, + * and we want to push down reduce to server level instead of streaming all the data back to broker for + * global reduce. + * + * E.g. Funnel queries are considered as expensive aggregation functions. + */ + public static final String NUM_THREADS_FOR_SERVER_FINAL_REDUCE = "numThreadsForServerFinalReduce"; + + /** + * Number of threads used in the final reduce at broker level. + */ + public static final String PARALLEL_CHUNK_SIZE_FOR_SERVER_FINAL_REDUCE = + "parallelChunkSizeForServerFinalReduce"; public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery"; public static final String USE_FIXED_REPLICA = "useFixedReplica"; @@ -472,7 +493,9 @@ public static class QueryOptionKey { public static final String MULTI_STAGE_LEAF_LIMIT = "multiStageLeafLimit"; - /** Throw an exception on reaching num_groups_limit instead of just setting a flag. */ + /** + * Throw an exception on reaching num_groups_limit instead of just setting a flag. + */ public static final String ERROR_ON_NUM_GROUPS_LIMIT = "errorOnNumGroupsLimit"; public static final String NUM_GROUPS_LIMIT = "numGroupsLimit"; public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "maxInitialResultHolderCapacity"; @@ -579,22 +602,22 @@ public static class AdaptiveServerSelector { * * * Stats Collection: Enabling/Disabling stats collection will dictate whether stats (like latency, # of inflight - * requests) will be collected when queries are routed to/received from servers. It does not - * have any impact on the Server Selection Strategy used. + * requests) will be collected when queries are routed to/received from servers. It does not + * have any impact on the Server Selection Strategy used. * * Routing Strategy: Decides what strategy should be used to pick a server. Note that this - * routing strategy complements the existing Balanced/ReplicaGroup/StrictReplicaGroup - * strategies and is not a replacement.The available strategies are as follows: - * 1. NO_OP: Uses the default behavior offered by Balanced/ReplicaGroup/StrictReplicaGroup - * instance selectors. Does NOT require Stats Collection to be enabled. - * 2. NUM_INFLIGHT_REQ: Picks the best server based on the number of inflight requests for - * each server. Requires Stats Collection to be enabled. - * 3. LATENCY: Picks the best server based on the Exponential Weighted Moving Averge of Latency - * for each server. Requires Stats Collection to be enabled. - * 4. HYBRID: Picks the best server by computing a custom hybrid score based on both latency - * and # inflight requests. This is based on the approach described in the paper - * https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf. Requires Stats - * Collection to be enabled. + * routing strategy complements the existing Balanced/ReplicaGroup/StrictReplicaGroup + * strategies and is not a replacement.The available strategies are as follows: + * 1. NO_OP: Uses the default behavior offered by Balanced/ReplicaGroup/StrictReplicaGroup + * instance selectors. Does NOT require Stats Collection to be enabled. + * 2. NUM_INFLIGHT_REQ: Picks the best server based on the number of inflight requests for + * each server. Requires Stats Collection to be enabled. + * 3. LATENCY: Picks the best server based on the Exponential Weighted Moving Averge of Latency + * for each server. Requires Stats Collection to be enabled. + * 4. HYBRID: Picks the best server by computing a custom hybrid score based on both latency + * and # inflight requests. This is based on the approach described in the paper + * https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf. Requires Stats + * Collection to be enabled. */ public enum Type { @@ -1114,7 +1137,7 @@ public static class Realtime { public enum Status { IN_PROGRESS, // The segment is still consuming data COMMITTING, // This state will only be utilised by pauseless ingestion when the segment has been consumed but - // is yet to be build and uploaded by the server. + // is yet to be build and uploaded by the server. DONE, // The segment has finished consumption and has been committed to the segment store UPLOADED; // The segment is uploaded by an external party