-
Notifications
You must be signed in to change notification settings - Fork 120
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fixes Reactor context propagation, including tests
- Loading branch information
1 parent
13df614
commit a11f716
Showing
4 changed files
with
234 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
80 changes: 80 additions & 0 deletions
80
...or-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorGrpcClientCallFlux.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
/* | ||
* Copyright (c) 2019, Salesforce.com, Inc. | ||
* All rights reserved. | ||
* Licensed under the BSD 3-Clause license. | ||
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause | ||
*/ | ||
package com.salesforce.reactorgrpc.stub; | ||
|
||
import io.grpc.stub.CallStreamObserver; | ||
import io.grpc.stub.StreamObserver; | ||
import org.reactivestreams.Subscription; | ||
import reactor.core.CoreSubscriber; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.FluxOperator; | ||
import reactor.util.context.Context; | ||
|
||
import java.util.function.Consumer; | ||
import java.util.function.Function; | ||
|
||
/** | ||
* Create a {@link Flux} that allows for correct context propagation in client calls | ||
* | ||
* @param <TRequest> | ||
* @param <TResponse> | ||
*/ | ||
public class ReactorGrpcClientCallFlux<TRequest, TResponse> extends FluxOperator<TRequest, TResponse> { | ||
|
||
private final ReactorSubscriberAndClientProducer<TRequest> requestConsumer; | ||
private final ReactorClientStreamObserverAndPublisher<TResponse> responsePublisher; | ||
private final Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate; | ||
|
||
ReactorGrpcClientCallFlux(Flux<TRequest> in, Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate) { | ||
super(in); | ||
this.delegate = delegate; | ||
this.requestConsumer = new ReactorSubscriberAndClientProducer<>(); | ||
this.responsePublisher = new ReactorClientStreamObserverAndPublisher<>(s -> requestConsumer.subscribe((CallStreamObserver<TRequest>) s), requestConsumer::cancel); | ||
} | ||
|
||
public ReactorGrpcClientCallFlux(Flux<TRequest> in, Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate, int prefetch, int lowTide) { | ||
super(in); | ||
this.delegate = delegate; | ||
this.requestConsumer = new ReactorSubscriberAndClientProducer<>(); | ||
this.responsePublisher = new ReactorClientStreamObserverAndPublisher<>(s -> requestConsumer.subscribe((CallStreamObserver<TRequest>) s), requestConsumer::cancel, prefetch, lowTide); | ||
} | ||
|
||
public Consumer<? super Subscription> onSubscribeHook() { | ||
return s -> this.delegate.apply(responsePublisher); | ||
} | ||
|
||
@Override | ||
public void subscribe(CoreSubscriber<? super TResponse> actual) { | ||
responsePublisher.subscribe(actual); | ||
source.subscribe(new CoreSubscriber<TRequest>() { | ||
@Override | ||
public void onSubscribe(Subscription s) { | ||
requestConsumer.onSubscribe(s); | ||
} | ||
|
||
@Override | ||
public void onNext(TRequest tRequest) { | ||
requestConsumer.onNext(tRequest); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable throwable) { | ||
requestConsumer.onError(throwable); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
requestConsumer.onComplete(); | ||
} | ||
|
||
@Override | ||
public Context currentContext() { | ||
return actual.currentContext(); | ||
} | ||
}); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
138 changes: 138 additions & 0 deletions
138
...tor-grpc-test/src/test/java/com/salesforce/reactorgrpc/ReactorContextPropagationTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
/* | ||
* Copyright (c) 2019, Salesforce.com, Inc. | ||
* All rights reserved. | ||
* Licensed under the BSD 3-Clause license. | ||
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause | ||
*/ | ||
|
||
package com.salesforce.reactorgrpc; | ||
|
||
import io.grpc.testing.GrpcServerRule; | ||
import org.junit.*; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Hooks; | ||
import reactor.core.publisher.Mono; | ||
import reactor.test.StepVerifier; | ||
|
||
import java.util.stream.Collectors; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
public class ReactorContextPropagationTest { | ||
|
||
@Rule | ||
public GrpcServerRule serverRule = new GrpcServerRule(); | ||
|
||
private static class SimpleGreeter extends ReactorGreeterGrpc.GreeterImplBase { | ||
@Override | ||
public Mono<HelloResponse> sayHello(Mono<HelloRequest> request) { | ||
return request.map(HelloRequest::getName) | ||
.map(name -> HelloResponse.newBuilder().setMessage("Hello " + name).build()); | ||
} | ||
|
||
@Override | ||
public Mono<HelloResponse> sayHelloReqStream(Flux<HelloRequest> request) { | ||
return request.transformDeferredContextual((f, ctx) -> f.map(HelloRequest::getName)) | ||
.collect(Collectors.joining("and")) | ||
.map(names -> HelloResponse.newBuilder().setMessage("Hello " + names).build()); | ||
} | ||
|
||
@Override | ||
public Flux<HelloResponse> sayHelloRespStream(Mono<HelloRequest> request) { | ||
return request.repeat(2) | ||
.map(HelloRequest::getName) | ||
.zipWith(Flux.just("Hello ", "Hi ", "Greetings "), String::join) | ||
.map(greeting -> HelloResponse.newBuilder().setMessage(greeting).build()); | ||
} | ||
|
||
@Override | ||
public Flux<HelloResponse> sayHelloBothStream(Flux<HelloRequest> request) { | ||
return request.map(HelloRequest::getName) | ||
.map(name -> HelloResponse.newBuilder().setMessage("Hello " + name).build()); | ||
} | ||
} | ||
|
||
@BeforeClass | ||
public static void beforeAll(){ | ||
Hooks.enableContextLossTracking(); | ||
Hooks.onOperatorDebug(); | ||
} | ||
|
||
@AfterClass | ||
public static void afterAll(){ | ||
Hooks.disableContextLossTracking(); | ||
Hooks.resetOnOperatorDebug(); | ||
} | ||
|
||
@Before | ||
public void setup() { | ||
serverRule.getServiceRegistry().addService(new SimpleGreeter()); | ||
} | ||
|
||
@Test | ||
public void oneToOne() { | ||
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(serverRule.getChannel()); | ||
Mono<HelloRequest> req = Mono.just(HelloRequest.newBuilder().setName("reactor").build()); | ||
|
||
Mono<HelloResponse> resp = req | ||
.doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) | ||
.transform(stub::sayHello) | ||
.doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) | ||
.contextWrite(ctx -> ctx.put("name", "context")); | ||
|
||
StepVerifier.create(resp) | ||
.expectNextCount(1) | ||
.verifyComplete(); | ||
} | ||
|
||
@Test | ||
public void oneToMany() { | ||
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(serverRule.getChannel()); | ||
Mono<HelloRequest> req = Mono.just(HelloRequest.newBuilder().setName("reactor").build()); | ||
|
||
Flux<HelloResponse> resp = req | ||
.doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) | ||
.as(stub::sayHelloRespStream) | ||
.doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) | ||
.contextWrite(ctx -> ctx.put("name", "context")); | ||
|
||
StepVerifier.create(resp) | ||
.expectNextCount(3) | ||
.verifyComplete(); | ||
} | ||
|
||
@Test | ||
public void manyToOne() { | ||
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(serverRule.getChannel()); | ||
Flux<HelloRequest> req = Mono.deferContextual(ctx -> Mono.just(HelloRequest.newBuilder().setName(ctx.get("name")).build())).repeat(2); | ||
|
||
Mono<HelloResponse> resp = req | ||
.doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) | ||
.as(stub::sayHelloReqStream) | ||
.doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) | ||
.contextWrite(ctx -> ctx.put("name", "context")); | ||
|
||
StepVerifier.create(resp) | ||
.expectAccessibleContext() | ||
.contains("name", "context") | ||
.then() | ||
.expectNextCount(1) | ||
.verifyComplete(); | ||
} | ||
|
||
@Test | ||
public void manyToMany() { | ||
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(serverRule.getChannel()); | ||
Flux<HelloRequest> req = Mono.just(HelloRequest.newBuilder().setName("reactor").build()).repeat(2).contextWrite(c -> c.put("name", "boom")); | ||
|
||
Flux<HelloResponse> resp = req | ||
.doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) | ||
.transform(stub::sayHelloBothStream) | ||
.doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) | ||
.contextWrite(ctx -> ctx.put("name", "context")); | ||
|
||
StepVerifier.create(resp) | ||
.expectNextCount(3) | ||
.verifyComplete(); | ||
} | ||
} |