From c54e3669e7c0ab15e473868a095f3063ace8136e Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 14 Aug 2024 09:03:31 +0400 Subject: [PATCH] No tracing on non-json events (#385) --- packages/opentelemetry/src/instrumentation.ts | 5 +- .../src/opentelemetry/instrumentation.test.ts | 80 ++++++++++++++++++- 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/packages/opentelemetry/src/instrumentation.ts b/packages/opentelemetry/src/instrumentation.ts index 90cff18d..b1eb8f44 100644 --- a/packages/opentelemetry/src/instrumentation.ts +++ b/packages/opentelemetry/src/instrumentation.ts @@ -227,6 +227,9 @@ export class Instrumentation extends InstrumentationBase { const resolvedEvent = resolved as ResolvedEvent; const metadata = resolvedEvent?.event?.metadata; + + if (!resolvedEvent.event?.isJson) return; + const parentContext = Instrumentation.restoreContext(metadata!); const { hostname, port } = Instrumentation.getServerAddress(uri); @@ -250,7 +253,7 @@ export class Instrumentation extends InstrumentationBase { return context.with(parentContext, () => { const span = tracer.startSpan(spanName, { attributes, - kind: SpanKind.CLIENT, + kind: SpanKind.CONSUMER, }); try { diff --git a/packages/test/src/opentelemetry/instrumentation.test.ts b/packages/test/src/opentelemetry/instrumentation.test.ts index 39e2688c..bb53dc26 100644 --- a/packages/test/src/opentelemetry/instrumentation.test.ts +++ b/packages/test/src/opentelemetry/instrumentation.test.ts @@ -1,4 +1,10 @@ -import { createTestNode, Defer, delay, jsonTestEvents } from "@test-utils"; +import { + createTestNode, + Defer, + delay, + jsonTestEvents, + binaryTestEvents, +} from "@test-utils"; import { NodeTracerProvider, InMemorySpanExporter, @@ -22,6 +28,7 @@ instrumentation.disable(); import * as esdb from "@eventstore/db-client"; import { AppendToStreamOptions, + binaryEvent, ResolvedEvent, streamNameFilter, WrongExpectedVersionError, @@ -256,6 +263,77 @@ describe("instrumentation", () => { [EventStoreDBAttributes.DATABASE_OPERATION]: "appendToStream", }); }); + + test("non json events are not instrumented in subscription", async () => { + const defer = new Defer(); + const { EventStoreDBClient, jsonEvent, binaryEvent } = await import( + "@eventstore/db-client" + ); + + const STREAM = v4(); + + const client = new EventStoreDBClient( + { endpoint: node.uri }, + { rootCertificate: node.certs.root }, + { username: "admin", password: "changeit" } + ); + + const handleError = jest.fn((error) => { + defer.reject(error); + }); + const handleEvent = jest.fn((event: ResolvedEvent) => { + if (event.event?.streamId == STREAM) { + subscription.unsubscribe(); + } + }); + const handleEnd = jest.fn(defer.resolve); + const handleConfirmation = jest.fn(); + + const event1 = binaryEvent({ + type: "SomeType", + data: Buffer.from("hello"), + }); + const event2 = jsonEvent({ + type: "SomeType", + data: {}, + }); + + await client.appendToStream(STREAM, [event1, event2]); + + const subscription = client + .subscribeToStream(STREAM, { + credentials: { + username: "admin", + password: "changeit", + }, + }) + .on("error", handleError) + .on("data", handleEvent) + .on("end", handleEnd) + .on("confirmation", handleConfirmation); + + await delay(500); + await defer.promise; + + const spans = memoryExporter.getFinishedSpans(); + + const childSpans = spans.filter( + (span) => span.name === EventStoreDBAttributes.STREAM_SUBSCIBE + ); + + expect(handleConfirmation).toHaveBeenCalledTimes(1); + + expect(childSpans).toBeDefined(); + + expect(childSpans).toHaveLength(1); + + expect( + childSpans[0].attributes[EventStoreDBAttributes.EVENT_STORE_EVENT_ID] + ).toBe(event2.id); + expect( + childSpans[0].attributes[EventStoreDBAttributes.EVENT_STORE_EVENT_TYPE] + ).toBe(event2.type); + }); }); describe("persistent subscriptions", () => {