Skip to content
This repository has been archived by the owner on Mar 13, 2021. It is now read-only.

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ericbottard committed Oct 2, 2019
1 parent a865bf6 commit 8cab801
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions src/main/java/io/projectriff/invoker/server/GrpcServerAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Signal;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

Expand Down Expand Up @@ -151,8 +152,8 @@ private Function<Flux<Tuple2<Integer, Message<byte[]>>>, Flux<Tuple2<Integer, Me
// Used to transform the publisher chain into one that doesn't forward cancel() calls once it has complete()d.
private Function<? super Publisher<Tuple2<Integer, Message<byte[]>>>, ? extends Publisher<Tuple2<Integer, Message<byte[]>>>> ignoreCancelsAfterComplete() {
return Operators.lift((f, actual) ->
new CoreSubscriber<>() {
AtomicBoolean completed = new AtomicBoolean();
new CoreSubscriber<Tuple2<Integer, Message<byte[]>>>() {
private volatile boolean completed;

@Override
public void onSubscribe(Subscription s) {
Expand All @@ -164,7 +165,7 @@ public void request(long n) {

@Override
public void cancel() {
if (!completed.get()) {
if (!completed) {
s.cancel();
}
}
Expand All @@ -183,9 +184,14 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
completed.compareAndSet(false, true);
completed = true;
actual.onComplete();
}

@Override
public Context currentContext() {
return actual.currentContext();
}
});
}

Expand Down

0 comments on commit 8cab801

Please sign in to comment.