diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java index 78fda6266e12..84598ab45bc2 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java @@ -873,7 +873,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -953,6 +953,6 @@ public File createAvroFile() )); } } - return avroFile; + return List.of(avroFile); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java index 8e3f18c30dfb..6c798d1e68db 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java @@ -23,6 +23,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.List; import java.util.UUID; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -88,7 +89,7 @@ protected long getCountStarResult() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -153,8 +154,7 @@ public File createAvroFile() fileWriter.append(record); } } - - return avroFile; + return List.of(avroFile); } private static String newRandomBase64String() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java index 85bcb59e8e49..bf42a3ec5a6e 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java @@ -23,6 +23,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.List; import java.util.Random; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -165,7 +166,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -188,7 +189,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } private byte[] getRandomRawValue() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java index 6f15e3be17db..2d4569f012c4 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java @@ -94,22 +94,26 @@ public void setUp() Schema schema = createSchema(); addSchema(schema); - File avroFile = createAvroFile(); + List avroFiles = createAvroFiles(); if (isRealtimeTable()) { // create realtime table - TableConfig tableConfig = createRealtimeTableConfig(avroFile); + TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0)); addTableConfig(tableConfig); // Push data into Kafka - pushAvroIntoKafka(List.of(avroFile)); + pushAvroIntoKafka(avroFiles); } else { // create offline table TableConfig tableConfig = createOfflineTableConfig(); addTableConfig(tableConfig); // create & upload segments - ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, 0, _segmentDir, _tarDir); - uploadSegments(getTableName(), _tarDir); + int segmentIndex = 0; + for (File avroFile : avroFiles) { + ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex++, _segmentDir, + _tarDir); + uploadSegments(getTableName(), _tarDir); + } } waitForAllDocsLoaded(60_000); @@ -247,7 +251,7 @@ public String getKafkaTopic() { @Override public abstract Schema createSchema(); - public abstract File createAvroFile() + public abstract List createAvroFiles() throws Exception; public boolean isRealtimeTable() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java index 6c761270211b..7adc80628c4e 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java @@ -72,7 +72,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws IOException { // create avro schema @@ -124,7 +124,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java index b5cf20019bfe..6b11a8da3ade 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import java.io.File; import java.nio.ByteBuffer; +import java.util.List; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; @@ -130,7 +131,7 @@ protected long getCountStarResult() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -185,7 +186,7 @@ public File createAvroFile() } } - return avroFile; + return List.of(avroFile); } @Test(dataProvider = "useBothQueryEngines") diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java index 7dd460d1f576..d865d7defd20 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java @@ -96,7 +96,7 @@ public TableConfig createOfflineTableConfig() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); List fields = @@ -130,7 +130,7 @@ public File createAvroFile() } Collections.sort(_sortedSequenceIds); - return avroFile; + return List.of(avroFile); } @Test(dataProvider = "useBothQueryEngines") diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java index a9cee052b1e9..8d54850f23ed 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java @@ -92,7 +92,7 @@ public TableConfig createOfflineTableConfig() { .build(); } - public File createAvroFile() + public List createAvroFiles() throws Exception { org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); org.apache.avro.Schema stringMapAvroSchema = @@ -126,7 +126,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } protected int getSelectionDefaultDocCount() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java index 74ce24e3d784..e906c5b86580 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java @@ -89,7 +89,7 @@ public TableConfig createOfflineTableConfig() { .build(); } - public File createAvroFile() + public List createAvroFiles() throws Exception { org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); org.apache.avro.Schema stringMapAvroSchema = @@ -119,7 +119,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } protected int getSelectionDefaultDocCount() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java index 4c5571c9a10a..578ee5e5ea1f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java @@ -86,7 +86,7 @@ public TableConfig createOfflineTableConfig() { .build(); } - public File createAvroFile() + public List createAvroFiles() throws Exception { org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); org.apache.avro.Schema stringKeyMapAvroSchema = @@ -116,7 +116,7 @@ public File createAvroFile() } } - return avroFile; + return List.of(avroFile); } protected int getSelectionDefaultDocCount() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java index b087913c7ce6..2677211430f7 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.util.List; import java.util.Random; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -64,7 +65,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws IOException { // create avro schema @@ -103,7 +104,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java index 9d963eab7537..353cd00396d0 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java @@ -132,7 +132,7 @@ protected long getCountStarResult() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // Read all skills from the skill file InputStream inputStream = getClass().getClassLoader().getResourceAsStream("data/text_search_data/skills.txt"); @@ -164,7 +164,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java index e9b577d977e5..e63e0ad99c0d 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java @@ -87,7 +87,7 @@ protected long getCountStarResult() { } @Override - public File createAvroFile() + public List createAvroFiles() throws IOException { // create avro schema @@ -171,7 +171,7 @@ public File createAvroFile() } } - return avroFile; + return List.of(avroFile); } @Test(dataProvider = "useV1QueryEngine") diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java index 60e63898f401..483370b24736 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import java.io.File; import java.sql.Timestamp; +import java.util.List; import java.util.TimeZone; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; @@ -461,7 +462,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -533,6 +534,6 @@ public File createAvroFile() tsBaseLong += 86400000; } } - return avroFile; + return List.of(avroFile); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java index e4cd62c30272..d39c64db5767 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java @@ -23,6 +23,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.List; import java.util.Random; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -279,7 +280,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -303,7 +304,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } private byte[] getRandomRawValue() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java index ccf82c21816d..d017a71db8af 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java @@ -24,6 +24,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.List; import java.util.Random; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -123,7 +124,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -146,7 +147,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } private byte[] getRandomRawValue() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java index 78a078d3d960..da2e03c9fe39 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java @@ -252,7 +252,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -320,7 +320,7 @@ public File createAvroFile() fileWriter.append(record); } } - return avroFile; + return List.of(avroFile); } private float[] createZeroVector(int vectorDimSize) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java index d185837c22da..b0495a893162 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java @@ -21,6 +21,9 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; @@ -212,7 +215,6 @@ public void testFunnelMaxStepGroupByQueriesWithMode(boolean useMultiStageQueryEn } } - @Test(dataProvider = "useBothQueryEngines") public void testFunnelMaxStepGroupByQueriesWithModeKeepAll(boolean useMultiStageQueryEngine) throws Exception { @@ -476,6 +478,53 @@ public void testFunnelMatchStepGroupByQueriesWithMode(boolean useMultiStageQuery } } + @Test(dataProvider = "useV2QueryEngine", invocationCount = 10, threadPoolSize = 5) + public void testFunnelMatchStepWithMultiThreadsReduce(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + int numThreadsForFinalReduce = 2 + new Random().nextInt(10); + LOGGER.info("Running testFunnelMatchStepWithMultiThreadsReduce with numThreadsForFinalReduce: {}", + numThreadsForFinalReduce); + String query = + String.format("SET numThreadsForFinalReduce=" + numThreadsForFinalReduce + "; " + + "SELECT " + + "userId, funnelMatchStep(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_increase' ) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d ", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + int sumSteps = 0; + for (JsonNode step : row.get(1)) { + sumSteps += step.intValue(); + } + switch (i / 10) { + case 0: + assertEquals(sumSteps, 4); + break; + case 1: + assertEquals(sumSteps, 2); + break; + case 2: + assertEquals(sumSteps, 3); + break; + case 3: + assertEquals(sumSteps, 1); + break; + default: + throw new IllegalStateException(); + } + } + } + @Test(dataProvider = "useBothQueryEngines") public void testFunnelMatchStepGroupByQueriesWithModeSkipLeaf(boolean useMultiStageQueryEngine) throws Exception { @@ -860,7 +909,7 @@ public Schema createSchema() { } @Override - public File createAvroFile() + public List createAvroFiles() throws Exception { // create avro schema org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); @@ -895,10 +944,11 @@ public File createAvroFile() } _countStarResult = totalRows * repeats; // create avro file - File avroFile = new File(_tempDir, "data.avro"); - try (DataFileWriter fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { - fileWriter.create(avroSchema, avroFile); - for (int repeat = 0; repeat < repeats; repeat++) { + List avroFiles = new ArrayList<>(); + for (int repeat = 0; repeat < repeats; repeat++) { + File avroFile = new File(_tempDir, "data" + repeat + ".avro"); + try (DataFileWriter fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { + fileWriter.create(avroSchema, avroFile); for (int i = 0; i < userUrlValues.length; i++) { for (int j = 0; j < userUrlValues[i].length; j++) { GenericData.Record record = new GenericData.Record(avroSchema); @@ -909,7 +959,8 @@ public File createAvroFile() } } } + avroFiles.add(avroFile); } - return avroFile; + return avroFiles; } }