Skip to content

Commit

Permalink
Add tests with numThreadsForFinalReduce
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Jan 18, 2025
1 parent 2fba98b commit fcb643d
Show file tree
Hide file tree
Showing 18 changed files with 107 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ public Schema createSchema() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
Expand Down Expand Up @@ -953,6 +953,6 @@ public File createAvroFile()
));
}
}
return avroFile;
return List.of(avroFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.List;
import java.util.UUID;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
Expand Down Expand Up @@ -88,7 +89,7 @@ protected long getCountStarResult() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
Expand Down Expand Up @@ -153,8 +154,7 @@ public File createAvroFile()
fileWriter.append(record);
}
}

return avroFile;
return List.of(avroFile);
}

private static String newRandomBase64String() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.List;
import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
Expand Down Expand Up @@ -165,7 +166,7 @@ public Schema createSchema() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
Expand All @@ -188,7 +189,7 @@ public File createAvroFile()
fileWriter.append(record);
}
}
return avroFile;
return List.of(avroFile);
}

private byte[] getRandomRawValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,26 @@ public void setUp()
Schema schema = createSchema();
addSchema(schema);

File avroFile = createAvroFile();
List<File> avroFiles = createAvroFiles();
if (isRealtimeTable()) {
// create realtime table
TableConfig tableConfig = createRealtimeTableConfig(avroFile);
TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
addTableConfig(tableConfig);

// Push data into Kafka
pushAvroIntoKafka(List.of(avroFile));
pushAvroIntoKafka(avroFiles);
} else {
// create offline table
TableConfig tableConfig = createOfflineTableConfig();
addTableConfig(tableConfig);

// create & upload segments
ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, 0, _segmentDir, _tarDir);
uploadSegments(getTableName(), _tarDir);
int segmentIndex = 0;
for (File avroFile : avroFiles) {
ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex++, _segmentDir,
_tarDir);
uploadSegments(getTableName(), _tarDir);
}
}

waitForAllDocsLoaded(60_000);
Expand Down Expand Up @@ -247,7 +251,7 @@ public String getKafkaTopic() {
@Override
public abstract Schema createSchema();

public abstract File createAvroFile()
public abstract List<File> createAvroFiles()
throws Exception;

public boolean isRealtimeTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Schema createSchema() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws IOException {

// create avro schema
Expand Down Expand Up @@ -124,7 +124,7 @@ public File createAvroFile()
fileWriter.append(record);
}
}
return avroFile;
return List.of(avroFile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
Expand Down Expand Up @@ -130,7 +131,7 @@ protected long getCountStarResult() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
Expand Down Expand Up @@ -185,7 +186,7 @@ public File createAvroFile()
}
}

return avroFile;
return List.of(avroFile);
}

@Test(dataProvider = "useBothQueryEngines")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public TableConfig createOfflineTableConfig() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
List<org.apache.avro.Schema.Field> fields =
Expand Down Expand Up @@ -130,7 +130,7 @@ public File createAvroFile()
}
Collections.sort(_sortedSequenceIds);

return avroFile;
return List.of(avroFile);
}

@Test(dataProvider = "useBothQueryEngines")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public TableConfig createOfflineTableConfig() {
.build();
}

public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
org.apache.avro.Schema stringMapAvroSchema =
Expand Down Expand Up @@ -126,7 +126,7 @@ public File createAvroFile()
fileWriter.append(record);
}
}
return avroFile;
return List.of(avroFile);
}

protected int getSelectionDefaultDocCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public TableConfig createOfflineTableConfig() {
.build();
}

public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
org.apache.avro.Schema stringMapAvroSchema =
Expand Down Expand Up @@ -119,7 +119,7 @@ public File createAvroFile()
fileWriter.append(record);
}
}
return avroFile;
return List.of(avroFile);
}

protected int getSelectionDefaultDocCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public TableConfig createOfflineTableConfig() {
.build();
}

public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
org.apache.avro.Schema stringKeyMapAvroSchema =
Expand Down Expand Up @@ -116,7 +116,7 @@ public File createAvroFile()
}
}

return avroFile;
return List.of(avroFile);
}

protected int getSelectionDefaultDocCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
Expand Down Expand Up @@ -64,7 +65,7 @@ public Schema createSchema() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws IOException {

// create avro schema
Expand Down Expand Up @@ -103,7 +104,7 @@ public File createAvroFile()
fileWriter.append(record);
}
}
return avroFile;
return List.of(avroFile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ protected long getCountStarResult() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
// Read all skills from the skill file
InputStream inputStream = getClass().getClassLoader().getResourceAsStream("data/text_search_data/skills.txt");
Expand Down Expand Up @@ -164,7 +164,7 @@ public File createAvroFile()
fileWriter.append(record);
}
}
return avroFile;
return List.of(avroFile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected long getCountStarResult() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws IOException {

// create avro schema
Expand Down Expand Up @@ -171,7 +171,7 @@ public File createAvroFile()
}
}

return avroFile;
return List.of(avroFile);
}

@Test(dataProvider = "useV1QueryEngine")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.sql.Timestamp;
import java.util.List;
import java.util.TimeZone;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
Expand Down Expand Up @@ -461,7 +462,7 @@ public Schema createSchema() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
Expand Down Expand Up @@ -533,6 +534,6 @@ public File createAvroFile()
tsBaseLong += 86400000;
}
}
return avroFile;
return List.of(avroFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.List;
import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
Expand Down Expand Up @@ -279,7 +280,7 @@ public Schema createSchema() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
Expand All @@ -303,7 +304,7 @@ public File createAvroFile()
fileWriter.append(record);
}
}
return avroFile;
return List.of(avroFile);
}

private byte[] getRandomRawValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.List;
import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
Expand Down Expand Up @@ -123,7 +124,7 @@ public Schema createSchema() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
Expand All @@ -146,7 +147,7 @@ public File createAvroFile()
fileWriter.append(record);
}
}
return avroFile;
return List.of(avroFile);
}

private byte[] getRandomRawValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public Schema createSchema() {
}

@Override
public File createAvroFile()
public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
Expand Down Expand Up @@ -320,7 +320,7 @@ public File createAvroFile()
fileWriter.append(record);
}
}
return avroFile;
return List.of(avroFile);
}

private float[] createZeroVector(int vectorDimSize) {
Expand Down
Loading

0 comments on commit fcb643d

Please sign in to comment.