Skip to content

Commit

Permalink
Add handy Collectors for executing jobs on Virtual Threads but with l…
Browse files Browse the repository at this point in the history
…imited parallelism (#879)
  • Loading branch information
pivovarit authored May 5, 2024
1 parent 894384c commit a2c1bc2
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 49 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>com.pivovarit</groupId>
<artifactId>parallel-collectors</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
<url>https://github.com/pivovarit/parallel-collectors</url>

<packaging>jar</packaging>
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
Expand Down Expand Up @@ -98,6 +99,13 @@ private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T
return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(), Function.identity());
}

/**
* Creates an {@code AsyncParallelCollector} wired with {@code mapper}, a dispatcher obtained from
* {@code Dispatcher.virtual(parallelism)} and {@code Function.identity()} as the final transformation.
*
* @param mapper a transformation to be applied to the collected elements, must not be null
* @param parallelism the parallelism level, validated by {@code requireValidParallelism}
* @param <T> the type of the collected elements
* @param <R> the result returned by {@code mapper}
*
* @return a {@code Collector} whose result is a {@code CompletableFuture} of a {@code Stream} of mapped elements
*/
static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, int parallelism) {
// arguments are validated before any dispatcher is created
requireNonNull(mapper, "mapper can't be null");
requireValidParallelism(parallelism);

return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(parallelism), Function.identity());
}

static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, Executor executor, int parallelism) {
requireNonNull(executor, "executor can't be null");
requireNonNull(mapper, "mapper can't be null");
Expand All @@ -115,6 +123,16 @@ private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T
return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(), s -> s.collect(collector));
}

/**
 * Creates a collector that maps the collected elements with {@code mapper} and reduces the mapped
 * stream with {@code collector}, exposing the outcome as a {@code CompletableFuture}.
 *
 * @param collector the {@code Collector} applied to the stream of mapped elements, must not be null
 * @param mapper a transformation to be applied to the collected elements, must not be null
 * @param parallelism the parallelism level, validated by {@code requireValidParallelism}
 * @param <T> the type of the collected elements
 * @param <R> the result returned by {@code mapper}
 * @param <RR> the reduction result of {@code collector}
 *
 * @return a {@code Collector} whose result is a {@code CompletableFuture} of the reduction result
 */
static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, int parallelism) {
    requireNonNull(collector, "collector can't be null");
    requireNonNull(mapper, "mapper can't be null");
    requireValidParallelism(parallelism);

    if (parallelism != 1) {
        return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(parallelism), s -> s.collect(collector));
    }

    // a parallelism of 1 bypasses the Dispatcher and delegates to asyncCollector
    // with a new virtual-thread-per-task executor
    return asyncCollector(mapper, Executors.newVirtualThreadPerTaskExecutor(), s -> s.collect(collector));
}

static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor, int parallelism) {
requireNonNull(collector, "collector can't be null");
requireNonNull(executor, "executor can't be null");
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/com/pivovarit/collectors/Dispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ private Dispatcher(Executor executor, int permits) {
this.limiter = new Semaphore(permits);
}

/**
* Creates a dispatcher that uses the executor returned by {@code defaultExecutorService()}
* and a {@link Semaphore} initialised with the given number of permits as its limiter.
*
* @param permits the number of permits the limiter starts with
*/
private Dispatcher(int permits) {
this.executor = defaultExecutorService();
this.limiter = new Semaphore(permits);
}

/**
* Static factory that delegates to the {@code (executor, permits)} constructor.
*
* @param executor passed through as the constructor's {@code executor} argument
* @param permits passed through as the constructor's {@code permits} argument
* @param <T> the type parameter of the returned {@code Dispatcher}
*
* @return a newly constructed {@code Dispatcher}
*/
static <T> Dispatcher<T> from(Executor executor, int permits) {
return new Dispatcher<>(executor, permits);
}
Expand All @@ -50,6 +55,10 @@ static <T> Dispatcher<T> virtual() {
return new Dispatcher<>();
}

/**
* Static factory that delegates to the permits-only constructor, which pairs the executor returned by
* {@code defaultExecutorService()} with a limiter of {@code permits} permits.
*
* @param permits the number of permits for the limiter
* @param <T> the type parameter of the returned {@code Dispatcher}
*
* @return a newly constructed {@code Dispatcher}
*/
static <T> Dispatcher<T> virtual(int permits) {
return new Dispatcher<>(permits);
}

void start() {
if (!started.getAndSet(true)) {
Thread.ofVirtual().start(() -> {
Expand Down
107 changes: 106 additions & 1 deletion src/main/java/com/pivovarit/collectors/ParallelCollectors.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public final class ParallelCollectors {
private ParallelCollectors() {
}


/**
* A convenience {@link Collector} used for executing parallel computations using Virtual Threads
* and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
Expand All @@ -43,6 +42,32 @@ private ParallelCollectors() {
return AsyncParallelCollector.collectingWithCollector(collector, mapper);
}

/**
* A convenience {@link Collector} used for executing parallel computations using Virtual Threads
* and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
*
* <br>
* Example:
* <pre>{@code
* CompletableFuture<List<String>> result = Stream.of(1, 2, 3)
* .collect(parallel(i -> foo(i), toList(), 2));
* }</pre>
*
* @param mapper a transformation to be performed in parallel
* @param collector the {@code Collector} describing the reduction
* @param parallelism the max parallelism level
* @param <T> the type of the collected elements
* @param <R> the result returned by {@code mapper}
* @param <RR> the reduction result {@code collector}
*
* @return a {@code Collector} which processes all elements in parallel and reduces the results using the user-provided {@code Collector}
*
* @since 3.2.0
*/
public static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(Function<T, R> mapper, Collector<R, ?, RR> collector, int parallelism) {
return AsyncParallelCollector.collectingWithCollector(collector, mapper, parallelism);
}

/**
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
* and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
Expand Down Expand Up @@ -96,6 +121,32 @@ private ParallelCollectors() {
return AsyncParallelCollector.collectingToStream(mapper);
}

/**
* A convenience {@link Collector} used for executing parallel computations using Virtual Threads
* and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements.
*
* <br><br>
* The collector maintains the order of processed {@link Stream}. Instances should not be reused.
*
* <br>
* Example:
* <pre>{@code
* CompletableFuture<Stream<String>> result = Stream.of(1, 2, 3)
* .collect(parallel(i -> foo(), 2));
* }</pre>
*
* @param mapper a transformation to be performed in parallel
* @param parallelism the max parallelism level
* @param <T> the type of the collected elements
* @param <R> the result returned by {@code mapper}
*
* @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
*
* @since 3.2.0
*/
public static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> parallel(Function<T, R> mapper, int parallelism) {
return AsyncParallelCollector.collectingToStream(mapper, parallelism);
}

/**
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
* and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements.
Expand Down Expand Up @@ -150,6 +201,33 @@ private ParallelCollectors() {
return ParallelStreamCollector.streaming(mapper);
}

/**
* A convenience {@link Collector} used for executing parallel computations using Virtual Threads
* and returning a {@link Stream} instance returning results as they arrive.
*
* <br>
* Example:
* <pre>{@code
* Stream.of(1, 2, 3)
* .collect(parallelToStream(i -> foo(), 2))
* .forEach(System.out::println);
* }</pre>
*
* @param mapper a transformation to be performed in parallel
* @param parallelism the max parallelism level
* @param <T> the type of the collected elements
* @param <R> the result returned by {@code mapper}
*
* @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
*
* @since 3.2.0
*/
public static <T, R> Collector<T, ?, Stream<R>> parallelToStream(Function<T, R> mapper, int parallelism) {
return ParallelStreamCollector.streaming(mapper, parallelism);
}

/**
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
* and returning a {@link Stream} instance returning results as they arrive.
Expand Down Expand Up @@ -204,6 +282,33 @@ private ParallelCollectors() {
return ParallelStreamCollector.streamingOrdered(mapper);
}

/**
* A convenience {@link Collector} used for executing parallel computations using Virtual Threads
* and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
*
* <br>
* Example:
* <pre>{@code
* Stream.of(1, 2, 3)
* .collect(parallelToOrderedStream(i -> foo(), 2))
* .forEach(System.out::println);
* }</pre>
*
* @param mapper a transformation to be performed in parallel
* @param parallelism the max parallelism level
* @param <T> the type of the collected elements
* @param <R> the result returned by {@code mapper}
*
* @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
*
* @since 3.2.0
*/
public static <T, R> Collector<T, ?, Stream<R>> parallelToOrderedStream(Function<T, R> mapper, int parallelism) {
return ParallelStreamCollector.streamingOrdered(mapper, parallelism);
}

/**
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
* and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
Expand Down
85 changes: 48 additions & 37 deletions src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ class ParallelStreamCollector<T, R> implements Collector<T, List<CompletableFutu
private final Dispatcher<R> dispatcher;

private ParallelStreamCollector(
Function<T, R> function,
CompletionStrategy<R> completionStrategy,
Set<Characteristics> characteristics,
Dispatcher<R> dispatcher) {
Function<T, R> function,
CompletionStrategy<R> completionStrategy,
Set<Characteristics> characteristics,
Dispatcher<R> dispatcher) {
this.completionStrategy = completionStrategy;
this.characteristics = characteristics;
this.dispatcher = dispatcher;
Expand All @@ -67,7 +67,7 @@ public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
public BinaryOperator<List<CompletableFuture<R>>> combiner() {
return (left, right) -> {
throw new UnsupportedOperationException(
"Using parallel stream with parallel collectors is a bad idea");
"Using parallel stream with parallel collectors is a bad idea");
};
}

Expand All @@ -90,6 +90,13 @@ public Set<Characteristics> characteristics() {
return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.virtual());
}

/**
 * Builds a {@code ParallelStreamCollector} that combines {@code mapper} with the {@code unordered()}
 * completion strategy, the {@code UNORDERED} characteristics and a dispatcher obtained from
 * {@code Dispatcher.virtual(parallelism)}.
 *
 * @param mapper a transformation to be applied to the collected elements, must not be null
 * @param parallelism the parallelism level, validated by {@code requireValidParallelism}
 * @param <T> the type of the collected elements
 * @param <R> the result returned by {@code mapper}
 *
 * @return a {@code Collector} producing a {@code Stream} of mapped elements
 */
static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, int parallelism) {
    requireNonNull(mapper, "mapper can't be null");
    requireValidParallelism(parallelism);

    // the dispatcher is only created once both arguments have been validated
    Dispatcher<R> boundedDispatcher = Dispatcher.virtual(parallelism);
    return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, boundedDispatcher);
}

static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor, int parallelism) {
requireNonNull(executor, "executor can't be null");
requireNonNull(mapper, "mapper can't be null");
Expand All @@ -104,8 +111,15 @@ public Set<Characteristics> characteristics() {
return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.virtual());
}

/**
 * Builds a {@code ParallelStreamCollector} that combines {@code mapper} with the {@code ordered()}
 * completion strategy, an empty set of characteristics and a dispatcher obtained from
 * {@code Dispatcher.virtual(parallelism)}.
 *
 * @param mapper a transformation to be applied to the collected elements, must not be null
 * @param parallelism the parallelism level, validated by {@code requireValidParallelism}
 * @param <T> the type of the collected elements
 * @param <R> the result returned by {@code mapper}
 *
 * @return a {@code Collector} producing a {@code Stream} of mapped elements
 */
static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, int parallelism) {
    requireNonNull(mapper, "mapper can't be null");
    requireValidParallelism(parallelism);

    // the dispatcher is only created once both arguments have been validated
    Dispatcher<R> boundedDispatcher = Dispatcher.virtual(parallelism);
    return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), boundedDispatcher);
}

static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor,
int parallelism) {
int parallelism) {
requireNonNull(executor, "executor can't be null");
requireNonNull(mapper, "mapper can't be null");
requireValidParallelism(parallelism);
Expand All @@ -119,60 +133,57 @@ private BatchingCollectors() {
}

static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor,
int parallelism) {
int parallelism) {
requireNonNull(executor, "executor can't be null");
requireNonNull(mapper, "mapper can't be null");
requireValidParallelism(parallelism);

return parallelism == 1
? syncCollector(mapper)
: batchingCollector(mapper, executor, parallelism);
? syncCollector(mapper)
: batchingCollector(mapper, executor, parallelism);
}

static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor,
int parallelism) {
int parallelism) {
requireNonNull(executor, "executor can't be null");
requireNonNull(mapper, "mapper can't be null");
requireValidParallelism(parallelism);

return parallelism == 1
? syncCollector(mapper)
: batchingCollector(mapper, executor, parallelism);
? syncCollector(mapper)
: batchingCollector(mapper, executor, parallelism);
}

private static <T, R> Collector<T, ?, Stream<R>> batchingCollector(Function<T, R> mapper,
Executor executor, int parallelism) {
Executor executor, int parallelism) {
return collectingAndThen(
toList(),
list -> {
// no sense to repack into batches of size 1
if (list.size() == parallelism) {
return list.stream()
.collect(new ParallelStreamCollector<>(
mapper,
ordered(),
emptySet(),
Dispatcher.from(executor, parallelism)));
}
else {
return partitioned(list, parallelism)
.collect(collectingAndThen(new ParallelStreamCollector<>(
batching(mapper),
ordered(),
emptySet(),
Dispatcher.from(executor, parallelism)),
s -> s.flatMap(Collection::stream)));
}
});
toList(),
list -> {
// no sense to repack into batches of size 1
if (list.size() == parallelism) {
return list.stream()
.collect(new ParallelStreamCollector<>(
mapper,
ordered(),
emptySet(),
Dispatcher.from(executor, parallelism)));
} else {
return partitioned(list, parallelism)
.collect(collectingAndThen(new ParallelStreamCollector<>(
batching(mapper),
ordered(),
emptySet(),
Dispatcher.from(executor, parallelism)),
s -> s.flatMap(Collection::stream)));
}
});
}

private static <T, R> Collector<T, Stream.Builder<R>, Stream<R>> syncCollector(Function<T, R> mapper) {
return Collector.of(Stream::builder, (rs, t) -> rs.add(mapper.apply(t)), (rs, rs2) -> {
throw new UnsupportedOperationException(
"Using parallel stream with parallel collectors is a bad idea");
"Using parallel stream with parallel collectors is a bad idea");
}, Stream.Builder::build);
}

}

}
4 changes: 4 additions & 0 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ Stream<DynamicTest> collectors() {
return of(
// virtual threads
virtualThreadsTests((m, e, p) -> parallel(m, toList()), "ParallelCollectors.parallel(toList()) [virtual]", true),
virtualThreadsTests((m, e, p) -> parallel(m, toList(), p), "ParallelCollectors.parallel(toList()) [virtual]", true),
virtualThreadsTests((m, e, p) -> parallel(m, toSet()), "ParallelCollectors.parallel(toSet()) [virtual]", false),
virtualThreadsTests((m, e, p) -> parallel(m, toSet(), p), "ParallelCollectors.parallel(toSet()) [virtual]", false),
virtualThreadsTests((m, e, p) -> parallel(m, toCollection(LinkedList::new)), "ParallelCollectors.parallel(toCollection()) [virtual]", true),
virtualThreadsTests((m, e, p) -> parallel(m, toCollection(LinkedList::new), p), "ParallelCollectors.parallel(toCollection()) [virtual]", true),
virtualThreadsTests((m, e, p) -> adapt(parallel(m)), "ParallelCollectors.parallel() [virtual]", true),
virtualThreadsTests((m, e, p) -> adapt(parallel(m, p)), "ParallelCollectors.parallel() [virtual]", true),
// platform threads
tests((m, e, p) -> parallel(m, toList(), e, p), format("ParallelCollectors.parallel(toList(), p=%d)", PARALLELISM), true),
tests((m, e, p) -> parallel(m, toSet(), e, p), format("ParallelCollectors.parallel(toSet(), p=%d)", PARALLELISM), false),
Expand Down
Loading

0 comments on commit a2c1bc2

Please sign in to comment.