diff --git a/avro-builder/builder-spi/build.gradle b/avro-builder/builder-spi/build.gradle index f38867fc..d66b3306 100644 --- a/avro-builder/builder-spi/build.gradle +++ b/avro-builder/builder-spi/build.gradle @@ -17,7 +17,6 @@ dependencies { implementation "org.apache.logging.log4j:log4j-api:2.17.1" implementation "commons-io:commons-io:2.11.0" implementation "jakarta.json:jakarta.json-api:2.0.1" - implementation "com.pivovarit:parallel-collectors:2.5.0" testImplementation "org.apache.avro:avro:1.9.2" } diff --git a/avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/util/StreamUtil.java b/avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/util/StreamUtil.java index 5d2fa523..b4f87bd9 100644 --- a/avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/util/StreamUtil.java +++ b/avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/util/StreamUtil.java @@ -6,14 +6,17 @@ package com.linkedin.avroutil1.builder.util; -import com.pivovarit.collectors.ParallelCollectors; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; @@ -37,8 +40,8 @@ private StreamUtil() { /** * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results as they arrive. - *

- * For the parallelism of 1, the stream is executed by the calling thread. + * + *

For the parallelism of 1, the stream is executed by the calling thread.

* * @param mapper a transformation to be performed in parallel * @param parallelism the max parallelism level @@ -48,6 +51,70 @@ private StreamUtil() { * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel. */ public static Collector> toParallelStream(Function mapper, int parallelism) { - return ParallelCollectors.parallelToStream(mapper, WORK_EXECUTOR, parallelism); + return toParallelStream(mapper, parallelism, 1); + } + + /** + * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} + * and returning a {@link Stream} instance returning results as they arrive. + * + *

For the parallelism of 1 or if the size of the elements is <= batchSize, the stream is executed by the + * calling thread.

+ * + * @param mapper a transformation to be performed in parallel + * @param parallelism the max parallelism level + * @param batchSize the size into which inputs should be batched before running the mapper. + * @param the type of the collected elements + * @param the result returned by {@code mapper} + * + * @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel. + */ + public static Collector> toParallelStream(Function mapper, int parallelism, + int batchSize) { + if (parallelism <= 0 || batchSize <= 0) { + throw new IllegalArgumentException("Parallelism and batch size must be >= 1"); + } + + return Collectors.collectingAndThen(Collectors.toList(), list -> { + if (list.isEmpty()) { + return Stream.empty(); + } + + if (parallelism == 1 || list.size() <= batchSize) { + return list.stream().map(mapper); + } + + final Executor limitingExecutor = new LimitingExecutor(parallelism); + final int batchCount = (list.size() - 1) / batchSize; + return IntStream.rangeClosed(0, batchCount) + .mapToObj(batch -> { + int startIndex = batch * batchSize; + int endIndex = (batch == batchCount) ? list.size() : (batch + 1) * batchSize; + return list.subList(startIndex, endIndex).stream(); + }) + .map(batch -> CompletableFuture.supplyAsync(() -> batch.map(mapper), limitingExecutor)) + .flatMap(CompletableFuture::join); + }); + } + + private static class LimitingExecutor implements Executor { + + private final Semaphore _limiter; + + private LimitingExecutor(int maxParallelism) { + _limiter = new Semaphore(maxParallelism); + } + + @Override + public void execute(Runnable command) { + try { + _limiter.acquire(); + WORK_EXECUTOR.execute(command); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + _limiter.release(); + } + } } } diff --git a/avro-builder/builder/src/main/java/com/linkedin/avroutil1/builder/SchemaBuilder.java b/avro-builder/builder/src/main/java/com/linkedin/avroutil1/builder/SchemaBuilder.java index cd10cab8..e266b27e 100644 --- a/avro-builder/builder/src/main/java/com/linkedin/avroutil1/builder/SchemaBuilder.java +++ b/avro-builder/builder/src/main/java/com/linkedin/avroutil1/builder/SchemaBuilder.java @@ -280,7 +280,7 @@ public static void main(String[] args) throws Exception { } OperationContext opContext = operationContextBuilder.buildOperationContext(opConfig); long operationContextBuildEnd = System.currentTimeMillis(); - LOGGER.info("Built operation context in {} millis.", operationContextBuildStart - operationContextBuildEnd); + LOGGER.info("Built operation context in {} millis.", operationContextBuildEnd - operationContextBuildStart); BuilderPluginContext context = new BuilderPluginContext(opContext); diff --git a/avro-builder/builder/src/main/java/com/linkedin/avroutil1/builder/operations/codegen/own/AvroUtilCodeGenPlugin.java b/avro-builder/builder/src/main/java/com/linkedin/avroutil1/builder/operations/codegen/own/AvroUtilCodeGenPlugin.java index fad2cc65..141ed62b 100644 --- a/avro-builder/builder/src/main/java/com/linkedin/avroutil1/builder/operations/codegen/own/AvroUtilCodeGenPlugin.java +++ b/avro-builder/builder/src/main/java/com/linkedin/avroutil1/builder/operations/codegen/own/AvroUtilCodeGenPlugin.java @@ -56,6 +56,8 @@ public void createOperations(BuilderPluginContext context) { } private void generateCode(OperationContext opContext) { + LOGGER.info("Generating Avro Java bindings..."); + // Make sure the output folder exists File outputFolder = config.getOutputSpecificRecordClassesRoot(); if (!outputFolder.exists() && !outputFolder.mkdirs()) { @@ -109,7 +111,7 @@ private void generateCode(OperationContext opContext) { } catch (Exception e) { throw new RuntimeException("failed to generate class for " + namedSchema.getFullName(), e); } - }, 10)).collect(Collectors.toList()); + }, 10, 10)).collect(Collectors.toList()); long genEnd = System.currentTimeMillis(); LOGGER.info("Generated {} java source files in {} millis", generatedClasses.size(), genEnd - genStart); @@ -138,7 +140,7 @@ private void writeJavaFilesToDisk(Collection javaFiles, Path outputFol } return 1; - }, 10)).reduce(0, Integer::sum); + }, 10, 10)).reduce(0, Integer::sum); long writeEnd = System.currentTimeMillis(); LOGGER.info("Wrote out {} generated java source files under {} in {} millis", filesWritten, outputFolderPath,