From fb5e881961263923d037d61d2faa821df849cd27 Mon Sep 17 00:00:00 2001 From: Rodrigo Pastrana Date: Tue, 28 May 2024 21:09:42 -0400 Subject: [PATCH] HPCC-586 WsClient OTEL tracing - Injects trace context in traceparent http header - Integrates autoconfigure otel SDK - Adds manual WsWUClient.ping span - Adds manual WsWUClientTest.ping span - Adds manual span around getHPCCver and getHPCCContainerizedMode Signed-off-by: Rodrigo Pastrana --- pom.xml | 42 +++++ .../ws/client/BaseHPCCWsClient.java | 167 ++++++++++++++++-- .../ws/client/HPCCWsWorkUnitsClient.java | 106 ++++++----- .../hpccsystems/ws/client/BaseRemoteTest.java | 40 +++-- .../ws/client/WSWorkunitsTest.java | 34 +++- 5 files changed, 313 insertions(+), 76 deletions(-) diff --git a/pom.xml b/pom.xml index 4762a4faf..643ee26ff 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ org.hpccsystems.commons.annotations.BaseTests 1.0.0 false + 2.4.0-alpha @@ -99,7 +100,48 @@ + + + + io.opentelemetry + opentelemetry-bom + 1.38.0 + pom + import + + + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-exporter-logging + + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure-spi + + + io.opentelemetry + opentelemetry-exporter-otlp + + + + io.opentelemetry.semconv + opentelemetry-semconv + 1.25.0-alpha + junit junit diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java index 2a856a5a2..b09ad00aa 100644 --- a/wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java +++ b/wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java @@ -4,6 +4,8 @@ import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import javax.xml.parsers.DocumentBuilder; import 
javax.xml.parsers.ParserConfigurationException; @@ -40,6 +42,21 @@ import org.w3c.dom.Node; import org.w3c.dom.NodeList; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapSetter; +//import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.semconv.HttpAttributes; +import io.opentelemetry.semconv.ServerAttributes; + /** * Defines functionality common to all HPCC Systmes web service clients. * @@ -47,6 +64,8 @@ */ public abstract class BaseHPCCWsClient extends DataSingleton { + public static final String INSTRUMENTED_LIBRARY_NAME = "WsClient"; + private static OpenTelemetry globalOTel = null; /** Constant log */ protected static final Logger log = LogManager.getLogger(BaseHPCCWsClient.class); /** Constant DEAFULTECLWATCHPORT="8010" */ @@ -164,6 +183,76 @@ private String getTargetHPCCBuildVersionString() throws Exception } + public SpanBuilder getWsClientSpanBuilder(String spanName) + { + SpanBuilder spanBuilder = getWsClientTracer().spanBuilder(spanName) + .setAttribute(ServerAttributes.SERVER_ADDRESS, wsconn.getHost()) + .setAttribute(ServerAttributes.SERVER_PORT, Long.getLong(wsconn.getPort())) + .setAttribute(HttpAttributes.HTTP_REQUEST_METHOD, HttpAttributes.HttpRequestMethodValues.GET) + .setSpanKind(SpanKind.CLIENT); + + return spanBuilder; + } + + static public void injectCurrentSpanTraceParentHeader(Stub clientStub) + { + if (clientStub != null) + { + injectCurrentSpanTraceParentHeader(clientStub._getServiceClient().getOptions()); + } + } + + static public void 
injectCurrentSpanTraceParentHeader(Options options) + { + if (options != null) + { + W3CTraceContextPropagator.getInstance().inject(Context.current(), options, Options::setProperty); + } + } + + static public String getCurrentSpanTraceParentHeader() + { + String traceparent = null; + Span currentSpan = Span.current(); + if (currentSpan != null && currentSpan.getSpanContext().isValid()) + { + Map carrier = new HashMap<>(); + TextMapSetter> setter = Map::put; + W3CTraceContextPropagator.getInstance().inject(Context.current(), carrier, setter); + + traceparent = carrier.getOrDefault("traceparent", "00-" + currentSpan.getSpanContext().getTraceId() + "-" + currentSpan.getSpanContext().getSpanId() + "-00"); + carrier.clear(); + } + + return traceparent; + } + + private void initOTel() + { + /* + * If using the OpenTelemetry SDK, you may want to instantiate the OpenTelemetry to provide configuration, for example of Resource or Sampler. See OpenTelemetrySdk and OpenTelemetrySdk.builder for information on how to construct the SDK's OpenTelemetry implementation. + * WARNING: Due to the inherent complications around initialization order involving this class and its single global instance, we strongly recommend *not* using GlobalOpenTelemetry unless you have a use-case that absolutely requires it. Please favor using instances of OpenTelemetry wherever possible. + * If you are using the OpenTelemetry javaagent, it is generally best to only call GlobalOpenTelemetry.get() once, and then pass the resulting reference where you need to use it. 
+ */ + + //autoconfigured telemetry should be configured via env vars or system properties: + //https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md + //For Example: + // -Dotel.traces.exporter=logging + // -Dotel.metrics.exporter=none + // -Dotel.logs.exporter=none + // -Dotel.java.global-autoconfigure.enabled=true + + globalOTel = GlobalOpenTelemetry.get(); + } + + public Tracer getWsClientTracer() + { + if (globalOTel == null) + initOTel(); + + return globalOTel.getTracer(INSTRUMENTED_LIBRARY_NAME); + } /** * All instances of HPCCWsXYZClient should utilize this init function * Attempts to establish the target HPCC build version and its container mode @@ -175,36 +264,55 @@ private String getTargetHPCCBuildVersionString() throws Exception */ protected boolean initBaseWsClient(Connection connection, boolean fetchVersionAndContainerMode) { + initOTel(); + boolean success = true; initErrMessage = ""; setActiveConnectionInfo(connection); if (fetchVersionAndContainerMode) { - try + Span fetchHPCCVerSpan = getWsClientSpanBuilder("FetchHPCCVersion").startSpan(); + try (Scope scope = fetchHPCCVerSpan.makeCurrent()) { - targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString()); + try + { + targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString()); + } + catch (Exception e) + { + initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values"; + if (!e.getLocalizedMessage().isEmpty()) + initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage(); + success = false; + } } - catch (Exception e) + finally { - initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values"; - if (!e.getLocalizedMessage().isEmpty()) - initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage(); - - success = false; + fetchHPCCVerSpan.setStatus(success ? 
StatusCode.OK : StatusCode.ERROR, initErrMessage); + fetchHPCCVerSpan.end(); } - try + Span fetchHPCCContainerMode = getWsClientSpanBuilder("FetchHPCCContainerMode").startSpan(); + try (Scope scope = fetchHPCCContainerMode.makeCurrent()) { - targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn); + try + { + targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn); + } + catch (Exception e) + { + initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values"; + if (!e.getLocalizedMessage().isEmpty()) + initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage(); + + success = false; + } } - catch (Exception e) + finally { - initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values"; - if (!e.getLocalizedMessage().isEmpty()) - initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage(); - - success = false; + fetchHPCCContainerMode.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage); + fetchHPCCContainerMode.end(); } } if (!initErrMessage.isEmpty()) @@ -401,7 +509,10 @@ public String getInitError() protected Stub verifyStub() throws Exception { if (stub != null) + { + injectCurrentSpanTraceParentHeader(stub); return stub; + } else throw new Exception("WS Client Stub not available." + (hasInitError() ? "\n" + initErrMessage : "")); } @@ -582,6 +693,10 @@ static public Stub setStubOptions(Stub thestub, Connection connection) throws Ax opt.setProperty(HTTPConstants.CHUNKED, Boolean.FALSE); + //only do this if tracing enabled? 
+ //injectCurrentSpanTraceParentHeader(opt); + //opt.setProperty("traceparent", getTraceParentHeader()); + if (connection.getPreemptiveHTTPAuthenticate()) { //Axis2 now forces connection authenticate, even if target is not secure @@ -676,12 +791,32 @@ protected void handleEspSoapFaults(EspSoapFaultWrapper e, String message) throws * the array of esp exception wrapper */ protected void handleEspExceptions(ArrayOfEspExceptionWrapper exp, String message) throws ArrayOfEspExceptionWrapper + { + handleEspExceptions(exp, message, null); + } + + /** + * Handle esp exceptions. + * + * @param exp + * the exp + * @param message + * the message + * @param span + * optional span. Resulting exception reported on span + * @throws org.hpccsystems.ws.client.wrappers.ArrayOfEspExceptionWrapper + * the array of esp exception wrapper + */ + protected void handleEspExceptions(ArrayOfEspExceptionWrapper exp, String message, Span span) throws ArrayOfEspExceptionWrapper { if (exp == null || exp.getExceptions() == null || exp.getExceptions().size() <= 0) return; if (message != null && !message.isEmpty()) exp.setWsClientMessage(message); log.error(exp.toString()); + if (span != null && span.getSpanContext().isValid()) + span.recordException(exp); + throw exp; } diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java index 829644a84..6fd698d5f 100644 --- a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java +++ b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java @@ -100,6 +100,10 @@ import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper; import org.hpccsystems.ws.client.wrappers.wsworkunits.WsWorkunitsClientStubWrapper; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; + /** * Facilitates ECL WorkUnit related actions. 
* @@ -293,47 +297,60 @@ public void initWsWorkUnitsClientStub(Connection conn) */ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExceptionWrapper { - verifyStub(); // Throws exception if stub failed - - WUQuery request = new WUQuery(); - - WUState previousState = getStateID(wu); - - request.setWuid(wu.getWuid()); - request.setCount(1); + Span span = getWsClientSpanBuilder("FastWURefresh").startSpan(); + try (Scope scope = span.makeCurrent()) + { + verifyStub(); // Throws exception if stub failed - WUQueryResponse response = null; + WUQuery request = new WUQuery(); - try - { - response = ((WsWorkunits) stub).wUQuery(request); - } - catch (RemoteException e) - { - throw new Exception("WsWorkunits.fastWURefresh(...) encountered RemoteException.", e); - } - catch (EspSoapFault e) - { - handleEspSoapFaults(new EspSoapFaultWrapper(e), "Could Not perform fastWURefresh"); - } + WUState previousState = getStateID(wu); - if (response != null) - { - if (response.getExceptions() != null) - handleEspExceptions(new ArrayOfEspExceptionWrapper(response.getExceptions()), "Could Not perform fastWURefresh"); + request.setWuid(wu.getWuid()); + request.setCount(1); - if (response.getWorkunits() != null) + WUQueryResponse response = null; + try { - ECLWorkunit[] eclWorkunit = response.getWorkunits().getECLWorkunit(); - - if (eclWorkunit != null && eclWorkunit.length == 1) wu.update(eclWorkunit[0]); + response = ((WsWorkunits) stub).wUQuery(request); + } + catch (RemoteException e) + { + span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getLocalizedMessage()); + throw new Exception("WsWorkunits.fastWURefresh(...) 
encountered RemoteException.", e); + } + catch (EspSoapFault e) + { + span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getLocalizedMessage()); + handleEspSoapFaults(new EspSoapFaultWrapper(e), "Could Not perform fastWURefresh"); } - if (previousState != getStateID(wu)) + if (response != null) { - fullWURefresh(wu); + if (response.getExceptions() != null) + handleEspExceptions(new ArrayOfEspExceptionWrapper(response.getExceptions()), "Could Not perform fastWURefresh", span); + + span.setStatus(StatusCode.OK); + if (response.getWorkunits() != null) + { + ECLWorkunit[] eclWorkunit = response.getWorkunits().getECLWorkunit(); + + if (eclWorkunit != null && eclWorkunit.length == 1) + wu.update(eclWorkunit[0]); + } + + if (previousState != getStateID(wu)) + { + fullWURefresh(wu); + } } } + finally + { + span.end(); + } } /** @@ -2551,18 +2568,27 @@ public List deleteQueries(Set querynames, String clu */ public boolean ping() throws Exception { - verifyStub(); - - Ping request = new Ping(); - - try + Span span = getWsClientSpanBuilder("WsWUClient_Ping").startSpan(); + try (Scope scope = span.makeCurrent()) { - ((WsWorkunitsStub) stub).ping(request); + verifyStub(); // must be called within span scope for proper context propagation + + Ping request = new Ping(); + try + { + ((WsWorkunitsStub) stub).ping(request); + span.setStatus(StatusCode.OK); + } + catch (Exception e) + { + span.recordException(e); + log.error(e.getLocalizedMessage()); + return false; + } } - catch (Exception e) + finally { - log.error(e.getLocalizedMessage()); - return false; + span.end(); } return true; diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java index 14b0f23ce..b1377a31e 100644 --- a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java @@ -30,28 +30,26 @@ HPCC SYSTEMS software Copyright (C) 
2019 HPCC Systems®. import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.hpccsystems.ws.client.HPCCWsWorkUnitsClient; -import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper; -import org.hpccsystems.ws.client.HPCCWsClient; import org.hpccsystems.ws.client.HPCCWsTopologyClient.TopologyGroupQueryKind; import org.hpccsystems.ws.client.platform.Platform; import org.hpccsystems.ws.client.utils.Connection; import org.hpccsystems.ws.client.wrappers.gen.wstopology.TpGroupWrapper; -import org.junit.Assume; +import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper; import org.junit.Assert; +import org.junit.Assume; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; -import java.net.URL; - -import java.nio.file.Paths; -import java.nio.file.Path; -import java.nio.file.Files; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; @Category(org.hpccsystems.commons.annotations.RemoteTests.class) public abstract class BaseRemoteTest { public static Exception initializationException = null; + private final static String INSTRUMENTED_LIB_NAME = "WsClientJUnitSuite"; protected static Platform platform; protected static HPCCWsClient wsclient; @@ -72,6 +70,7 @@ public abstract class BaseRemoteTest protected final static int testThreadCount = Integer.parseInt(System.getProperty("testthreadcount", "10")); public static final String DEFAULTHPCCFILENAME = "benchmark::all_types::200kb"; + protected static OpenTelemetry globalOTel = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); /* * Code to generate superfile with default file as subfile @@ -109,6 +108,16 @@ public static void initCheck() Assume.assumeTrue("Error initializing test suite: " + exceptionMessage, initializationException == null); } + public static Tracer 
getRemoteTestTracer() + { + return globalOTel.getTracer(INSTRUMENTED_LIB_NAME); + } + + public static SpanBuilder getRemoteTestTraceBuilder(String spanName) + { + return getRemoteTestTracer().spanBuilder(spanName); + } + public static void initialize() throws Exception { // This allows testing against locally created self signed certs to work. @@ -226,7 +235,7 @@ public boolean verify(String hostname,javax.net.ssl.SSLSession sslSession) } else { - System.out.println("RemoteTest: 'roxiegroupname': '" + roxieclustername + "'"); + System.out.println("RemoteTest: 'roxiegroupname': '" + roxieclustername + "'"); } } catch (Exception e) @@ -240,7 +249,16 @@ public boolean verify(String hostname,javax.net.ssl.SSLSession sslSession) // Run the generate-datasets.ecl script if present in the project resources try { - executeECLScript("generate-datasets.ecl"); + //Span eclscriptspan = tracer.spanBuilder("generate-datasets.ecl").startSpan(); + + //try (Scope innerScope = eclscriptspan.makeCurrent()) + { + executeECLScript("generate-datasets.ecl"); + } + //finally + //{ + // eclscriptspan.end(); + //} } catch (Exception e) { diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/WSWorkunitsTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/WSWorkunitsTest.java index deb12aca5..12f446db7 100644 --- a/wsclient/src/test/java/org/hpccsystems/ws/client/WSWorkunitsTest.java +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/WSWorkunitsTest.java @@ -38,6 +38,10 @@ HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®. 
import org.junit.Test; import org.junit.runners.MethodSorters; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; + @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class WSWorkunitsTest extends BaseRemoteTest { @@ -82,18 +86,30 @@ public void testSharedWsWUgets() throws InterruptedException @Test public void stageA_ping() throws Exception { - try - { - Assert.assertTrue(client.ping()); - } - catch (AxisFault e) + Span pingSpan = getRemoteTestTraceBuilder("WsWUTests-PingTest").startSpan(); + + try (Scope innerScope = pingSpan.makeCurrent()) { - e.printStackTrace(); - Assert.fail(); + try + { + Assert.assertTrue(client.ping()); + pingSpan.setStatus(StatusCode.OK); + } + catch (AxisFault e) + { + pingSpan.recordException(e); + e.printStackTrace(); + Assert.fail(); + } + catch (Exception e) + { + pingSpan.recordException(e); + Assert.fail(e.toString()); + } } - catch (Exception e) + finally { - Assert.fail(e.toString()); + pingSpan.end(); } }