Skip to content

Commit

Permalink
[event-hubs] prevent empty span creation when tracing is disabled (Az…
Browse files Browse the repository at this point in the history
…ure#17129)

Fixes Azure#14063

This update improves the event instrumentation logic so that spans are only created if tracing is enabled. This fixes the issue where adding events to an EventDataBatch led to empty spans being created when tracing was not enabled.

The `instrumentEventData` function is similar to the `instrumentMessage` function in service bus. The main difference is that in event hubs we need to check if the event being instrumented is an AmqpAnnotatedMessage or an EventData since properties are stored in a different field (properties vs applicationProperties) depending which is used.
In service bus, the ServiceBusMessage uses applicationProperties, so no distinction versus AmqpAnnotatedMessage is needed there.

/cc @maorleger
  • Loading branch information
chradek authored Aug 25, 2021
1 parent ccf4bc5 commit d938e7b
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 47 deletions.
55 changes: 38 additions & 17 deletions sdk/eventhub/event-hubs/src/diagnostics/instrumentEventData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import {
getTraceParentHeader,
isSpanContextValid
} from "@azure/core-tracing";
import { Span, SpanContext } from "@azure/core-tracing";
import { SpanContext } from "@azure/core-tracing";
import { AmqpAnnotatedMessage } from "@azure/core-amqp";
import { EventData, isAmqpAnnotatedMessage } from "../eventData";
import { OperationOptions } from "../util/operationOptions";
import { createMessageSpan } from "./tracing";

/**
* @hidden
Expand All @@ -24,31 +26,50 @@ export const TRACEPARENT_PROPERTY = "Diagnostic-Id";
*/
export function instrumentEventData(
eventData: EventData | AmqpAnnotatedMessage,
span: Span
): EventData {
options: OperationOptions,
entityPath: string,
host: string
): { event: EventData; spanContext: SpanContext | undefined } {
const props = isAmqpAnnotatedMessage(eventData)
? eventData.applicationProperties
: eventData.properties;

if (props && props[TRACEPARENT_PROPERTY]) {
return eventData;
// check if the event has already been instrumented
const previouslyInstrumented = Boolean(props?.[TRACEPARENT_PROPERTY]);

if (previouslyInstrumented) {
return { event: eventData, spanContext: undefined };
}

const copiedProps = { ...props };
const { span: messageSpan } = createMessageSpan(options, { entityPath, host });
try {
if (!messageSpan.isRecording()) {
return {
event: eventData,
spanContext: undefined
};
}

// create a copy so the original isn't modified
if (isAmqpAnnotatedMessage(eventData)) {
eventData = { ...eventData, applicationProperties: copiedProps };
} else {
eventData = { ...eventData, properties: copiedProps };
}
const traceParent = getTraceParentHeader(messageSpan.spanContext());
if (traceParent && isSpanContextValid(messageSpan.spanContext())) {
const copiedProps = { ...props };

const traceParent = getTraceParentHeader(span.spanContext());
if (traceParent && isSpanContextValid(span.spanContext())) {
copiedProps[TRACEPARENT_PROPERTY] = traceParent;
}
// create a copy so the original isn't modified
if (isAmqpAnnotatedMessage(eventData)) {
eventData = { ...eventData, applicationProperties: copiedProps };
} else {
eventData = { ...eventData, properties: copiedProps };
}
copiedProps[TRACEPARENT_PROPERTY] = traceParent;
}

return eventData;
return {
event: eventData,
spanContext: messageSpan.spanContext()
};
} finally {
messageSpan.end();
}
}

/**
Expand Down
26 changes: 10 additions & 16 deletions sdk/eventhub/event-hubs/src/eventDataBatch.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { EventData, isAmqpAnnotatedMessage, toRheaMessage } from "./eventData";
import { EventData, toRheaMessage } from "./eventData";
import { ConnectionContext } from "./connectionContext";
import { MessageAnnotations, message, Message as RheaMessage } from "rhea-promise";
import { throwTypeErrorIfParameterMissing } from "./util/error";
import { AmqpAnnotatedMessage } from "@azure/core-amqp";
import { Span, SpanContext } from "@azure/core-tracing";
import { TRACEPARENT_PROPERTY, instrumentEventData } from "./diagnostics/instrumentEventData";
import { convertTryAddOptionsForCompatibility, createMessageSpan } from "./diagnostics/tracing";
import { instrumentEventData } from "./diagnostics/instrumentEventData";
import { convertTryAddOptionsForCompatibility } from "./diagnostics/tracing";
import { isDefined, isObjectWithProperties } from "./util/typeGuards";
import { OperationTracingOptions } from "@azure/core-tracing";

Expand Down Expand Up @@ -288,22 +288,16 @@ export class EventDataBatchImpl implements EventDataBatch {
throwTypeErrorIfParameterMissing(this._context.connectionId, "tryAdd", "eventData", eventData);
options = convertTryAddOptionsForCompatibility(options);

// check if the event has already been instrumented
const previouslyInstrumented = Boolean(
(isAmqpAnnotatedMessage(eventData)
? eventData.applicationProperties
: eventData.properties)?.[TRACEPARENT_PROPERTY] // Event Data maps properties to applicationProperties.
const { entityPath, host } = this._context.config;
const { event: instrumentedEvent, spanContext } = instrumentEventData(
eventData,
options,
entityPath,
host
);
let spanContext: SpanContext | undefined;
if (!previouslyInstrumented) {
const { span: messageSpan } = createMessageSpan(options, this._context.config);
eventData = instrumentEventData(eventData, messageSpan);
spanContext = messageSpan.spanContext();
messageSpan.end();
}

// Convert EventData to RheaMessage.
const amqpMessage = toRheaMessage(eventData, this._partitionKey);
const amqpMessage = toRheaMessage(instrumentedEvent, this._partitionKey);
const encodedMessage = message.encode(amqpMessage);

let currentSize = this._sizeInBytes;
Expand Down
18 changes: 7 additions & 11 deletions sdk/eventhub/event-hubs/src/eventHubProducerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import { AmqpAnnotatedMessage } from "@azure/core-amqp";
import { NamedKeyCredential, SASCredential, TokenCredential } from "@azure/core-auth";
import { SpanStatusCode, Link, Span, SpanContext, SpanKind } from "@azure/core-tracing";
import { ConnectionContext, createConnectionContext } from "./connectionContext";
import { instrumentEventData, TRACEPARENT_PROPERTY } from "./diagnostics/instrumentEventData";
import { createMessageSpan } from "./diagnostics/tracing";
import { instrumentEventData } from "./diagnostics/instrumentEventData";
import { EventData } from "./eventData";
import { EventDataBatch, EventDataBatchImpl, isEventDataBatch } from "./eventDataBatch";
import { EventHubSender } from "./eventHubSender";
Expand Down Expand Up @@ -315,15 +314,12 @@ export class EventHubProducerClient {
partitionKey = expectedOptions.partitionKey;

for (let i = 0; i < batch.length; i++) {
const event = batch[i];
if (!event.properties || !event.properties[TRACEPARENT_PROPERTY]) {
const { span: messageSpan } = createMessageSpan(options, this._context.config);
// since these message spans are created from same context as the send span,
// these message spans don't need to be linked.
// replace the original event with the instrumented one
batch[i] = instrumentEventData(batch[i], messageSpan);
messageSpan.end();
}
batch[i] = instrumentEventData(
batch[i],
options,
this._context.config.entityPath,
this._context.config.host
).event;
}
}
if (isDefined(partitionId) && isDefined(partitionKey)) {
Expand Down
24 changes: 21 additions & 3 deletions sdk/eventhub/event-hubs/test/internal/partitionPump.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe("PartitionPump", () => {
this.spanName = nameArg;
this.spanOptions = optionsArg;
this.context = contextArg;
return super.startSpan(nameArg, optionsArg);
return super.startSpan(nameArg, optionsArg, this.context);
}
}

Expand Down Expand Up @@ -87,9 +87,27 @@ describe("PartitionPump", () => {
const thirdEvent = tracer.startSpan("c");

const receivedEvents: ReceivedEventData[] = [
instrumentEventData({ ...requiredEventProperties }, firstEvent) as ReceivedEventData,
instrumentEventData(
{ ...requiredEventProperties },
{
tracingOptions: {
tracingContext: setSpanContext(context.active(), firstEvent.spanContext())
}
},
"entityPath",
"host"
).event as ReceivedEventData,
{ properties: {}, ...requiredEventProperties }, // no diagnostic ID means it gets skipped
instrumentEventData({ ...requiredEventProperties }, thirdEvent) as ReceivedEventData
instrumentEventData(
{ ...requiredEventProperties },
{
tracingOptions: {
tracingContext: setSpanContext(context.active(), thirdEvent.spanContext())
}
},
"entityPath",
"host"
).event as ReceivedEventData
];

await createProcessingSpan(receivedEvents, eventHubProperties, {});
Expand Down
17 changes: 17 additions & 0 deletions sdk/eventhub/event-hubs/test/internal/sender.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,23 @@ describe("EventHub Sender", function(): void {
resetTracer();
});

it("doesn't create empty spans when tracing is disabled", async () => {
const events: EventData[] = [{ body: "foo" }, { body: "bar" }];

const eventDataBatch = await producerClient.createBatch();

for (const event of events) {
eventDataBatch.tryAdd(event);
}

should.equal(eventDataBatch.count, 2, "Unexpected number of events in batch.");
should.equal(
eventDataBatch["_messageSpanContexts"].length,
0,
"Unexpected number of span contexts in batch."
);
});

function legacyOptionsUsingSpanContext(rootSpan: TestSpan): Pick<TryAddOptions, "parentSpan"> {
return {
parentSpan: rootSpan.spanContext()
Expand Down

0 comments on commit d938e7b

Please sign in to comment.