Skip to content

Commit

Permalink
Mandatory cancellation support, coverage tests
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jan 1, 2018
1 parent 3741210 commit 6158001
Show file tree
Hide file tree
Showing 29 changed files with 218 additions and 15 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Prototype Java 9 library based on the asynchronous enumerable concept (where mov
### Gradle

```groovy
compile "com.github.akarnokd:async-enumerable:0.5.0"
compile "com.github.akarnokd:async-enumerable:0.6.0"
```

### Getting started
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=0.5.0
version=0.6.0
org.gradle.jvmargs=-XX:+IgnoreUnrecognizedVMOptions --permit-illegal-access --show-version

28 changes: 20 additions & 8 deletions src/main/java/hu/akarnokd/asyncenum/AsyncConcatArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package hu.akarnokd.asyncenum;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.*;
import java.util.function.BiConsumer;

final class AsyncConcatArray<T> implements AsyncEnumerable<T> {
Expand All @@ -38,33 +38,36 @@ static final class ConcatArrayEnumerator<T> extends AtomicInteger

final AsyncEnumerable<T>[] sources;

AsyncEnumerator<T> currentEnumerator;
final AtomicReference<AsyncEnumerator<T>> currentEnumerator;

CompletableFuture<Boolean> currentStage;

int index;

ConcatArrayEnumerator(AsyncEnumerable<T>[] sources) {
this.sources = sources;
this.currentEnumerator = new AtomicReference<>();
}

@Override
public CompletionStage<Boolean> moveNext() {
if (currentEnumerator == null) {
if (currentEnumerator.get() == null) {
if (index == sources.length) {
return FALSE;
}
currentEnumerator = sources[index++].enumerator();
if (!AsyncEnumeratorHelper.replace(currentEnumerator, sources[index++].enumerator())) {
return CANCELLED;
}
}

currentStage = new CompletableFuture<>();
currentEnumerator.moveNext().whenComplete(this);
currentEnumerator.getPlain().moveNext().whenComplete(this);
return currentStage;
}

@Override
public T current() {
return currentEnumerator.current();
return currentEnumerator.getPlain().current();
}

@Override
Expand All @@ -82,11 +85,20 @@ public void accept(Boolean aBoolean, Throwable throwable) {
currentStage.complete(false);
break;
}
currentEnumerator = sources[index++].enumerator();
currentEnumerator.moveNext().whenComplete(this);
AsyncEnumerator<T> en = sources[index++].enumerator();
if (AsyncEnumeratorHelper.replace(currentEnumerator, en)) {
en.moveNext().whenComplete(this);
} else {
break;
}
} while (decrementAndGet() != 0);
}
}
}

@Override
public void cancel() {
AsyncEnumeratorHelper.cancel(currentEnumerator);
}
}
}
5 changes: 5 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncEmpty.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ public CompletionStage<Boolean> moveNext() {
public Object current() {
return null;
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}
}
5 changes: 1 addition & 4 deletions src/main/java/hu/akarnokd/asyncenum/AsyncEnumerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,5 @@ public interface AsyncEnumerator<T> {
* Instructs the AsyncEnumerator to cancel any outstanding async activity and
* release resources associated with it.
*/
// FIXME make mandatory
default void cancel() {

}
void cancel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public Object current() {
return null;
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}

@SuppressWarnings("unchecked")
static <T> boolean cancel(AtomicReference<AsyncEnumerator<T>> target) {
AsyncEnumerator<?> current = target.getAndSet((AsyncEnumerator<T>)CANCELLED);
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncError.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ public CompletionStage<Boolean> moveNext() {
public T current() {
return null;
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}
}
5 changes: 5 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncFromArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,10 @@ public CompletionStage<Boolean> moveNext() {
public T current() {
return current;
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}
}
}
5 changes: 5 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncFromCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,10 @@ public CompletionStage<Boolean> moveNext() {
public T current() {
return result;
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,10 @@ public CompletionStage<Boolean> moveNext() {
public Integer current() {
return current;
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,10 @@ public void accept(T t, Throwable throwable) {
completable.complete(true);
}
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}
}
}
5 changes: 5 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncFromIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,10 @@ public CompletionStage<Boolean> moveNext() {
public T current() {
return current;
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}
}
}
5 changes: 5 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncJust.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,10 @@ public CompletionStage<Boolean> moveNext() {
public T current() {
return value;
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}
}
}
11 changes: 11 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncMax.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ static final class SumLongEnumerator<T> extends AtomicInteger

CompletableFuture<Boolean> completable;

volatile boolean cancelled;

SumLongEnumerator(AsyncEnumerator<T> source, Comparator<? super T> comparator) {
this.source = source;
this.comparator = comparator;
Expand All @@ -76,6 +78,9 @@ public T current() {
void collectSource() {
if (getAndIncrement() == 0) {
do {
if (cancelled) {
return;
}
source.moveNext().whenComplete(this);
} while (decrementAndGet() != 0);
}
Expand Down Expand Up @@ -110,5 +115,11 @@ public void accept(Boolean aBoolean, Throwable throwable) {
}
}
}

@Override
public void cancel() {
cancelled = true;
source.cancel();
}
}
}
5 changes: 5 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncNever.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ public CompletionStage<Boolean> moveNext() {
public Object current() {
return null;
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}
}
4 changes: 4 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public T current() {
return source.current();
}

@Override
public void cancel() {
source.cancel();
}
}

}
5 changes: 5 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,10 @@ public CompletionStage<Boolean> moveNext() {
public Integer current() {
return current;
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}
}
}
5 changes: 5 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncRepeatCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,10 @@ public CompletionStage<Boolean> moveNext() {
public T current() {
return result;
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}
}
}
5 changes: 5 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncRepeatItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ public CompletionStage<Boolean> moveNext() {
public T current() {
return item;
}

@Override
public void cancel() {
// No action, consumer should stop calling moveNext().
}
}
15 changes: 14 additions & 1 deletion src/main/java/hu/akarnokd/asyncenum/AsyncSubscribeOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,20 @@ public T current() {

@Override
public void run() {
source.complete(upstream.enumerator());
AsyncEnumerator<T> en = upstream.enumerator();
if (!source.complete(en)) {
en.cancel();
}
}

@Override
public void cancel() {
if (!source.completeExceptionally(new CancellationException())) {
AsyncEnumerator<T> en = source.getNow(null);
if (en != null) {
en.cancel();
}
}
}
}
}
11 changes: 11 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncSumInt.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ static final class SumIntEnumerator<T> extends AtomicInteger

CompletableFuture<Boolean> cf;

volatile boolean cancelled;

SumIntEnumerator(AsyncEnumerator<T> source, Function<? super T, ? extends Number> selector) {
this.source = source;
this.selector = selector;
Expand All @@ -75,6 +77,9 @@ public Integer current() {
void collectSource() {
if (getAndIncrement() == 0) {
do {
if (cancelled) {
return;
}
source.moveNext().whenComplete(this);
} while (decrementAndGet() != 0);
}
Expand Down Expand Up @@ -102,5 +107,11 @@ public void accept(Boolean aBoolean, Throwable throwable) {
}
}
}

@Override
public void cancel() {
cancelled = true;
source.cancel();
}
}
}
11 changes: 11 additions & 0 deletions src/main/java/hu/akarnokd/asyncenum/AsyncSumLong.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ static final class SumLongEnumerator<T> extends AtomicInteger

CompletableFuture<Boolean> cf;

volatile boolean cancelled;

SumLongEnumerator(AsyncEnumerator<T> source, Function<? super T, ? extends Number> selector) {
this.source = source;
this.selector = selector;
Expand All @@ -75,6 +77,9 @@ public Long current() {
void collectSource() {
if (getAndIncrement() == 0) {
do {
if (cancelled) {
return;
}
source.moveNext().whenComplete(this);
} while (decrementAndGet() != 0);
}
Expand Down Expand Up @@ -102,5 +107,11 @@ public void accept(Boolean aBoolean, Throwable throwable) {
}
}
}

@Override
public void cancel() {
cancelled = true;
source.cancel();
}
}
}
13 changes: 13 additions & 0 deletions src/test/java/hu/akarnokd/asyncenum/AsyncConcatArrayTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.*;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;

public class AsyncConcatArrayTest {

Expand Down Expand Up @@ -82,4 +83,16 @@ public void concatWith() {
public void emptyArray() {
TestHelper.assertResult(AsyncEnumerable.concatArray());
}

@Test
public void cancelThenMove() {
TestHelper.withExecutor(executor -> {
for (int i = 0; i < 10000; i++) {
AsyncEnumerator<Integer> en = AsyncEnumerable.concatArray(AsyncEnumerable.range(1, 5))
.enumerator();

TestHelper.race(en::cancel, en::moveNext, executor);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,9 @@ public void cancelledCurrentNull() {
public void cancelledMoveNext() throws InterruptedException {
assertSame(AsyncEnumerable.CANCELLED, AsyncEnumeratorHelper.CANCELLED.moveNext());
}

@Test
public void cancelledCancel() {
AsyncEnumeratorHelper.CANCELLED.cancel();
}
}
Loading

0 comments on commit 6158001

Please sign in to comment.