Skip to content

Commit

Permalink
Propagate Reactor's context in tracing publisher (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstepanov authored Jan 27, 2022
1 parent 3bb4030 commit 961c38c
Show file tree
Hide file tree
Showing 13 changed files with 755 additions and 124 deletions.
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ zipkin-brave-instrumentation = { module = 'io.zipkin.brave:brave-instrumentation
zipkin-reporter = { module = 'io.zipkin.reporter2:zipkin-reporter', version.ref = 'zipkin-reporter' }

[plugins]
kotlin = { id = 'org.jetbrains.kotlin.jvm', version.ref = 'kotlin' }
kotlinjvm = { id = 'org.jetbrains.kotlin.jvm', version.ref = 'kotlin' }
kotlinkapt = { id = 'org.jetbrains.kotlin.kapt', version.ref = 'kotlin' }
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ include 'tracing-bom'
include 'tracing-core'
include 'tracing-jaeger'
include 'tracing-zipkin'
include 'tests:kotlin-tests'

enableFeaturePreview 'TYPESAFE_PROJECT_ACCESSORS'
enableFeaturePreview 'VERSION_CATALOGS'
Expand Down
30 changes: 30 additions & 0 deletions tests/kotlin-tests/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
plugins {
alias libs.plugins.kotlinjvm
alias libs.plugins.kotlinkapt
}

dependencies {
api project(':tracing-core')
api mn.micronaut.http.client
api mn.micronaut.runtime

api libs.kotlinx.coroutines.core
api libs.kotlinx.coroutines.reactor

kaptTest mn.micronaut.inject.java

testImplementation libs.kotlinx.coroutines.core
testImplementation libs.kotlinx.coroutines.reactor

testImplementation mn.micronaut.http.server.netty
testImplementation mn.micronaut.test.junit5
testImplementation 'org.junit.jupiter:junit-jupiter-engine'
}

tasks.named("compileTestKotlin") {
kotlinOptions.jvmTarget = "1.8"
}

tasks.named("test") {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package reactor

import io.micronaut.context.ApplicationContext
import io.micronaut.context.annotation.Requires
import io.micronaut.core.annotation.Introspected
import io.micronaut.http.HttpRequest
import io.micronaut.http.MutableHttpResponse
import io.micronaut.http.annotation.*
import io.micronaut.http.client.HttpClient
import io.micronaut.http.filter.HttpServerFilter
import io.micronaut.http.filter.ServerFilterChain
import io.micronaut.runtime.server.EmbeddedServer
import jakarta.inject.Singleton
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.asCoroutineContext
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Test
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.util.context.Context
import reactor.util.function.Tuple2
import reactor.util.function.Tuples
import java.util.*

class ReactorContextPropagationSpec {

@Test
fun testKotlinPropagation() {
val embeddedServer = ApplicationContext.run(EmbeddedServer::class.java,
mapOf("reactortestpropagation.enabled" to "true", "micronaut.http.client.read-timeout" to "30s")
)
val client = embeddedServer.applicationContext.getBean(HttpClient::class.java)

val result: MutableList<Tuple2<String, String>> = Flux.range(1, 100)
.flatMap {
val tracingId = UUID.randomUUID().toString()
val get = HttpRequest.POST<Any>("http://localhost:${embeddedServer.port}/trigger", NameRequestBody("sss-" + tracingId)).header("X-TrackingId", tracingId)
Mono.from(client.retrieve(get, String::class.java))
.map { Tuples.of(it as String, tracingId) }
}
.collectList()
.block()

for (t in result) {
assert(t.t1 == t.t2)
}

embeddedServer.stop()
}


}

@Requires(property = "reactortestpropagation.enabled")
@Controller
class TestController(private val someService: SomeService) {

@Post("/trigger")
suspend fun trigger(request: HttpRequest<*>, @Body requestBody: SomeBody): String {
return withContext(Dispatchers.IO) {
someService.findValue()
}
}

@Get("/data")
suspend fun getTracingId(request: HttpRequest<*>): String {
val reactorContextView = currentCoroutineContext()[ReactorContext.Key]!!.context
return reactorContextView.get("reactorTrackingId") as String
}

}

@Introspected
class SomeBody(val name: String)

@Requires(property = "reactortestpropagation.enabled")
@Singleton
class SomeService {

suspend fun findValue(): String {
delay(50)
return withContext(Dispatchers.Default) {
delay(50)
val context = currentCoroutineContext()[ReactorContext.Key]!!.context
val reactorTrackingId = context.get("reactorTrackingId") as String
val suspendTrackingId = context.get("suspendTrackingId") as String
if (reactorTrackingId != suspendTrackingId) {
throw IllegalArgumentException()
}
suspendTrackingId
}
}

}

@Requires(property = "reactortestpropagation.enabled")
@Filter(Filter.MATCH_ALL_PATTERN)
class ReactorHttpServerFilter : HttpServerFilter {

override fun doFilter(request: HttpRequest<*>, chain: ServerFilterChain): Publisher<MutableHttpResponse<*>> {
val trackingId = request.headers["X-TrackingId"] as String
return Mono.from(chain.proceed(request)).contextWrite {
it.put("reactorTrackingId", trackingId)
}
}

override fun getOrder(): Int = 1
}

@Requires(property = "reactortestpropagation.enabled")
@Filter(Filter.MATCH_ALL_PATTERN)
class SuspendHttpServerFilter : CoroutineHttpServerFilter {

override suspend fun filter(request: HttpRequest<*>, chain: ServerFilterChain): MutableHttpResponse<*> {
val trackingId = request.headers["X-TrackingId"] as String
//withContext does not merge the current context so data may be lost
return withContext(Context.of("suspendTrackingId", trackingId).asCoroutineContext()) {
chain.next(request)
}
}

override fun getOrder(): Int = 0
}

interface CoroutineHttpServerFilter : HttpServerFilter {

suspend fun filter(request: HttpRequest<*>, chain: ServerFilterChain): MutableHttpResponse<*>

override fun doFilter(request: HttpRequest<*>, chain: ServerFilterChain): Publisher<MutableHttpResponse<*>> {
return mono {
filter(request, chain)
}
}

}

suspend fun ServerFilterChain.next(request: HttpRequest<*>): MutableHttpResponse<*> {
return this.proceed(request).asFlow().single()
}

@Introspected
class NameRequestBody(val name: String)
2 changes: 1 addition & 1 deletion tracing-core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id 'io.micronaut.build.internal.tracing-module'
alias libs.plugins.kotlin
alias libs.plugins.kotlinjvm
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.filter.ClientFilterChain;
import io.micronaut.http.filter.HttpClientFilter;
import io.micronaut.tracing.instrument.util.TracingPublisher;
import io.micronaut.tracing.instrument.util.TracingObserver;
import io.micronaut.tracing.instrument.util.TracingPublisherUtils;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
Expand Down Expand Up @@ -82,37 +83,38 @@ public Publisher<? extends HttpResponse<?>> doFilter(MutableHttpRequest<?> reque
SpanContext activeContext = activeSpan == null ? null : activeSpan.context();
SpanBuilder spanBuilder = newSpan(request, activeContext);

return new TracingPublisher(requestPublisher, tracer, spanBuilder, true) {
return TracingPublisherUtils.createTracingPublisher(requestPublisher, tracer, spanBuilder, true, new TracingObserver() {

@Override
protected void doOnSubscribe(@NonNull Span span) {
public void doOnSubscribe(@NonNull Span span) {
span.setTag(TAG_HTTP_CLIENT, true);
SpanContext spanContext = span.context();
tracer.inject(
spanContext,
HTTP_HEADERS,
new HttpHeadersTextMap(request.getHeaders())
spanContext,
HTTP_HEADERS,
new HttpHeadersTextMap(request.getHeaders())
);
request.setAttribute(CURRENT_SPAN_CONTEXT, spanContext);
request.setAttribute(CURRENT_SPAN, span);
}

@Override
protected void doOnNext(@NonNull Object object, @NonNull Span span) {
public void doOnNext(@NonNull Object object, @NonNull Span span) {
if (object instanceof HttpResponse) {
setResponseTags(request, (HttpResponse<?>) object, span);
}
}

@Override
protected void doOnError(@NonNull Throwable error, @NonNull Span span) {
public void doOnError(@NonNull Throwable error, @NonNull Span span) {
if (error instanceof HttpClientResponseException) {
HttpClientResponseException e = (HttpClientResponseException) error;
HttpResponse<?> response = e.getResponse();
setResponseTags(request, response, span);
}
setErrorTags(span, error);
}
};

});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import io.micronaut.http.annotation.Filter;
import io.micronaut.http.filter.HttpServerFilter;
import io.micronaut.http.filter.ServerFilterChain;
import io.micronaut.tracing.instrument.util.TracingPublisher;
import io.micronaut.tracing.instrument.util.TracingObserver;
import io.micronaut.tracing.instrument.util.TracingPublisherUtils;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
Expand Down Expand Up @@ -84,16 +85,16 @@ public Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, Server
}

SpanBuilder spanBuilder = continued ? null : newSpan(request, initSpanContext(request));
return new TracingPublisher(chain.proceed(request), tracer, spanBuilder) {
return TracingPublisherUtils.createTracingPublisher(chain.proceed(request), tracer, spanBuilder, new TracingObserver() {

@Override
protected void doOnSubscribe(@NonNull Span span) {
public void doOnSubscribe(@NonNull Span span) {
span.setTag(TAG_HTTP_SERVER, true);
request.setAttribute(CURRENT_SPAN, span);
}

@Override
protected void doOnNext(@NonNull Object object, @NonNull Span span) {
public void doOnNext(@NonNull Object object, @NonNull Span span) {
if (!(object instanceof HttpResponse)) {
return;
}
Expand All @@ -104,21 +105,22 @@ protected void doOnNext(@NonNull Object object, @NonNull Span span) {
}

@Override
protected void doOnError(@NonNull Throwable throwable, @NonNull Span span) {
public void doOnError(@NonNull Throwable throwable, @NonNull Span span) {
request.setAttribute(CONTINUE, true);
setErrorTags(span, throwable);
}

@Override
protected boolean isContinued() {
public boolean isContinued() {
return continued;
}

@Override
protected boolean isFinishOnError() {
public boolean isFinishOnError() {
return false;
}
};

});
}

@Override
Expand Down
Loading

0 comments on commit 961c38c

Please sign in to comment.