From 94de3904ac346ba3c9f9bd8522a3629cfb0b2ef7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 28 Dec 2017 00:47:52 +0100 Subject: [PATCH] More operators, prepare release 0.3.0 --- README.md | 2 +- gradle.properties | 2 +- .../akarnokd/asyncenum/AsyncDoOnCancel.java | 64 +++++++++ .../akarnokd/asyncenum/AsyncEnumerable.java | 60 +++++++++ .../akarnokd/asyncenum/AsyncFromCallable.java | 66 ++++++++++ .../asyncenum/AsyncIgnoreElements.java | 87 ++++++++++++ .../hu/akarnokd/asyncenum/AsyncRepeat.java | 114 ++++++++++++++++ .../asyncenum/AsyncRepeatCallable.java | 60 +++++++++ .../akarnokd/asyncenum/AsyncRepeatItem.java | 43 ++++++ .../akarnokd/asyncenum/AsyncRepeatWhen.java | 124 ++++++++++++++++++ .../hu/akarnokd/asyncenum/AsyncRetry.java | 114 ++++++++++++++++ .../hu/akarnokd/asyncenum/AsyncRetryWhen.java | 124 ++++++++++++++++++ .../asyncenum/AsyncDoOnCancelTest.java | 57 ++++++++ .../asyncenum/AsyncFromCallableTest.java | 39 ++++++ .../asyncenum/AsyncIgnoreElementsTest.java | 63 +++++++++ .../asyncenum/AsyncRepeatCallableTest.java | 40 ++++++ .../asyncenum/AsyncRepeatItemTest.java | 31 +++++ .../akarnokd/asyncenum/AsyncRepeatTest.java | 61 +++++++++ .../asyncenum/AsyncRepeatWhenTest.java | 65 +++++++++ .../hu/akarnokd/asyncenum/AsyncRetryTest.java | 89 +++++++++++++ .../asyncenum/AsyncRetryWhenTest.java | 64 +++++++++ 21 files changed, 1367 insertions(+), 2 deletions(-) create mode 100644 src/main/java/hu/akarnokd/asyncenum/AsyncDoOnCancel.java create mode 100644 src/main/java/hu/akarnokd/asyncenum/AsyncFromCallable.java create mode 100644 src/main/java/hu/akarnokd/asyncenum/AsyncIgnoreElements.java create mode 100644 src/main/java/hu/akarnokd/asyncenum/AsyncRepeat.java create mode 100644 src/main/java/hu/akarnokd/asyncenum/AsyncRepeatCallable.java create mode 100644 src/main/java/hu/akarnokd/asyncenum/AsyncRepeatItem.java create mode 100644 src/main/java/hu/akarnokd/asyncenum/AsyncRepeatWhen.java create mode 100644 src/main/java/hu/akarnokd/asyncenum/AsyncRetry.java create mode 100644 src/main/java/hu/akarnokd/asyncenum/AsyncRetryWhen.java create mode 100644 src/test/java/hu/akarnokd/asyncenum/AsyncDoOnCancelTest.java create mode 100644 src/test/java/hu/akarnokd/asyncenum/AsyncFromCallableTest.java create mode 100644 src/test/java/hu/akarnokd/asyncenum/AsyncIgnoreElementsTest.java create mode 100644 src/test/java/hu/akarnokd/asyncenum/AsyncRepeatCallableTest.java create mode 100644 src/test/java/hu/akarnokd/asyncenum/AsyncRepeatItemTest.java create mode 100644 src/test/java/hu/akarnokd/asyncenum/AsyncRepeatTest.java create mode 100644 src/test/java/hu/akarnokd/asyncenum/AsyncRepeatWhenTest.java create mode 100644 src/test/java/hu/akarnokd/asyncenum/AsyncRetryTest.java create mode 100644 src/test/java/hu/akarnokd/asyncenum/AsyncRetryWhenTest.java diff --git a/README.md b/README.md index afffab0..af9e786 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Prototype Java 9 library based on the asynchronous enumerable concept (where mov ### Gradle ```groovy -compile "com.github.akarnokd:async-enumerable:0.2.0" +compile "com.github.akarnokd:async-enumerable:0.3.0" ``` ### Getting started diff --git a/gradle.properties b/gradle.properties index 8c7546f..9198ae7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=0.2.0 +version=0.3.0 org.gradle.jvmargs=-XX:+IgnoreUnrecognizedVMOptions --permit-illegal-access --show-version diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncDoOnCancel.java b/src/main/java/hu/akarnokd/asyncenum/AsyncDoOnCancel.java new file mode 100644 index 0000000..f49f010 --- /dev/null +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncDoOnCancel.java @@ -0,0 +1,64 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import java.util.concurrent.CompletionStage; + +final class AsyncDoOnCancel implements AsyncEnumerable { + + final AsyncEnumerable source; + + final Runnable onCancel; + + AsyncDoOnCancel(AsyncEnumerable source, Runnable onCancel) { + this.source = source; + this.onCancel = onCancel; + } + + @Override + public AsyncEnumerator enumerator() { + return new DoOnCancelEnumerator<>(source.enumerator(), onCancel); + } + + static final class DoOnCancelEnumerator implements AsyncEnumerator { + + final AsyncEnumerator source; + + final Runnable onCancel; + + DoOnCancelEnumerator(AsyncEnumerator source, Runnable onCancel) { + this.source = source; + this.onCancel = onCancel; + } + + @Override + public CompletionStage moveNext() { + return source.moveNext(); + } + + @Override + public T current() { + return source.current(); + } + + @Override + public void cancel() { + onCancel.run(); + source.cancel(); + } + } +} diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncEnumerable.java b/src/main/java/hu/akarnokd/asyncenum/AsyncEnumerable.java index 75a6d20..addf432 100644 --- a/src/main/java/hu/akarnokd/asyncenum/AsyncEnumerable.java +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncEnumerable.java @@ -120,6 +120,18 @@ static AsyncEnumerable interval(long initialDelay, long period, TimeUnit u return new AsyncInterval(initialDelay, period, unit, executor); } + static AsyncEnumerable fromCallable(Callable callable) { + return new AsyncFromCallable<>(callable); + } + + static AsyncEnumerable repeatItem(T item) { + return new AsyncRepeatItem<>(item); + } + + static AsyncEnumerable repeatCallable(Callable callable) { + return new AsyncRepeatCallable<>(callable); + } + // ------------------------------------------------------------------------------------- // Instance transformations @@ -238,6 +250,54 @@ default AsyncEnumerable doFinally(Runnable onFinally) { return new AsyncDoFinally<>(this, onFinally); } + default AsyncEnumerable ignoreElements() { + return new AsyncIgnoreElements<>(this); + } + + default AsyncEnumerable doOnCancel(Runnable onCancel) { + return new AsyncDoOnCancel<>(this, onCancel); + } + + default AsyncEnumerable repeat(long times) { + return repeat(times, () -> false); + } + + default AsyncEnumerable repeat(BooleanSupplier stop) { + return repeat(Long.MAX_VALUE, stop); + } + + default AsyncEnumerable repeat(long times, BooleanSupplier stop) { + return new AsyncRepeat<>(this, times, stop); + } + + default AsyncEnumerable retry(long times) { + return retry(times, e -> true); + } + + default AsyncEnumerable retry(Predicate predicate) { + return retry(Long.MAX_VALUE, predicate); + } + + default AsyncEnumerable retry(long times, Predicate predicate) { + return new AsyncRetry<>(this, times, predicate); + } + + default AsyncEnumerable repeatWhen(Supplier> completer) { + return repeatWhen(() -> null, s -> completer.get()); + } + + default AsyncEnumerable repeatWhen(Supplier stateSupplier, Function> completer) { + return new AsyncRepeatWhen<>(this, stateSupplier, completer); + } + + default AsyncEnumerable retryWhen(Function> completer) { + return retryWhen(() -> null, (s, e) -> completer.apply(e)); + } + + default AsyncEnumerable retryWhen(Supplier stateSupplier, BiFunction> completer) { + return new AsyncRetryWhen(this, stateSupplier, completer); + } + // ------------------------------------------------------------------------------------- // Instance consumers diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncFromCallable.java b/src/main/java/hu/akarnokd/asyncenum/AsyncFromCallable.java new file mode 100644 index 0000000..5c7b653 --- /dev/null +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncFromCallable.java @@ -0,0 +1,66 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import java.util.concurrent.*; + +final class AsyncFromCallable implements AsyncEnumerable { + + final Callable callable; + + AsyncFromCallable(Callable callable) { + this.callable = callable; + } + + @Override + public AsyncEnumerator enumerator() { + return new FromCallableEnumerator<>(callable); + } + + static final class FromCallableEnumerator implements AsyncEnumerator { + + final Callable callable; + + T result; + + boolean once; + + FromCallableEnumerator(Callable callable) { + this.callable = callable; + } + + @Override + public CompletionStage moveNext() { + if (once) { + result = null; + return FALSE; + } + once = true; + try { + result = callable.call(); + } catch (Exception ex) { + return CompletableFuture.failedStage(ex); + } + return TRUE; + } + + @Override + public T current() { + return result; + } + } +} diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncIgnoreElements.java b/src/main/java/hu/akarnokd/asyncenum/AsyncIgnoreElements.java new file mode 100644 index 0000000..9f4c1ed --- /dev/null +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncIgnoreElements.java @@ -0,0 +1,87 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +final class AsyncIgnoreElements implements AsyncEnumerable { + + final AsyncEnumerable source; + + AsyncIgnoreElements(AsyncEnumerable source) { + this.source = source; + } + + @Override + public AsyncEnumerator enumerator() { + return new IgnoreElementsEnumerator<>(source.enumerator()); + } + + static final class IgnoreElementsEnumerator + extends AtomicInteger + implements AsyncEnumerator, BiConsumer { + + final AsyncEnumerator source; + + CompletableFuture completable; + + IgnoreElementsEnumerator(AsyncEnumerator source) { + this.source = source; + } + + @Override + public CompletionStage moveNext() { + CompletableFuture cf = new CompletableFuture<>(); + completable = cf; + nextSource(); + return cf; + } + + void nextSource() { + if (getAndIncrement() == 0) { + do { + source.moveNext().whenComplete(this); + } while (decrementAndGet() != 0); + } + } + + @Override + public T current() { + return null; // elements are ignored + } + + @Override + public void cancel() { + source.cancel(); + } + + @Override + public void accept(Boolean aBoolean, Throwable throwable) { + if (throwable != null) { + completable.completeExceptionally(throwable); + return; + } + if (aBoolean) { + nextSource(); + } else { + completable.complete(false); + } + } + } +} diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncRepeat.java b/src/main/java/hu/akarnokd/asyncenum/AsyncRepeat.java new file mode 100644 index 0000000..cb31ddb --- /dev/null +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncRepeat.java @@ -0,0 +1,114 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.function.*; + +final class AsyncRepeat implements AsyncEnumerable { + + final AsyncEnumerable source; + + final long times; + + final BooleanSupplier stop; + + AsyncRepeat(AsyncEnumerable source, long times, BooleanSupplier stop) { + this.source = source; + this.times = times; + this.stop = stop; + } + + @Override + public AsyncEnumerator enumerator() { + return new RepeatEnumerator<>(source, times, stop); + } + + static final class RepeatEnumerator + extends AtomicInteger + implements AsyncEnumerator, BiConsumer { + + final AsyncEnumerable source; + + long times; + + final BooleanSupplier stop; + + final AtomicReference> current; + + T result; + + CompletableFuture completable; + + RepeatEnumerator(AsyncEnumerable source, long times, BooleanSupplier stop) { + this.source = source; + this.current = new AtomicReference<>(source.enumerator()); + this.times = times; + this.stop = stop; + } + + @Override + public CompletionStage moveNext() { + CompletableFuture cf = new CompletableFuture<>(); + completable = cf; + nextItem(); + return cf; + } + + @Override + public T current() { + return result; + } + + @Override + public void cancel() { + AsyncEnumeratorHelper.cancel(current); + } + + void nextItem() { + if (getAndIncrement() == 0) { + do { + current.get().moveNext().whenComplete(this); + } while (decrementAndGet() != 0); + } + } + + @Override + public void accept(Boolean aBoolean, Throwable throwable) { + if (throwable != null) { + result = null; + completable.completeExceptionally(throwable); + return; + } + + if (aBoolean) { + result = current.getPlain().current(); + completable.complete(true); + } else { + if (--times <= 0L || stop.getAsBoolean()) { + result = null; + completable.complete(false); + } else { + if (AsyncEnumeratorHelper.replace(current, source.enumerator())) { + nextItem(); + } + } + } + } + } +} diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatCallable.java b/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatCallable.java new file mode 100644 index 0000000..a9f5fdb --- /dev/null +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatCallable.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import java.util.concurrent.*; + +final class AsyncRepeatCallable implements AsyncEnumerable { + + final Callable callable; + + AsyncRepeatCallable(Callable callable) { + this.callable = callable; + } + + @Override + public AsyncEnumerator enumerator() { + return new RepeatCallableEnumerator<>(callable); + } + + static final class RepeatCallableEnumerator implements AsyncEnumerator { + + final Callable callable; + + T result; + + RepeatCallableEnumerator(Callable callable) { + this.callable = callable; + } + + @Override + public CompletionStage moveNext() { + result = null; + try { + result = callable.call(); + } catch (Exception ex) { + return CompletableFuture.failedStage(ex); + } + return TRUE; + } + + @Override + public T current() { + return result; + } + } +} diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatItem.java b/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatItem.java new file mode 100644 index 0000000..e42053c --- /dev/null +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatItem.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import java.util.concurrent.CompletionStage; + +final class AsyncRepeatItem implements AsyncEnumerable, AsyncEnumerator { + + final T item; + + AsyncRepeatItem(T item) { + this.item = item; + } + + @Override + public AsyncEnumerator enumerator() { + return this; + } + + @Override + public CompletionStage moveNext() { + return TRUE; + } + + @Override + public T current() { + return item; + } +} diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatWhen.java b/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatWhen.java new file mode 100644 index 0000000..14b5007 --- /dev/null +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncRepeatWhen.java @@ -0,0 +1,124 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.function.*; + +public class AsyncRepeatWhen implements AsyncEnumerable { + + final AsyncEnumerable source; + + final Supplier stateSupplier; + + final Function> completer; + + public AsyncRepeatWhen(AsyncEnumerable source, Supplier stateSupplier, Function> completer) { + this.source = source; + this.stateSupplier = stateSupplier; + this.completer = completer; + } + + @Override + public AsyncEnumerator enumerator() { + return new RepeatWhenEnumerator<>(source, stateSupplier.get(), completer); + } + + static final class RepeatWhenEnumerator + extends AtomicInteger + implements AsyncEnumerator, BiConsumer { + + final AsyncEnumerable source; + + final S state; + + final Function> completer; + + final AtomicReference> current; + + T result; + + CompletableFuture completable; + + RepeatWhenEnumerator(AsyncEnumerable source, S state, Function> completer) { + this.source = source; + this.state = state; + this.completer = completer; + this.current = new AtomicReference<>(source.enumerator()); + } + + @Override + public CompletionStage moveNext() { + CompletableFuture cf = new CompletableFuture<>(); + completable = cf; + nextItem(); + return cf; + } + + @Override + public T current() { + return result; + } + + void nextItem() { + if (getAndIncrement() == 0) { + do { + current.get().moveNext().whenComplete(this); + } while (decrementAndGet() != 0); + } + } + + @Override + public void cancel() { + AsyncEnumeratorHelper.cancel(current); + } + + @Override + public void accept(Boolean aBoolean, Throwable throwable) { + if (throwable != null) { + result = null; + completable.completeExceptionally(throwable); + return; + } + + if (aBoolean) { + result = current.getPlain().current(); + completable.complete(true); + } else { + result = null; + completer.apply(state).whenComplete(this::acceptCompleter); + } + } + + void acceptCompleter(Boolean shouldRepeat, Throwable throwable) { + if (throwable != null) { + result = null; + completable.completeExceptionally(throwable); + return; + } + + if (shouldRepeat) { + if (AsyncEnumeratorHelper.replace(current, source.enumerator())) { + nextItem(); + } + } else { + completable.complete(false); + } + } + } +} diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncRetry.java b/src/main/java/hu/akarnokd/asyncenum/AsyncRetry.java new file mode 100644 index 0000000..ae8dcde --- /dev/null +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncRetry.java @@ -0,0 +1,114 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.function.*; + +final class AsyncRetry implements AsyncEnumerable { + + final AsyncEnumerable source; + + final long times; + + final Predicate predicate; + + AsyncRetry(AsyncEnumerable source, long times, Predicate predicate) { + this.source = source; + this.times = times; + this.predicate = predicate; + } + + @Override + public AsyncEnumerator enumerator() { + return new RetryEnumerator<>(source, times, predicate); + } + + static final class RetryEnumerator + extends AtomicInteger + implements AsyncEnumerator, BiConsumer { + + final AsyncEnumerable source; + + long times; + + final Predicate predicate; + + final AtomicReference> current; + + T result; + + CompletableFuture completable; + + RetryEnumerator(AsyncEnumerable source, long times, Predicate predicate) { + this.source = source; + this.current = new AtomicReference<>(source.enumerator()); + this.times = times; + this.predicate = predicate; + } + + @Override + public CompletionStage moveNext() { + CompletableFuture cf = new CompletableFuture<>(); + completable = cf; + nextItem(); + return cf; + } + + @Override + public T current() { + return result; + } + + @Override + public void cancel() { + AsyncEnumeratorHelper.cancel(current); + } + + void nextItem() { + if (getAndIncrement() == 0) { + do { + current.get().moveNext().whenComplete(this); + } while (decrementAndGet() != 0); + } + } + + @Override + public void accept(Boolean aBoolean, Throwable throwable) { + if (throwable != null) { + if (times-- <= 0L || !predicate.test(throwable)) { + result = null; + completable.completeExceptionally(throwable); + } else { + if (AsyncEnumeratorHelper.replace(current, source.enumerator())) { + nextItem(); + } + } + return; + } + + if (aBoolean) { + result = current.getPlain().current(); + completable.complete(true); + } else { + result = null; + completable.complete(false); + } + } + } +} diff --git a/src/main/java/hu/akarnokd/asyncenum/AsyncRetryWhen.java b/src/main/java/hu/akarnokd/asyncenum/AsyncRetryWhen.java new file mode 100644 index 0000000..c29cea3 --- /dev/null +++ b/src/main/java/hu/akarnokd/asyncenum/AsyncRetryWhen.java @@ -0,0 +1,124 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.function.*; + +public class AsyncRetryWhen implements AsyncEnumerable { + + final AsyncEnumerable source; + + final Supplier stateSupplier; + + final BiFunction> completer; + + public AsyncRetryWhen(AsyncEnumerable source, Supplier stateSupplier, BiFunction> completer) { + this.source = source; + this.stateSupplier = stateSupplier; + this.completer = completer; + } + + @Override + public AsyncEnumerator enumerator() { + return new RetryWhenEnumerator<>(source, stateSupplier.get(), completer); + } + + static final class RetryWhenEnumerator + extends AtomicInteger + implements AsyncEnumerator, BiConsumer { + + final AsyncEnumerable source; + + final S state; + + final BiFunction> completer; + + final AtomicReference> current; + + T result; + + CompletableFuture completable; + + RetryWhenEnumerator(AsyncEnumerable source, S state, BiFunction> completer) { + this.source = source; + this.state = state; + this.completer = completer; + this.current = new AtomicReference<>(source.enumerator()); + } + + @Override + public CompletionStage moveNext() { + CompletableFuture cf = new CompletableFuture<>(); + completable = cf; + nextItem(); + return cf; + } + + @Override + public T current() { + return result; + } + + void nextItem() { + if (getAndIncrement() == 0) { + do { + current.get().moveNext().whenComplete(this); + } while (decrementAndGet() != 0); + } + } + + @Override + public void cancel() { + AsyncEnumeratorHelper.cancel(current); + } + + @Override + public void accept(Boolean aBoolean, Throwable throwable) { + if (throwable != null) { + result = null; + completer.apply(state, throwable).whenComplete(this::acceptCompleter); + return; + } + + if (aBoolean) { + result = current.getPlain().current(); + completable.complete(true); + } else { + result = null; + completable.complete(false); + } + } + + void acceptCompleter(Boolean shouldRetry, Throwable throwable) { + if (throwable != null) { + result = null; + completable.completeExceptionally(throwable); + return; + } + + if (shouldRetry) { + if (AsyncEnumeratorHelper.replace(current, source.enumerator())) { + nextItem(); + } + } else { + completable.complete(false); + } + } + } +} diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncDoOnCancelTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncDoOnCancelTest.java new file mode 100644 index 0000000..acc6f5e --- /dev/null +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncDoOnCancelTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class AsyncDoOnCancelTest { + + @Test + public void simple() { + AtomicBoolean cancelled = new AtomicBoolean(); + + TestHelper.assertResult( + AsyncEnumerable.range(1, 5) + .doOnCancel(() -> cancelled.set(true)) + .take(3), + 1, 2, 3 + ); + + assertTrue(cancelled.get()); + } + + + @Test + public void error() { + AtomicBoolean cancelled = new AtomicBoolean(); + + TestHelper.assertFailure( + AsyncEnumerable.error(new IOException()) + .doOnCancel(() -> cancelled.set(true)) + .take(3), + IOException.class + ); + + assertFalse(cancelled.get()); + } +} diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncFromCallableTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncFromCallableTest.java new file mode 100644 index 0000000..dbbf67f --- /dev/null +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncFromCallableTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import org.junit.Test; + +import java.io.IOException; + +public class AsyncFromCallableTest { + + @Test + public void simple() { + TestHelper.assertResult( + AsyncEnumerable.fromCallable(() -> 1), + 1); + } + + @Test + public void error() { + TestHelper.assertFailure( + AsyncEnumerable.fromCallable(() -> { throw new IOException(); }), + IOException.class + ); + } +} diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncIgnoreElementsTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncIgnoreElementsTest.java new file mode 100644 index 0000000..d88d97c --- /dev/null +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncIgnoreElementsTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class AsyncIgnoreElementsTest { + + @Test + public void simple() { + TestHelper.assertResult( + AsyncEnumerable.range(1, 5) + .ignoreElements() + ); + } + + @Test + public void cancel() { + AtomicBoolean cancelled = new AtomicBoolean(); + AsyncEnumerable.never() + .doOnCancel(() -> cancelled.set(true)) + .ignoreElements() + .enumerator() + .cancel(); + + assertTrue(cancelled.get()); + } + + @Test + public void error() { + TestHelper.assertFailure( + AsyncEnumerable.error(new IOException()) + .ignoreElements(), + IOException.class + ); + } + + @Test + public void nullCurrent() { + assertNull(AsyncEnumerable.never().ignoreElements().enumerator().current()); + } + +} diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncRepeatCallableTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncRepeatCallableTest.java new file mode 100644 index 0000000..6a9f22b --- /dev/null +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncRepeatCallableTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import org.junit.Test; + +import java.io.IOException; + +public class AsyncRepeatCallableTest { + + @Test + public void simple() { + TestHelper.assertResult( + AsyncEnumerable.repeatCallable(() -> 1).take(5), + 1, 1, 1, 1, 1 + ); + } + + @Test + public void error() { + TestHelper.assertFailure( + AsyncEnumerable.repeatCallable(() -> { throw new IOException(); }), + IOException.class + ); + } +} diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncRepeatItemTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncRepeatItemTest.java new file mode 100644 index 0000000..71ba8e9 --- /dev/null +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncRepeatItemTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import org.junit.Test; + +public class AsyncRepeatItemTest { + + @Test + public void simple() { + TestHelper.assertResult( + AsyncEnumerable.repeatItem(1).take(5), + 1, 1, 1, 1, 1 + ); + + } +} diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncRepeatTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncRepeatTest.java new file mode 100644 index 0000000..3e6ff5e --- /dev/null +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncRepeatTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import org.junit.Test; + +import java.io.IOException; + +public class AsyncRepeatTest { + + @Test + public void simple() { + TestHelper.assertResult( + AsyncEnumerable.just(1) + .repeat(5), + 1, 1, 1, 1, 1 + ); + } + + @Test + public void take() { + TestHelper.assertResult( + AsyncEnumerable.just(1) + .repeat(5) + .take(5), + 1, 1, 1, 1, 1 + ); + } + + @Test + public void error() { + TestHelper.assertFailure( + AsyncEnumerable.error(new IOException()) + .repeat(1), + IOException.class + ); + } + + @Test + public void stop() { + TestHelper.assertResult( + AsyncEnumerable.just(1) + .repeat(() -> true), + 1 + ); + } +} diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncRepeatWhenTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncRepeatWhenTest.java new file mode 100644 index 0000000..cf76de0 --- /dev/null +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncRepeatWhenTest.java @@ -0,0 +1,65 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +public class AsyncRepeatWhenTest { + + @Test + public void simple() { + TestHelper.assertResult( + AsyncEnumerable.just(1) + .repeatWhen(() -> CompletableFuture.completedFuture(true)) + .take(5), + 1, 1, 1, 1, 1 + ); + } + + + @Test + public void withState() { + TestHelper.assertResult( + AsyncEnumerable.just(1) + .repeatWhen(AtomicInteger::new, s -> CompletableFuture.completedFuture(s.incrementAndGet() < 5)) + , + 1, 1, 1, 1, 1 + ); + } + + @Test + public void mainError() { + TestHelper.assertFailure( + AsyncEnumerable.error(new IOException()) + .repeatWhen(() -> CompletableFuture.completedStage(false)), + IOException.class + ); + } + + @Test + public void completerError() { + TestHelper.assertFailure( + AsyncEnumerable.just(1) + .repeatWhen(() -> CompletableFuture.failedStage(new IOException())), + IOException.class + ); + } +} diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncRetryTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncRetryTest.java new file mode 100644 index 0000000..76ef381 --- /dev/null +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncRetryTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +public class AsyncRetryTest { + + @Test + public void simple() { + TestHelper.assertResult( + AsyncEnumerable.just(1) + .retry(5), + 1 + ); + } + + @Test + public void take() { + TestHelper.assertResult( + AsyncEnumerable.repeatItem(1) + .retry(5) + .take(5), + 1, 1, 1, 1, 1 + ); + } + + @Test + public void error() { + TestHelper.assertFailure( + AsyncEnumerable.error(new IOException()) + .retry(1), + IOException.class + ); + } + + @Test + public void stop() { + TestHelper.assertResult( + AsyncEnumerable.just(1) + .retry(e -> false), + 1 + ); + } + + @Test + public void rightError() { + AtomicInteger count = new AtomicInteger(); + + TestHelper.assertResult( + AsyncEnumerable.defer(() -> { + if (count.getAndIncrement() == 0) { + return AsyncEnumerable.error(new IOException()); + } + return AsyncEnumerable.range(1, 5); + }) + .retry(1), + 1, 2, 3, 4, 5 + ); + } + + @Test + public void wrongError() { + AtomicInteger count = new AtomicInteger(); + + TestHelper.assertFailure( + AsyncEnumerable.error(new IOException()) + .retry(e -> e instanceof RuntimeException), + IOException.class + ); + } +} diff --git a/src/test/java/hu/akarnokd/asyncenum/AsyncRetryWhenTest.java b/src/test/java/hu/akarnokd/asyncenum/AsyncRetryWhenTest.java new file mode 100644 index 0000000..4869673 --- /dev/null +++ b/src/test/java/hu/akarnokd/asyncenum/AsyncRetryWhenTest.java @@ -0,0 +1,64 @@ +/* + * Copyright 2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.asyncenum; + +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +public class AsyncRetryWhenTest { + + @Test + public void simple() { + TestHelper.assertResult( + AsyncEnumerable.range(1, 5) + .retryWhen(e -> CompletableFuture.completedStage(false)) + , + 1, 2, 3, 4, 5 + ); + } + + @Test + public void take() { + TestHelper.assertResult( + AsyncEnumerable.range(1, 5) + .retryWhen(e -> CompletableFuture.completedStage(false)) + .take(3), + 1, 2, 3 + ); + } + + @Test + public void error() { + TestHelper.assertResult( + AsyncEnumerable.error(new IOException()) + .retryWhen(AtomicInteger::new, (s, e) -> CompletableFuture.completedStage(s.incrementAndGet() < 5)) + ); + } + + @Test + public void completerFails() { + TestHelper.assertFailure( + AsyncEnumerable.error(new IOException()) + .retryWhen(AtomicInteger::new, (s, e) -> CompletableFuture.failedStage(e)), + IOException.class + ); + + } +}