Skip to content
This repository has been archived by the owner on Mar 13, 2021. It is now read-only.

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ericbottard committed Oct 1, 2019
1 parent 439f767 commit 2ccbbba
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions src/main/java/io/projectriff/invoker/server/GrpcServerAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Signal;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

Expand Down Expand Up @@ -149,8 +150,8 @@ private Function<Flux<Tuple2<Integer, Message<byte[]>>>, Flux<Tuple2<Integer, Me
// Used to transform the publisher chain into one that doesn't forward cancel() calls once it has complete()d.
private Function<? super Publisher<Tuple2<Integer, Message<byte[]>>>, ? extends Publisher<Tuple2<Integer, Message<byte[]>>>> ignoreCancelsAfterComplete() {
return Operators.lift((f, actual) ->
new CoreSubscriber<>() {
AtomicBoolean completed = new AtomicBoolean();
new CoreSubscriber<Tuple2<Integer, Message<byte[]>>>() {
private volatile boolean completed;

@Override
public void onSubscribe(Subscription s) {
Expand All @@ -162,7 +163,7 @@ public void request(long n) {

@Override
public void cancel() {
if (!completed.get()) {
if (!completed) {
s.cancel();
}
}
Expand All @@ -181,9 +182,14 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
completed.compareAndSet(false, true);
completed = true;
actual.onComplete();
}

@Override
public Context currentContext() {
return actual.currentContext();
}
});
}

Expand Down

0 comments on commit 2ccbbba

Please sign in to comment.