From 6158001ebd819c5a49443679590f4d47ed4b5942 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 1 Jan 2018 14:18:32 +0100 Subject: [PATCH] Mandatory cancellation support, coverage tests --- README.md | 2 +- gradle.properties | 2 +- .../akarnokd/asyncenum/AsyncConcatArray.java | 28 +++++++++++++------ .../hu/akarnokd/asyncenum/AsyncEmpty.java | 5 ++++ .../akarnokd/asyncenum/AsyncEnumerator.java | 5 +--- .../asyncenum/AsyncEnumeratorHelper.java | 5 ++++ .../hu/akarnokd/asyncenum/AsyncError.java | 5 ++++ .../hu/akarnokd/asyncenum/AsyncFromArray.java | 5 ++++ .../akarnokd/asyncenum/AsyncFromCallable.java | 5 ++++ .../asyncenum/AsyncFromCharSequence.java | 5 ++++ .../asyncenum/AsyncFromCompletionStage.java | 5 ++++ .../akarnokd/asyncenum/AsyncFromIterable.java | 5 ++++ .../java/hu/akarnokd/asyncenum/AsyncJust.java | 5 ++++ .../java/hu/akarnokd/asyncenum/AsyncMax.java | 11 ++++++++ .../hu/akarnokd/asyncenum/AsyncNever.java | 5 ++++ .../hu/akarnokd/asyncenum/AsyncObserveOn.java | 4 +++ .../hu/akarnokd/asyncenum/AsyncRange.java | 5 ++++ .../asyncenum/AsyncRepeatCallable.java | 5 ++++ .../akarnokd/asyncenum/AsyncRepeatItem.java | 5 ++++ .../akarnokd/asyncenum/AsyncSubscribeOn.java | 15 +++++++++- .../hu/akarnokd/asyncenum/AsyncSumInt.java | 11 ++++++++ .../hu/akarnokd/asyncenum/AsyncSumLong.java | 11 ++++++++ .../asyncenum/AsyncConcatArrayTest.java | 13 +++++++++ .../asyncenum/AsyncEnumeratorHelperTest.java | 5 ++++ .../asyncenum/AsyncFromArrayTest.java | 9 ++++++ .../asyncenum/AsyncFromCallableTest.java | 7 +++++ .../asyncenum/AsyncFromIterableTest.java | 10 +++++++ .../asyncenum/AsyncMathOperatorsTest.java | 15 ++++++++++ .../asyncenum/AsyncSubscribeOnTest.java | 20 +++++++++++++ 29 files changed, 218 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 183324f..0ba9e83 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Prototype Java 9 library based on the asynchronous enumerable concept (where mov ### Gradle ```groovy -compile "com.github.akarnokd:async-enumerable:0.5.0" +compile "com.github.akarnokd:async-enumerable:0.6.0" ``` ### Getting started diff --git a/gradle.properties b/gradle.properties index c2e274a..e923728 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=0.5.0 +version=0.6.0 org.gradle.jvmargs=-XX:+IgnoreUnrecognizedVMOptions --permit-illegal-access --show-version diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncConcatArray.java b/src/main/java/hu/akarnokd/asyncenum/AsyncConcatArray.java index 4487382..51ca693 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncConcatArray.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncConcatArray.java @@ -17,7 +17,7 @@ package hu.akarnokd.asyncenum; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import java.util.function.BiConsumer; final class AsyncConcatArray implements AsyncEnumerable { @@ -38,7 +38,7 @@ static final class ConcatArrayEnumerator extends AtomicInteger final AsyncEnumerable[] sources; - AsyncEnumerator currentEnumerator; + final AtomicReference> currentEnumerator; CompletableFuture currentStage; @@ -46,25 +46,28 @@ static final class ConcatArrayEnumerator extends AtomicInteger ConcatArrayEnumerator(AsyncEnumerable[] sources) { this.sources = sources; + this.currentEnumerator = new AtomicReference<>(); } @Override public CompletionStage moveNext() { - if (currentEnumerator == null) { + if (currentEnumerator.get() == null) { if (index == sources.length) { return FALSE; } - currentEnumerator = sources[index++].enumerator(); + if (!AsyncEnumeratorHelper.replace(currentEnumerator, sources[index++].enumerator())) { + return CANCELLED; + } } currentStage = new CompletableFuture<>(); - currentEnumerator.moveNext().whenComplete(this); + currentEnumerator.getPlain().moveNext().whenComplete(this); return currentStage; } @Override public T current() { - return currentEnumerator.current(); + return currentEnumerator.getPlain().current(); } @Override @@ -82,11 +85,20 @@ public void accept(Boolean aBoolean, Throwable throwable) { currentStage.complete(false); break; } - currentEnumerator = sources[index++].enumerator(); - currentEnumerator.moveNext().whenComplete(this); + AsyncEnumerator en = sources[index++].enumerator(); + if (AsyncEnumeratorHelper.replace(currentEnumerator, en)) { + en.moveNext().whenComplete(this); + } else { + break; + } } while (decrementAndGet() != 0); } } } + + @Override + public void cancel() { + AsyncEnumeratorHelper.cancel(currentEnumerator); + } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncEmpty.java b/src/main/java/hu/akarnokd/asyncenum/AsyncEmpty.java index fcd7e97..d1e707d 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncEmpty.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncEmpty.java @@ -40,4 +40,9 @@ public CompletionStage moveNext() { public Object current() { return null; } + + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncEnumerator.java b/src/main/java/hu/akarnokd/asyncenum/AsyncEnumerator.java index 64c0cd7..06acc80 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncEnumerator.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncEnumerator.java @@ -51,8 +51,5 @@ public interface AsyncEnumerator { * Instructs the AsyncEnumerator to cancel any outstanding async activity and * release resources associated with it. */ - // FIXME make mandatory - default void cancel() { - - } + void cancel(); } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncEnumeratorHelper.java b/src/main/java/hu/akarnokd/asyncenum/AsyncEnumeratorHelper.java index ac1f615..87b7322 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncEnumeratorHelper.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncEnumeratorHelper.java @@ -33,6 +33,11 @@ public Object current() { return null; } + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } + @SuppressWarnings("unchecked") static boolean cancel(AtomicReference> target) { AsyncEnumerator current = target.getAndSet((AsyncEnumerator)CANCELLED); diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncError.java b/src/main/java/hu/akarnokd/asyncenum/AsyncError.java index 27150ed..8e65ee8 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncError.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncError.java @@ -40,4 +40,9 @@ public CompletionStage moveNext() { public T current() { return null; } + + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncFromArray.java b/src/main/java/hu/akarnokd/asyncenum/AsyncFromArray.java index 1969caf..60880ef 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncFromArray.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncFromArray.java @@ -59,5 +59,10 @@ public CompletionStage moveNext() { public T current() { return current; } + + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncFromCallable.java b/src/main/java/hu/akarnokd/asyncenum/AsyncFromCallable.java index 5c7b653..df3e5fc 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncFromCallable.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncFromCallable.java @@ -62,5 +62,10 @@ public CompletionStage moveNext() { public T current() { return result; } + + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncFromCharSequence.java b/src/main/java/hu/akarnokd/asyncenum/AsyncFromCharSequence.java index 6f5d15f..37e0df5 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncFromCharSequence.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncFromCharSequence.java @@ -59,5 +59,10 @@ public CompletionStage moveNext() { public Integer current() { return current; } + + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncFromCompletionStage.java b/src/main/java/hu/akarnokd/asyncenum/AsyncFromCompletionStage.java index cddfe1e..b211ebd 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncFromCompletionStage.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncFromCompletionStage.java @@ -74,5 +74,10 @@ public void accept(T t, Throwable throwable) { completable.complete(true); } } + + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncFromIterable.java b/src/main/java/hu/akarnokd/asyncenum/AsyncFromIterable.java index 8c1d9b9..faa365b 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncFromIterable.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncFromIterable.java @@ -56,5 +56,10 @@ public CompletionStage moveNext() { public T current() { return current; } + + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncJust.java b/src/main/java/hu/akarnokd/asyncenum/AsyncJust.java index fa3cf0f..ed6fb0c 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncJust.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncJust.java @@ -54,5 +54,10 @@ public CompletionStage moveNext() { public T current() { return value; } + + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncMax.java b/src/main/java/hu/akarnokd/asyncenum/AsyncMax.java index 166c6f8..c02cc0a 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncMax.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncMax.java @@ -52,6 +52,8 @@ static final class SumLongEnumerator extends AtomicInteger CompletableFuture completable; + volatile boolean cancelled; + SumLongEnumerator(AsyncEnumerator source, Comparator comparator) { this.source = source; this.comparator = comparator; @@ -76,6 +78,9 @@ public T current() { void collectSource() { if (getAndIncrement() == 0) { do { + if (cancelled) { + return; + } source.moveNext().whenComplete(this); } while (decrementAndGet() != 0); } @@ -110,5 +115,11 @@ public void accept(Boolean aBoolean, Throwable throwable) { } } } + + @Override + public void cancel() { + cancelled = true; + source.cancel(); + } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncNever.java b/src/main/java/hu/akarnokd/asyncenum/AsyncNever.java index 3f64072..4e9b58e 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncNever.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncNever.java @@ -41,4 +41,9 @@ public CompletionStage moveNext() { public Object current() { return null; } + + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncObserveOn.java b/src/main/java/hu/akarnokd/asyncenum/AsyncObserveOn.java index 961b347..fa6b638 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncObserveOn.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncObserveOn.java @@ -57,6 +57,10 @@ public T current() { return source.current(); } + @Override + public void cancel() { + source.cancel(); + } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncRange.java b/src/main/java/hu/akarnokd/asyncenum/AsyncRange.java index a6237e4..0d93033 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncRange.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncRange.java @@ -64,5 +64,10 @@ public CompletionStage moveNext() { public Integer current() { return current; } + + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatCallable.java b/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatCallable.java index a9f5fdb..fd753bb 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatCallable.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatCallable.java @@ -56,5 +56,10 @@ public CompletionStage moveNext() { public T current() { return result; } + + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatItem.java b/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatItem.java index e42053c..5d4df5b 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatItem.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatItem.java @@ -40,4 +40,9 @@ public CompletionStage moveNext() { public T current() { return item; } + + @Override + public void cancel() { + // No action, consumer should stop calling moveNext(). + } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncSubscribeOn.java b/src/main/java/hu/akarnokd/asyncenum/AsyncSubscribeOn.java index f006656..3f1db6e 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncSubscribeOn.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncSubscribeOn.java @@ -65,7 +65,20 @@ public T current() { @Override public void run() { - source.complete(upstream.enumerator()); + AsyncEnumerator en = upstream.enumerator(); + if (!source.complete(en)) { + en.cancel(); + } + } + + @Override + public void cancel() { + if (!source.completeExceptionally(new CancellationException())) { + AsyncEnumerator en = source.getNow(null); + if (en != null) { + en.cancel(); + } + } } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncSumInt.java b/src/main/java/hu/akarnokd/asyncenum/AsyncSumInt.java index 76ff22a..5c25eca 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncSumInt.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncSumInt.java @@ -51,6 +51,8 @@ static final class SumIntEnumerator extends AtomicInteger CompletableFuture cf; + volatile boolean cancelled; + SumIntEnumerator(AsyncEnumerator source, Function selector) { this.source = source; this.selector = selector; @@ -75,6 +77,9 @@ public Integer current() { void collectSource() { if (getAndIncrement() == 0) { do { + if (cancelled) { + return; + } source.moveNext().whenComplete(this); } while (decrementAndGet() != 0); } @@ -102,5 +107,11 @@ public void accept(Boolean aBoolean, Throwable throwable) { } } } + + @Override + public void cancel() { + cancelled = true; + source.cancel(); + } } } diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncSumLong.java b/src/main/java/hu/akarnokd/asyncenum/AsyncSumLong.java index 42d6481..92b33cc 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncSumLong.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncSumLong.java @@ -51,6 +51,8 @@ static final class SumLongEnumerator extends AtomicInteger CompletableFuture cf; + volatile boolean cancelled; + SumLongEnumerator(AsyncEnumerator source, Function selector) { this.source = source; this.selector = selector; @@ -75,6 +77,9 @@ public Long current() { void collectSource() { if (getAndIncrement() == 0) { do { + if (cancelled) { + return; + } source.moveNext().whenComplete(this); } while (decrementAndGet() != 0); } @@ -102,5 +107,11 @@ public void accept(Boolean aBoolean, Throwable throwable) { } } } + + @Override + public void cancel() { + cancelled = true; + source.cancel(); + } } } diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncConcatArrayTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncConcatArrayTest.java index 42b4591..9d66479 100644 --- a/src/test/java/hu/akarnokd/asyncenum/AsyncConcatArrayTest.java +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncConcatArrayTest.java @@ -21,6 +21,7 @@ import java.util.*; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; public class AsyncConcatArrayTest { @@ -82,4 +83,16 @@ public void concatWith() { public void emptyArray() { TestHelper.assertResult(AsyncEnumerable.concatArray()); } + + @Test + public void cancelThenMove() { + TestHelper.withExecutor(executor -> { + for (int i = 0; i < 10000; i++) { + AsyncEnumerator en = AsyncEnumerable.concatArray(AsyncEnumerable.range(1, 5)) + .enumerator(); + + TestHelper.race(en::cancel, en::moveNext, executor); + } + }); + } } diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncEnumeratorHelperTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncEnumeratorHelperTest.java index 2f00598..7cc493b 100644 --- a/src/test/java/hu/akarnokd/asyncenum/AsyncEnumeratorHelperTest.java +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncEnumeratorHelperTest.java @@ -77,4 +77,9 @@ public void cancelledCurrentNull() { public void cancelledMoveNext() throws InterruptedException { assertSame(AsyncEnumerable.CANCELLED, AsyncEnumeratorHelper.CANCELLED.moveNext()); } + + @Test + public void cancelledCancel() { + AsyncEnumeratorHelper.CANCELLED.cancel(); + } } diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncFromArrayTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncFromArrayTest.java index 631ac35..68850fa 100644 --- a/src/test/java/hu/akarnokd/asyncenum/AsyncFromArrayTest.java +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncFromArrayTest.java @@ -34,4 +34,13 @@ public void simple() throws Exception { assertEquals(Arrays.asList(1, 2, 3, 4, 5), list); } + + @Test + public void take() { + TestHelper.assertResult( + AsyncEnumerable.fromArray(1, 2, 3, 4, 5) + .take(3), + 1, 2, 3 + ); + } } diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncFromCallableTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncFromCallableTest.java index dbbf67f..cd9063c 100644 --- a/src/test/java/hu/akarnokd/asyncenum/AsyncFromCallableTest.java +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncFromCallableTest.java @@ -36,4 +36,11 @@ public void error() { IOException.class ); } + + @Test + public void cancel() { + AsyncEnumerable.fromCallable(() -> 1) + .enumerator() + .cancel(); + } } diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncFromIterableTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncFromIterableTest.java index d16e550..7ead640 100644 --- a/src/test/java/hu/akarnokd/asyncenum/AsyncFromIterableTest.java +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncFromIterableTest.java @@ -34,4 +34,14 @@ public void simple() throws Exception { assertEquals(Arrays.asList(1, 2, 3, 4, 5), list); } + + + @Test + public void take() { + TestHelper.assertResult( + AsyncEnumerable.fromIterable(Arrays.asList(1, 2, 3, 4, 5)) + .take(3), + 1, 2, 3 + ); + } } diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncMathOperatorsTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncMathOperatorsTest.java index 988ed54..57a5c5f 100644 --- a/src/test/java/hu/akarnokd/asyncenum/AsyncMathOperatorsTest.java +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncMathOperatorsTest.java @@ -119,4 +119,19 @@ public void sumLongError() { RuntimeException.class, "forced failure" ); } + + @Test + public void maxCancelRace() { + TestHelper.cancelRace(ae -> ae.max(Comparator.naturalOrder())); + } + + @Test + public void sumIntCancelRace() { + TestHelper.cancelRace(ae -> ae.sumInt(v -> v)); + } + + @Test + public void sumLongCancelRace() { + TestHelper.cancelRace(ae -> ae.sumLong(v -> v)); + } } diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncSubscribeOnTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncSubscribeOnTest.java index a1c0e9d..2984efd 100644 --- a/src/test/java/hu/akarnokd/asyncenum/AsyncSubscribeOnTest.java +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncSubscribeOnTest.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -44,4 +45,23 @@ public void simple() { exec.shutdownNow(); } } + + @Test + public void cancel() { + TestHelper.withScheduler(executor ->{ + AtomicReference> f = new AtomicReference<>(); + AsyncEnumerable.range(1, 5) + .subscribeOn(r -> { + f.set(executor.schedule(r, 100, TimeUnit.MILLISECONDS)); + }) + .enumerator() + .cancel(); + + try { + f.get().get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + }); + } }