Commit

Performance improvements (#527)
Co-authored-by: Karthik Ramgopal <kramgopa@linkedin.com>
karthikrg and li-kramgopa authored Jan 9, 2024
1 parent 656164b commit dd5ed71
Showing 4 changed files with 80 additions and 126 deletions.
1 change: 1 addition & 0 deletions avro-builder/builder/build.gradle
@@ -33,6 +33,7 @@ dependencies {
implementation "org.apache.logging.log4j:log4j-api:2.17.1"
implementation "commons-io:commons-io:2.11.0"
implementation "org.slf4j:slf4j-api:1.7.14"
implementation "com.squareup:javapoet:1.13.0"
implementation "org.apache.avro:avro-compiler:1.9.2"
implementation "org.apache.avro:avro:1.9.2"
implementation 'org.json:json:20220320'
@@ -31,22 +31,20 @@
import com.linkedin.avroutil1.util.ConfigurableAvroSchemaComparator;
import com.linkedin.avroutil1.writer.avsc.AvscSchemaWriter;
import com.linkedin.avroutil1.writer.avsc.AvscWriterConfig;
import com.squareup.javapoet.JavaFile;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import javax.tools.JavaFileObject;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -102,13 +100,15 @@ public void run(OperationContext opContext) throws Exception {
AvroParseContext context = new AvroParseContext();
Set<AvscParseResult> parsedFiles = new HashSet<>();

SchemaSet lookupSchemaSet = null;
final SchemaSet lookupSchemaSet;
if (config.getResolverPath() != null) {
//build a resolver path SchemaSet if ResolverPath is set
lookupSchemaSet = new ResolverPathSchemaSet(config.getResolverPath());
} else if (config.isIncludeClasspath()) {
//build a classpath SchemaSet if classpath (cp) lookup is turned on
lookupSchemaSet = new ClasspathSchemaSet();
} else {
lookupSchemaSet = null;
}
parsedFiles.addAll(parseAvscFiles(avscFiles, true, context));
parsedFiles.addAll(parseAvscFiles(nonImportableFiles, false, context));
@@ -163,34 +163,20 @@ public void run(OperationContext opContext) throws Exception {

long parseEnd = System.currentTimeMillis();

int schemaChunkSize = 500, schemaCounter = 0, totalSchemaParsed = 0;

List<Map<String, AvroNamedSchema>> allNamedSchemaList = new ArrayList<>();
Map<String, AvroNamedSchema> namedSchemaChunk = new HashMap<>();
for (AvscParseResult parseResult : parsedFiles) {
if(schemaCounter >= schemaChunkSize) {
allNamedSchemaList.add(namedSchemaChunk);
namedSchemaChunk = new HashMap<>();
totalSchemaParsed += schemaCounter;
schemaCounter = 0;
}
final AtomicInteger schemaCounter = new AtomicInteger(0);
final int schemaChunkSize = 500;
Collection<List<AvroNamedSchema>> allNamedSchemaList = parsedFiles.stream().flatMap(parseResult -> {
AvroSchema schema = parseResult.getTopLevelSchema();
if (schema instanceof AvroNamedSchema) {
String name = ((AvroNamedSchema) schema).getFullName();
namedSchemaChunk.put(name, (AvroNamedSchema) parseResult.getTopLevelSchema());
return Stream.of((AvroNamedSchema) parseResult.getTopLevelSchema());
} else if (AvroType.UNION.equals(schema.type())) {
for(SchemaOrRef schemaOrRef : ((AvroUnionSchema) schema).getTypes()) {
String name = ((AvroNamedSchema) schemaOrRef.getSchema()).getFullName();
namedSchemaChunk.put(name, (AvroNamedSchema) schemaOrRef.getSchema());
schemaCounter++;
}
return ((AvroUnionSchema) schema).getTypes()
.stream()
.map(schemaOrRef -> (AvroNamedSchema) schemaOrRef.getSchema());
} else {
return Stream.empty();
}
schemaCounter++;
}
if(!namedSchemaChunk.isEmpty()) {
totalSchemaParsed += schemaCounter;
allNamedSchemaList.add(namedSchemaChunk);
}
}).collect(Collectors.groupingBy(it -> schemaCounter.getAndIncrement() / schemaChunkSize)).values();

// Handle duplicate schemas
Map<String, List<AvscParseResult>> duplicates = context.getDuplicates();
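The replacement chunking logic above batches the parsed named schemas into fixed-size groups by pairing a shared counter with Collectors.groupingBy. A minimal, self-contained sketch of that pattern, using a placeholder chunk size of 3 and plain String elements:

import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ChunkingSketch {
  public static void main(String[] args) {
    final int chunkSize = 3;
    final AtomicInteger counter = new AtomicInteger(0);

    // Each element gets a running index; integer division by the chunk size
    // sends consecutive elements into the same bucket, so every bucket holds
    // at most chunkSize elements.
    Collection<List<String>> chunks = Stream.of("a", "b", "c", "d", "e", "f", "g")
        .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / chunkSize))
        .values();

    chunks.forEach(System.out::println); // [a, b, c], [d, e, f], [g] (bucket order not guaranteed)
  }
}

Bounding the batch size gives the parallelized generation step further down evenly sized units of work.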
@@ -240,7 +226,7 @@ public void run(OperationContext opContext) throws Exception {
}
}

LOGGER.info("parsed {} named schemas in {} millis, {} of which have duplicates", totalSchemaParsed,
LOGGER.info("parsed {} named schemas in {} millis, {} of which have duplicates", schemaCounter.get(),
parseEnd - scanEnd, duplicates.size());

//TODO fail if any errors or dups (depending on config) are found
@@ -250,73 +236,63 @@ public void run(OperationContext opContext) throws Exception {
"unresolved referenced to external schemas: " + context.getExternalReferences());
}


SpecificRecordClassGenerator generator = new SpecificRecordClassGenerator();
HashSet<String> alreadyGeneratedSchemas = new HashSet<>();
List<JavaFileObject> generatedSpecificClasses;
int totalGeneratedClasses = 0;

long genStart = System.currentTimeMillis();
long errorCount = 0;
List<AvroNamedSchema> internalSchemaList;

for (Map<String, AvroNamedSchema> allNamedSchemas : allNamedSchemaList) {
generatedSpecificClasses = new ArrayList<>(totalSchemaParsed);
for (Map.Entry<String, AvroNamedSchema> namedSchemaEntry : allNamedSchemas.entrySet()) {
String fullname = namedSchemaEntry.getKey();
AvroNamedSchema namedSchema = namedSchemaEntry.getValue();
final SpecificRecordGenerationConfig generationConfig =
SpecificRecordGenerationConfig.getBroadCompatibilitySpecificRecordGenerationConfig(
AvroJavaStringRepresentation.fromJson(config.getStringRepresentation().toString()),
AvroJavaStringRepresentation.fromJson(config.getMethodStringRepresentation().toString()),
config.getMinAvroVersion(), config.isUtf8EncodingPutByIndexEnabled());

try {
// Make sure the output folder exists
File outputFolder = config.getOutputSpecificRecordClassesRoot();
if (!outputFolder.exists() && !outputFolder.mkdirs()) {
throw new IllegalStateException("unable to create output folder " + outputFolder);
}
final Path outputDirectoryPath = outputFolder.toPath();

if (!alreadyGeneratedSchemas.contains(namedSchema.getFullName())) {
int totalGeneratedClasses = allNamedSchemaList.parallelStream().map(allNamedSchemas -> {
SpecificRecordClassGenerator generator = new SpecificRecordClassGenerator();
HashSet<String> alreadyGeneratedSchemaNames = new HashSet<>();
List<JavaFile> generatedSpecificClasses = new ArrayList<>(allNamedSchemas.size());
for (AvroNamedSchema namedSchema : allNamedSchemas) {
try {
if (!alreadyGeneratedSchemaNames.contains(namedSchema.getFullName())) {
// skip codegen if schema is on classpath and config says to skip
if (config.shouldSkipCodegenIfSchemaOnClasspath() && doesSchemaExistOnClasspath(namedSchema, lookupSchemaSet)) {
continue;
}

//top level schema
alreadyGeneratedSchemas.add(namedSchema.getFullName());
generatedSpecificClasses.add(generator.generateSpecificClass(namedSchema,
SpecificRecordGenerationConfig.getBroadCompatibilitySpecificRecordGenerationConfig(
AvroJavaStringRepresentation.fromJson(config.getStringRepresentation().toString()),
AvroJavaStringRepresentation.fromJson(config.getMethodStringRepresentation().toString()),
config.getMinAvroVersion(), config.isUtf8EncodingPutByIndexEnabled())));
alreadyGeneratedSchemaNames.add(namedSchema.getFullName());
generatedSpecificClasses.add(generator.generateSpecificClass(namedSchema, generationConfig));

// generate internal schemas if not already present
internalSchemaList = SpecificRecordGeneratorUtil.getNestedInternalSchemaList(namedSchema);
List<AvroNamedSchema> internalSchemaList =
SpecificRecordGeneratorUtil.getNestedInternalSchemaList(namedSchema);
for (AvroNamedSchema namedInternalSchema : internalSchemaList) {
if (!alreadyGeneratedSchemas.contains(namedInternalSchema.getFullName())) {
if (!alreadyGeneratedSchemaNames.contains(namedInternalSchema.getFullName())) {
// skip codegen for nested schemas if schema is on classpath and config says to skip
if (config.shouldSkipCodegenIfSchemaOnClasspath() && doesSchemaExistOnClasspath(namedInternalSchema,
lookupSchemaSet)) {
continue;
}

generatedSpecificClasses.add(generator.generateSpecificClass(namedInternalSchema,
SpecificRecordGenerationConfig.getBroadCompatibilitySpecificRecordGenerationConfig(
AvroJavaStringRepresentation.fromJson(config.getStringRepresentation().toString()),
AvroJavaStringRepresentation.fromJson(config.getMethodStringRepresentation().toString()),
config.getMinAvroVersion(), config.isUtf8EncodingPutByIndexEnabled())));
alreadyGeneratedSchemas.add(namedInternalSchema.getFullName());
generatedSpecificClasses.add(generator.generateSpecificClass(namedInternalSchema, generationConfig));
alreadyGeneratedSchemaNames.add(namedInternalSchema.getFullName());
}
}
}
} catch (Exception e) {
throw new RuntimeException("failed to generate class for " + fullname, e);
throw new RuntimeException("failed to generate class for " + namedSchema.getFullName(), e);
}
}
writeJavaFilesToDisk(generatedSpecificClasses, config.getOutputSpecificRecordClassesRoot());
totalGeneratedClasses += generatedSpecificClasses.size();
}
writeJavaFilesToDisk(generatedSpecificClasses, outputDirectoryPath);
return generatedSpecificClasses.size();
}).reduce(0, Integer::sum);

long genEnd = System.currentTimeMillis();

if (errorCount > 0) {
LOGGER.info("failed to generate {} java source files ({} generated successfully) in {} millis", errorCount,
totalGeneratedClasses, genEnd - genStart);
} else {
LOGGER.info("generated {} java source files in {} millis", totalGeneratedClasses, genEnd - genStart);
}
LOGGER.info("generated {} java source files in {} millis", totalGeneratedClasses, genEnd - genStart);

Set<File> allAvroFiles = new HashSet<>(avscFiles);
allAvroFiles.addAll(nonImportableFiles);
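The rewritten generation step above fans the schema chunks out over a parallel stream: each chunk builds its JavaPoet JavaFiles, writes them to disk, and reports how many classes it produced, and the per-chunk counts are summed with reduce; the SpecificRecordGenerationConfig is also constructed once up front instead of once per schema. A stripped-down sketch of the fan-out/sum pattern, where the chunk element type and the per-chunk work are placeholders:

import java.util.Collection;
import java.util.List;

public class ParallelChunkSketch {

  // Placeholder for the real per-chunk work (class generation plus writing to disk);
  // here it simply reports how many elements the chunk held.
  static int processChunk(List<String> chunk) {
    return chunk.size();
  }

  public static void main(String[] args) {
    Collection<List<String>> chunks = List.of(
        List.of("a", "b", "c"),
        List.of("d", "e"));

    // Chunks are handled independently on the common fork-join pool,
    // and the per-chunk counts are folded into a single total.
    int total = chunks.parallelStream()
        .map(ParallelChunkSketch::processChunk)
        .reduce(0, Integer::sum);

    System.out.println(total); // 5
  }
}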
@@ -360,40 +336,23 @@ private boolean doesSchemaExistOnClasspath(AvroNamedSchema schema, SchemaSet sch
return schemaSet.getByName(schema.getFullName()) != null;
}

private void writeJavaFilesToDisk(Collection<JavaFileObject> javaClassFiles, File outputFolder) {
private void writeJavaFilesToDisk(Collection<JavaFile> javaFiles, Path outputFolderPath) {

long writeStart = System.currentTimeMillis();

//make sure the output folder exists
if (!outputFolder.exists() && !outputFolder.mkdirs()) {
throw new IllegalStateException("unable to create output folder " + outputFolder);
}

//write out the files we generated
for (JavaFileObject javaClass : javaClassFiles) {
File outputFile = new File(outputFolder, javaClass.getName());

//TODO - handle case where file already exists and make behaviour configurable (overwite, ignore, ignore_if_identical, etc)

File parentFolder = outputFile.getParentFile();
if (!parentFolder.exists() && !parentFolder.mkdirs()) {
throw new IllegalStateException("unable to create output folder " + outputFolder);
}

try (
FileOutputStream fos = new FileOutputStream(outputFile, false);
OutputStreamWriter writer = new OutputStreamWriter(fos, StandardCharsets.UTF_8);
Reader reader = javaClass.openReader(true)
) {
IOUtils.copy(reader, writer);
writer.flush();
fos.flush();
// write out the files we generated
long numFilesWritten = javaFiles.parallelStream().map(javaFile -> {
try {
javaFile.writeToPath(outputFolderPath);
} catch (Exception e) {
throw new IllegalStateException("while writing file " + outputFile, e);
throw new IllegalStateException("while writing file " + javaFile.typeSpec.name, e);
}
}

return null;
}).count();

long writeEnd = System.currentTimeMillis();
LOGGER.info("wrote out {} generated java source files under {} in {} millis", javaClassFiles.size(), outputFolder,
LOGGER.info("wrote out {} generated java source files under {} in {} millis", numFilesWritten, outputFolderPath,
writeEnd - writeStart);
}
}
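writeJavaFilesToDisk now hands file output to JavaPoet: JavaFile.writeToPath renders the source and creates the package directory structure under the output root, replacing the old FileOutputStream/IOUtils copy, and the files are written from a parallel stream as well. A minimal sketch of the writeToPath call, with a hypothetical package, class name, and output directory:

import com.squareup.javapoet.JavaFile;
import com.squareup.javapoet.TypeSpec;
import java.io.IOException;
import java.nio.file.Paths;
import javax.lang.model.element.Modifier;

public class WriteToPathSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical generated type; the real code builds these from the Avro schema.
    TypeSpec typeSpec = TypeSpec.classBuilder("Example")
        .addModifiers(Modifier.PUBLIC)
        .build();

    JavaFile javaFile = JavaFile.builder("com.example.generated", typeSpec).build();

    // Writes build/generated-sources/com/example/generated/Example.java,
    // creating the intermediate directories as needed.
    javaFile.writeToPath(Paths.get("build/generated-sources"));
  }
}

Because writeToPath both renders and writes the source, the explicit UTF-8 writer, the IOUtils copy, and the manual per-file directory checks are no longer needed.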
@@ -51,11 +51,10 @@
import java.util.Map;
import java.util.Queue;
import java.util.StringJoiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.stream.Collectors;
import javax.lang.model.element.Modifier;
import javax.tools.JavaFileObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
@@ -78,7 +77,7 @@ public class SpecificRecordClassGenerator {
* @return Java class of top level schema
* @throws ClassNotFoundException
*/
public JavaFileObject generateSpecificClass(AvroNamedSchema topLevelSchema,
public JavaFile generateSpecificClass(AvroNamedSchema topLevelSchema,
SpecificRecordGenerationConfig config) throws ClassNotFoundException {
if (topLevelSchema == null) {
throw new IllegalArgumentException("topLevelSchema required");
@@ -129,7 +128,7 @@ public JavaFileObject generateSpecificClass(AvroNamedSchema topLevelSchema,
*
*/
private void populateJavaFilesOfInnerNamedSchemasFromRecord(AvroRecordSchema recordSchema,
SpecificRecordGenerationConfig config, List<JavaFileObject> namedSchemaFiles) throws ClassNotFoundException {
SpecificRecordGenerationConfig config, List<JavaFile> namedSchemaFiles) throws ClassNotFoundException {

HashSet<String> visitedSchemasFullNames = new HashSet<>();
Queue<AvroSchema> schemaQueue = recordSchema.getFields()
@@ -183,7 +182,7 @@ private void populateJavaFilesOfInnerNamedSchemasFromRecord(AvroRecordSchema rec
}


protected JavaFileObject generateSpecificEnum(AvroEnumSchema enumSchema, SpecificRecordGenerationConfig config) {
protected JavaFile generateSpecificEnum(AvroEnumSchema enumSchema, SpecificRecordGenerationConfig config) {
//public enum
TypeSpec.Builder classBuilder = TypeSpec.enumBuilder(enumSchema.getSimpleName());
classBuilder.addModifiers(Modifier.PUBLIC);
@@ -219,16 +218,15 @@ protected JavaFileObject generateSpecificEnum(AvroEnumSchema enumSchema, Specifi

//create file object
TypeSpec classSpec = classBuilder.build();
JavaFile javaFile = JavaFile.builder(enumSchema.getNamespace(), classSpec)

return JavaFile.builder(enumSchema.getNamespace(), classSpec)
.skipJavaLangImports(false) //no imports
.addFileComment(SpecificRecordGeneratorUtil.AVRO_GEN_COMMENT)
.build();

return javaFile.toJavaFileObject();
}


protected JavaFileObject generateSpecificFixed(AvroFixedSchema fixedSchema, SpecificRecordGenerationConfig config)
protected JavaFile generateSpecificFixed(AvroFixedSchema fixedSchema, SpecificRecordGenerationConfig config)
throws ClassNotFoundException {
//public class
TypeSpec.Builder classBuilder = TypeSpec.classBuilder(fixedSchema.getSimpleName());
@@ -270,12 +268,11 @@ protected JavaFileObject generateSpecificFixed(AvroFixedSchema fixedSchema, Spec

//create file object
TypeSpec classSpec = classBuilder.build();
JavaFile javaFile = JavaFile.builder(fixedSchema.getNamespace(), classSpec)

return JavaFile.builder(fixedSchema.getNamespace(), classSpec)
.skipJavaLangImports(false) //no imports
.addFileComment(SpecificRecordGeneratorUtil.AVRO_GEN_COMMENT)
.build();

return javaFile.toJavaFileObject();
}

/***
@@ -336,7 +333,7 @@ private void addCommonClassComponents(SpecificRecordGenerationConfig config, Typ
classBuilder.addMethod(writeExternalBuilder.build());
}

protected JavaFileObject generateSpecificRecord(AvroRecordSchema recordSchema, SpecificRecordGenerationConfig config)
protected JavaFile generateSpecificRecord(AvroRecordSchema recordSchema, SpecificRecordGenerationConfig config)
throws ClassNotFoundException {

// Default to broad compatibility config if null
@@ -454,12 +451,12 @@ protected JavaFileObject generateSpecificRecord(AvroRecordSchema recordSchema, S
// add no arg constructor
classBuilder.addMethod(MethodSpec.constructorBuilder().addModifiers(Modifier.PUBLIC).build());

if (recordSchema.getFields().size() > 0) {
if (!recordSchema.getFields().isEmpty()) {
// add all arg constructor if #args < 254
addAllArgsConstructor(recordSchema, config.getDefaultFieldStringRepresentation(),
config.getDefaultMethodStringRepresentation(), classBuilder);

if(SpecificRecordGeneratorUtil.recordHasSimpleStringField(recordSchema)) {
if (SpecificRecordGeneratorUtil.recordHasSimpleStringField(recordSchema)) {
addAllArgsConstructor(recordSchema, config.getDefaultFieldStringRepresentation(),
config.getDefaultMethodStringRepresentation().equals(AvroJavaStringRepresentation.STRING)
? AvroJavaStringRepresentation.CHAR_SEQUENCE : AvroJavaStringRepresentation.STRING,
@@ -505,7 +502,7 @@ protected JavaFileObject generateSpecificRecord(AvroRecordSchema recordSchema, S
.build());

//customCoders
if(hasCustomCoders(recordSchema)) {
if (hasCustomCoders(recordSchema)) {

// customEncode
MethodSpec.Builder customEncodeBuilder = MethodSpec
@@ -542,12 +539,9 @@ protected JavaFileObject generateSpecificRecord(AvroRecordSchema recordSchema, S

//create file object
TypeSpec classSpec = classBuilder.build();
JavaFile javaFile = JavaFile.builder(recordSchema.getNamespace(), classSpec)
return JavaFile.builder(recordSchema.getNamespace(), classSpec)
.skipJavaLangImports(false) //no imports
.addFileComment(SpecificRecordGeneratorUtil.AVRO_GEN_COMMENT)
.build();

return javaFile.toJavaFileObject();
.addFileComment(SpecificRecordGeneratorUtil.AVRO_GEN_COMMENT).build();
}

private void addAllArgsConstructor(AvroRecordSchema recordSchema,
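With these signature changes the generator returns JavaPoet JavaFile objects directly instead of converting them to javax.tools.JavaFileObject, which is what lets the caller above write them out with writeToPath. A short sketch of building such a JavaFile for an enum, loosely following the generateSpecificEnum pattern (namespace and constants are illustrative):

import com.squareup.javapoet.JavaFile;
import com.squareup.javapoet.TypeSpec;
import javax.lang.model.element.Modifier;

public class EnumJavaFileSketch {
  public static void main(String[] args) {
    // Illustrative schema values; the real generator reads them from the AvroEnumSchema.
    TypeSpec enumSpec = TypeSpec.enumBuilder("Suit")
        .addModifiers(Modifier.PUBLIC)
        .addEnumConstant("HEARTS")
        .addEnumConstant("SPADES")
        .build();

    JavaFile javaFile = JavaFile.builder("com.example.avro", enumSpec)
        .skipJavaLangImports(false)        // mirrors the generator's setting
        .addFileComment("GENERATED CODE")  // stands in for AVRO_GEN_COMMENT
        .build();

    System.out.println(javaFile); // JavaFile#toString renders the full source
  }
}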