Skip to content

Commit

Permalink
Support service correlation for mirroring spans (#431)
Browse files Browse the repository at this point in the history
* Support service correlation for mirroring spans

* Added test

* Updated caller to peer

* Addressed review comments

* comment

* add flags

* Addressed review comments

* Addressed review comments

* updates

* updates

* updates

* Addressed review comments

* update
  • Loading branch information
sanket-mundra authored Jan 30, 2024
1 parent b9f458f commit fc4d7c0
Show file tree
Hide file tree
Showing 21 changed files with 561 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class EnrichedSpanConstants {
public static final String GRPC_REQUEST_URL = "grpc.request.url";
public static final String GRPC_REQUEST_ENDPOINT = "grpc.request.endpoint";
public static final String DROP_TRACE_ATTRIBUTE = "drop.trace";
public static final String PEER_SERVICE_NAME = "PEER_SERVICE_NAME";
public static final String PEER_SERVICE_ID = "PEER_SERVICE_ID";

/**
* Returns the constant value for the given Enum.
Expand Down
14 changes: 14 additions & 0 deletions raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,17 @@ data:
{{- if hasKey .Values.rawSpansGrouperConfig "traceEmitPunctuatorFrequency" }}
trace.emit.punctuator.frequency = {{ .Values.rawSpansGrouperConfig.traceEmitPunctuatorFrequency }}
{{- end }}
{{- if hasKey .Values.rawSpansGrouperConfig "peerCorrelation" }}
peer.correlation = {
{{- if hasKey .Values.rawSpansGrouperConfig.peerCorrelation "enabledCustomers" }}
enabled.customers = {{ .Values.rawSpansGrouperConfig.peerCorrelation.enabledCustomers | toJson }}
{{- end }}
{{- if hasKey .Values.rawSpansGrouperConfig.peerCorrelation "enabledAgents" }}
enabled.agents = {{ .Values.rawSpansGrouperConfig.peerCorrelation.enabledAgents | toJson }}
{{- end }}
{{- if hasKey .Values.rawSpansGrouperConfig.peerCorrelation "agentTypeAttribute" }}
agent.type.attribute = {{ .Values.rawSpansGrouperConfig.peerCorrelation.agentTypeAttribute }}
{{- end }}
}
{{- end }}
3 changes: 3 additions & 0 deletions raw-spans-grouper/helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ rawSpansGrouperConfig:
groupPartitionerConfigServiceHost: config-service
groupPartitionerConfigServicePort: 50104
traceEmitPunctuatorFrequency: 5s
peerCorrelation:
enabledCustomers: []
enabledAgents: []

span:
groupby:
Expand Down
2 changes: 2 additions & 0 deletions raw-spans-grouper/raw-spans-grouper/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ dependencies {
because("https://snyk.io/vuln/SNYK-JAVA-ORGGLASSFISHJERSEYCORE-1255637")
}
implementation(project(":span-normalizer:span-normalizer-api"))
implementation(project(":semantic-convention-utils"))
implementation(project(":hypertrace-trace-enricher:enriched-span-constants"))
implementation(libs.hypertrace.data.model)
implementation(libs.hypertrace.serviceFramework.framework)
implementation(libs.hypertrace.serviceFramework.metrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@ public class RawSpanGrouperConstants {
public static final String RAW_SPANS_GROUPER_JOB_CONFIG = "raw-spans-grouper-job-config";
public static final String SPAN_STATE_STORE_NAME = "span-data-store";
public static final String TRACE_STATE_STORE = "trace-state-store";
public static final String PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE =
"peer-identity-to-span-metadata-store";
public static final String OUTPUT_TOPIC_PRODUCER = "output-topic-producer";
public static final String SPANS_PER_TRACE_METRIC = "spans_per_trace";
public static final String TRACE_CREATION_TIME = "trace.creation.time";
public static final String DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY =
"dataflow.metriccollection.sampling.percent";
public static final String PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG =
"peer.correlation.agent.type.attribute";
public static final String PEER_CORRELATION_ENABLED_CUSTOMERS =
"peer.correlation.enabled.customers";
public static final String PEER_CORRELATION_ENABLED_AGENTS = "peer.correlation.enabled.agents";
public static final String INFLIGHT_TRACE_MAX_SPAN_COUNT = "max.span.count";
public static final String DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT = "default.max.span.count";
public static final String DROPPED_SPANS_COUNTER = "hypertrace.dropped.spans";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INPUT_TOPIC_CONFIG_KEY;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_CONFIG_KEY;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR_STORE_NAME;
Expand Down Expand Up @@ -31,7 +32,9 @@
import org.hypertrace.core.kafkastreams.framework.partitioner.GroupPartitionerBuilder;
import org.hypertrace.core.kafkastreams.framework.partitioner.KeyHashPartitioner;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.hypertrace.core.spannormalizer.PeerIdentity;
import org.hypertrace.core.spannormalizer.SpanIdentity;
import org.hypertrace.core.spannormalizer.SpanMetadata;
import org.hypertrace.core.spannormalizer.TraceIdentity;
import org.hypertrace.core.spannormalizer.TraceState;
import org.slf4j.Logger;
Expand Down Expand Up @@ -86,6 +89,14 @@ StreamsBuilder buildTopologyWithClock(
Stores.persistentKeyValueStore(SPAN_STATE_STORE_NAME), keySerde, valueSerde)
.withCachingEnabled();

StoreBuilder<KeyValueStore<PeerIdentity, SpanMetadata>>
peerIdentityToSpanMetadataStateStoreBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE),
keySerde,
valueSerde)
.withCachingEnabled();

StoreBuilder<KeyValueStore<Long, List<TraceIdentity>>> traceEmitPunctuatorStoreBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(TRACE_EMIT_PUNCTUATOR_STORE_NAME),
Expand All @@ -95,6 +106,7 @@ StreamsBuilder buildTopologyWithClock(

streamsBuilder.addStateStore(spanStoreBuilder);
streamsBuilder.addStateStore(traceStateStoreBuilder);
streamsBuilder.addStateStore(peerIdentityToSpanMetadataStateStoreBuilder);
streamsBuilder.addStateStore(traceEmitPunctuatorStoreBuilder);

StreamPartitioner<TraceIdentity, StructuredTrace> groupPartitioner =
Expand All @@ -116,7 +128,8 @@ StreamsBuilder buildTopologyWithClock(
Named.as(RawSpansProcessor.class.getSimpleName()),
SPAN_STATE_STORE_NAME,
TRACE_STATE_STORE,
TRACE_EMIT_PUNCTUATOR_STORE_NAME)
TRACE_EMIT_PUNCTUATOR_STORE_NAME,
PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE)
.to(outputTopic, outputTopicProducer);

return streamsBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DROPPED_SPANS_COUNTER;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INFLIGHT_TRACE_MAX_SPAN_COUNT;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_CORRELATION_ENABLED_AGENTS;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_CORRELATION_ENABLED_CUSTOMERS;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME;
Expand All @@ -15,6 +19,7 @@
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR_STORE_NAME;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_STATE_STORE;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRUNCATED_TRACES_COUNTER;
import static org.hypertrace.traceenricher.enrichedspan.constants.EnrichedSpanConstants.PEER_SERVICE_NAME;

import com.typesafe.config.Config;
import io.micrometer.core.instrument.Counter;
Expand All @@ -23,9 +28,12 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,14 +44,26 @@
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.RawSpan;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.datamodel.Timestamps;
import org.hypertrace.core.datamodel.shared.HexUtils;
import org.hypertrace.core.datamodel.shared.SpanAttributeUtils;
import org.hypertrace.core.datamodel.shared.trace.AttributeValueCreator;
import org.hypertrace.core.datamodel.shared.trace.StructuredTraceBuilder;
import org.hypertrace.core.kafkastreams.framework.punctuators.ThrottledPunctuatorConfig;
import org.hypertrace.core.rawspansgrouper.utils.TraceLatencyMeter;
import org.hypertrace.core.rawspansgrouper.validator.PeerIdentityValidator;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
import org.hypertrace.core.spannormalizer.IpIdentity;
import org.hypertrace.core.spannormalizer.PeerIdentity;
import org.hypertrace.core.spannormalizer.SpanIdentity;
import org.hypertrace.core.spannormalizer.SpanMetadata;
import org.hypertrace.core.spannormalizer.TraceIdentity;
import org.hypertrace.core.spannormalizer.TraceState;
import org.hypertrace.semantic.convention.utils.http.HttpSemanticConventionUtils;
import org.hypertrace.semantic.convention.utils.span.SpanSemanticConventionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,6 +80,7 @@ public class RawSpansProcessor
private static final Logger logger = LoggerFactory.getLogger(RawSpansProcessor.class);
private static final String PROCESSING_LATENCY_TIMER =
"hypertrace.rawspansgrouper.processing.latency";
private static final String ALL = "*";
private static final ConcurrentMap<String, Timer> tenantToSpansGroupingTimer =
new ConcurrentHashMap<>();
// counter for number of spans dropped per tenant
Expand All @@ -69,25 +90,46 @@ public class RawSpansProcessor
// counter for number of truncated traces per tenant
private static final ConcurrentMap<String, Counter> truncatedTracesCounter =
new ConcurrentHashMap<>();
private ProcessorContext<TraceIdentity, StructuredTrace> context;
private final Clock clock;
private KeyValueStore<SpanIdentity, RawSpan> spanStore;
private KeyValueStore<TraceIdentity, TraceState> traceStateStore;
private KeyValueStore<PeerIdentity, SpanMetadata> peerIdentityToSpanMetadataStateStore;
private long groupingWindowTimeoutMs;
private double dataflowSamplingPercent = -1;
private static final Map<String, Long> maxSpanCountMap = new HashMap<>();
private long defaultMaxSpanCountLimit = Long.MAX_VALUE;
private TraceEmitPunctuator traceEmitPunctuator;
private Cancellable traceEmitTasksPunctuatorCancellable;
private String peerCorrelationAgentTypeAttribute;
private List<String> peerCorrelationEnabledCustomers;
private List<String> peerCorrelationEnabledAgents;
private TraceLatencyMeter traceLatencyMeter;

public RawSpansProcessor(Clock clock) {
this.clock = clock;
}

@Override
public void init(ProcessorContext<TraceIdentity, StructuredTrace> context) {
this.context = context;
this.spanStore = context.getStateStore(SPAN_STATE_STORE_NAME);
this.traceStateStore = context.getStateStore(TRACE_STATE_STORE);
this.peerIdentityToSpanMetadataStateStore =
context.getStateStore(PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE);
Config jobConfig = (Config) (context.appConfigs().get(RAW_SPANS_GROUPER_JOB_CONFIG));
this.peerCorrelationAgentTypeAttribute =
jobConfig.hasPath(PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG)
? jobConfig.getString(PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG)
: null;
this.peerCorrelationEnabledCustomers =
jobConfig.hasPath(PEER_CORRELATION_ENABLED_CUSTOMERS)
? jobConfig.getStringList(PEER_CORRELATION_ENABLED_CUSTOMERS)
: Collections.emptyList();
this.peerCorrelationEnabledAgents =
jobConfig.hasPath(PEER_CORRELATION_ENABLED_AGENTS)
? jobConfig.getStringList(PEER_CORRELATION_ENABLED_AGENTS)
: Collections.emptyList();
this.groupingWindowTimeoutMs =
jobConfig.getLong(SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY) * 1000;

Expand All @@ -96,15 +138,13 @@ public void init(ProcessorContext<TraceIdentity, StructuredTrace> context) {
&& jobConfig.getDouble(DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY) <= 100) {
this.dataflowSamplingPercent = jobConfig.getDouble(DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY);
}
this.traceLatencyMeter = new TraceLatencyMeter(dataflowSamplingPercent);

if (jobConfig.hasPath(INFLIGHT_TRACE_MAX_SPAN_COUNT)) {
Config subConfig = jobConfig.getConfig(INFLIGHT_TRACE_MAX_SPAN_COUNT);
subConfig
.entrySet()
.forEach(
(entry) -> {
maxSpanCountMap.put(entry.getKey(), subConfig.getLong(entry.getKey()));
});
.forEach(entry -> maxSpanCountMap.put(entry.getKey(), subConfig.getLong(entry.getKey())));
}

if (jobConfig.hasPath(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT)) {
Expand Down Expand Up @@ -151,15 +191,23 @@ public void process(Record<TraceIdentity, RawSpan> record) {
TraceIdentity key = record.key();
RawSpan value = record.value();
TraceState traceState = traceStateStore.get(key);
boolean firstEntry = (traceState == null);

if (shouldDropSpan(key, traceState)) {
return;
}

Event event = value.getEvent();
// spans whose trace is not created by ByPass flow, their trace will be created in below
// function call. Those spans' trace will not be created by TraceEmitPunctuator
if (isPeerServiceNameIdentificationRequired(event)) {
processSpanForPeerServiceNameIdentification(key, value, traceState, currentTimeMs);
return;
}

boolean firstEntry = (traceState == null);
String tenantId = key.getTenantId();
ByteBuffer traceId = value.getTraceId();
ByteBuffer spanId = value.getEvent().getEventId();
ByteBuffer spanId = event.getEventId();
spanStore.put(new SpanIdentity(tenantId, traceId, spanId), value);

if (firstEntry) {
Expand Down Expand Up @@ -196,7 +244,104 @@ public void process(Record<TraceIdentity, RawSpan> record) {
.record(Duration.between(start, Instant.now()).toMillis(), TimeUnit.MILLISECONDS);
// no need to do context.forward. the punctuator will emit the trace once it's eligible to be
// emitted
return;
}

private void processSpanForPeerServiceNameIdentification(
TraceIdentity key, RawSpan value, TraceState traceState, long currentTimeMs) {
Event event = value.getEvent();
String tenantId = key.getTenantId();
ByteBuffer traceId = value.getTraceId();
boolean firstEntry = (traceState == null);
Optional<String> maybeEnvironment = HttpSemanticConventionUtils.getEnvironmentForSpan(event);
if (SpanSemanticConventionUtils.isClientSpanForOCFormat(
event.getAttributes().getAttributeMap())) {
handleClientSpan(tenantId, event, maybeEnvironment.orElse(null));
} else {
handleServerSpan(tenantId, event, maybeEnvironment.orElse(null));
}

// create structured trace and forward
Timestamps timestamps =
traceLatencyMeter.trackEndToEndLatencyTimestamps(
currentTimeMs, firstEntry ? currentTimeMs : traceState.getTraceStartTimestamp());
StructuredTrace trace =
StructuredTraceBuilder.buildStructuredTraceFromRawSpans(
List.of(value), traceId, tenantId, timestamps);
context.forward(new Record<>(key, trace, currentTimeMs), OUTPUT_TOPIC_PRODUCER);
}

// put the peer service identity and corresponding service name in state store
private void handleClientSpan(String tenantId, Event event, String environment) {
Optional<String> maybeHostAddr = HttpSemanticConventionUtils.getHostIpAddress(event);
Optional<String> maybePeerAddr = HttpSemanticConventionUtils.getPeerIpAddress(event);
Optional<String> maybePeerPort = HttpSemanticConventionUtils.getPeerPort(event);
String serviceName = event.getServiceName();
PeerIdentity peerIdentity =
PeerIdentity.newBuilder()
.setIpIdentity(
IpIdentity.newBuilder()
.setTenantId(tenantId)
.setEnvironment(environment)
.setHostAddr(maybeHostAddr.orElse(null))
.setPeerAddr(maybePeerAddr.orElse(null))
.setPeerPort(maybePeerPort.orElse(null))
.build())
.build();
if (PeerIdentityValidator.isValid(peerIdentity)) {
this.peerIdentityToSpanMetadataStateStore.put(
peerIdentity,
SpanMetadata.newBuilder()
.setServiceName(serviceName)
.setEventId(event.getEventId())
.build());
}
}

// get the service name for that peer service identity and correlate the current span
private void handleServerSpan(String tenantId, Event event, String environment) {
Optional<String> maybePeerAddr = HttpSemanticConventionUtils.getPeerIpAddress(event);
Optional<String> maybeHostAddr = HttpSemanticConventionUtils.getHostIpAddress(event);
Optional<String> maybeHostPort = HttpSemanticConventionUtils.getHostPort(event);
PeerIdentity peerIdentity =
PeerIdentity.newBuilder()
.setIpIdentity(
IpIdentity.newBuilder()
.setTenantId(tenantId)
.setEnvironment(environment)
.setHostAddr(maybePeerAddr.orElse(null))
.setPeerAddr(maybeHostAddr.orElse(null))
.setPeerPort(maybeHostPort.orElse(null))
.build())
.build();
if (PeerIdentityValidator.isValid(peerIdentity)) {
SpanMetadata spanMetadata = this.peerIdentityToSpanMetadataStateStore.get(peerIdentity);
if (Objects.nonNull(spanMetadata)) {
logger.debug(
"Adding {} as: {} from spanId: {} in spanId: {} with service name: {}",
PEER_SERVICE_NAME,
spanMetadata.getServiceName(),
HexUtils.getHex(spanMetadata.getEventId()),
HexUtils.getHex(event.getEventId()),
event.getServiceName());

event
.getEnrichedAttributes()
.getAttributeMap()
.put(PEER_SERVICE_NAME, AttributeValueCreator.create(spanMetadata.getServiceName()));
}
}
}

private boolean isPeerServiceNameIdentificationRequired(Event event) {
if (!this.peerCorrelationEnabledCustomers.contains(event.getCustomerId())
&& !this.peerCorrelationEnabledCustomers.contains(ALL)) {
return false;
}

String agentType =
SpanAttributeUtils.getStringAttributeWithDefault(
event, this.peerCorrelationAgentTypeAttribute, null);
return Objects.nonNull(agentType) && this.peerCorrelationEnabledAgents.contains(agentType);
}

private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) {
Expand Down
Loading

0 comments on commit fc4d7c0

Please sign in to comment.