From 51ff79b9a3ea05cd4d09c16a8ecf09c919681474 Mon Sep 17 00:00:00 2001 From: Xiaoyu Yang Date: Wed, 14 Feb 2024 13:58:38 -0500 Subject: [PATCH] fix StreamUtil to execute the map function for each batch in the thread-pool (#548) --- .../linkedin/avroutil1/builder/util/StreamUtil.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/util/StreamUtil.java b/avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/util/StreamUtil.java index 0c6a0387..77d6fb16 100644 --- a/avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/util/StreamUtil.java +++ b/avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/util/StreamUtil.java @@ -6,6 +6,7 @@ package com.linkedin.avroutil1.builder.util; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -90,14 +91,16 @@ private StreamUtil() { .mapToObj(batch -> { int startIndex = batch * batchSize; int endIndex = (batch == batchCount) ? list.size() : (batch + 1) * batchSize; - return list.subList(startIndex, endIndex).stream(); + return list.subList(startIndex, endIndex); }) - .map(batch -> CompletableFuture.supplyAsync(() -> batch.map(mapper), limitingExecutor)) - .flatMap(CompletableFuture::join); + .map(batch -> CompletableFuture.supplyAsync(() -> batch.stream().map(mapper).collect(Collectors.toList()), + limitingExecutor)) + .map(CompletableFuture::join) + .flatMap(Collection::stream); }); } - private static class LimitingExecutor implements Executor { + private final static class LimitingExecutor implements Executor { private final Semaphore _limiter;