Skip to content

Commit

Permalink
KAFKA-17248 - KIP 1076 Add process-id to get carried to telemetry res…
Browse files Browse the repository at this point in the history
…ults (apache#17630)

This PR adds the processId to Kafka Streams client instance metrics

Reviewers: Matthias Sax <mjsax@apache.org>
  • Loading branch information
bbejeck authored Nov 7, 2024
1 parent a0d4cbe commit c69a6b0
Show file tree
Hide file tree
Showing 37 changed files with 164 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config,
Metrics metrics = null;
String clientId = generateClientId(config);
Optional<ClientTelemetryReporter> clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);

try {
metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
LogContext logContext = createLogContext(clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -167,6 +168,11 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
assertNotNull(mainConsumerInstanceId);
LOG.info("Main consumer instance id {}", mainConsumerInstanceId);

final String expectedProcessId = streams.metrics().values().stream()
.filter(metric -> metric.metricName().tags().containsKey("process-id"))
.map(metric -> metric.metricName().tags().get("process-id"))
.findFirst().orElseThrow();

TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId).isEmpty(),
30_000,
"Never received subscribed metrics");
Expand All @@ -185,6 +191,12 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
final List<String> actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId);
final List<String> expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.failed.stream.threads");
assertEquals(expectedInstanceMetrics, actualInstanceMetrics);

TestUtils.waitForCondition(() -> TelemetryPlugin.processId != null,
30_000,
"Never received the process id");

assertEquals(expectedProcessId, TelemetryPlugin.processId);
}
}

Expand Down Expand Up @@ -456,6 +468,7 @@ public void unregisterMetricFromSubscription(final KafkaMetric metric) {
public static class TelemetryPlugin implements ClientTelemetry, MetricsReporter, ClientTelemetryReceiver {

public static final Map<Uuid, List<String>> SUBSCRIBED_METRICS = new ConcurrentHashMap<>();
public static String processId;
public TelemetryPlugin() {
}

Expand Down Expand Up @@ -490,10 +503,26 @@ public ClientTelemetryReceiver clientReceiver() {
public void exportMetrics(final AuthorizableRequestContext context, final ClientTelemetryPayload payload) {
try {
final MetricsData data = MetricsData.parseFrom(payload.data());

final Optional<String> processIdOption = data.getResourceMetricsList()
.stream()
.flatMap(rm -> rm.getScopeMetricsList().stream())
.flatMap(sm -> sm.getMetricsList().stream())
.map(metric -> metric.getGauge())
.flatMap(gauge -> gauge.getDataPointsList().stream())
.flatMap(numberDataPoint -> numberDataPoint.getAttributesList().stream())
.filter(keyValue -> keyValue.getKey().equals("process_id"))
.map(keyValue -> keyValue.getValue().getStringValue())
.findFirst();

processIdOption.ifPresent(pid -> processId = pid);

final Uuid clientId = payload.clientInstanceId();
final List<String> metricNames = data.getResourceMetricsList()
.stream()
.map(rm -> rm.getScopeMetricsList().get(0).getMetrics(0).getName())
.flatMap(rm -> rm.getScopeMetricsList().stream())
.flatMap(sm -> sm.getMetricsList().stream())
.map(metric -> metric.getName())
.sorted()
.collect(Collectors.toList());
LOG.info("Found metrics {} for clientId={}", metricNames, clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -974,10 +974,11 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
metrics = createMetrics(applicationConfigs, time, clientId);
final StreamsClientMetricsDelegatingReporter reporter = new StreamsClientMetricsDelegatingReporter(adminClient, clientId);
metrics.addReporter(reporter);

streamsMetrics = new StreamsMetricsImpl(
metrics,
clientId,
processId.toString(),
time
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public int hashCode() {
private final Metrics metrics;
private final Map<Sensor, Sensor> parentSensors;
private final String clientId;
private final String processId;

private final Version version;
private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
Expand Down Expand Up @@ -114,6 +115,7 @@ public int hashCode() {
private static final String SENSOR_INTERNAL_LABEL = "internal";

public static final String CLIENT_ID_TAG = "client-id";
public static final String PROCESS_ID_TAG = "process-id";
public static final String THREAD_ID_TAG = "thread-id";
public static final String TASK_ID_TAG = "task-id";
public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
Expand Down Expand Up @@ -162,10 +164,12 @@ public int hashCode() {

public StreamsMetricsImpl(final Metrics metrics,
final String clientId,
final String processId,
final Time time) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
this.metrics = metrics;
this.clientId = clientId;
this.processId = processId;
version = Version.LATEST;
rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);

Expand Down Expand Up @@ -269,6 +273,7 @@ private String threadSensorPrefix(final String threadId) {
public Map<String, String> clientLevelTagMap() {
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(CLIENT_ID_TAG, clientId);
tagMap.put(PROCESS_ID_TAG, processId);
return tagMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {

private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics();
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", time);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "processId", time);
private final String threadId = Thread.currentThread().getName();
private final Initializer<Long> initializer = () -> 0L;
private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
private ChangelogReader changeLogReader;

private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", new MockTime());
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", new MockTime());
private final Map<String, Object> properties = mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void process(final Record<Object, Object> record) {
mockConsumer,
new StateDirectory(config, time, true, false),
0,
new StreamsMetricsImpl(new Metrics(), "test-client", time),
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time),
time,
"clientId",
stateRestoreListener,
Expand Down Expand Up @@ -173,7 +173,7 @@ public List<PartitionInfo> partitionsFor(final String topic) {
mockConsumer,
new StateDirectory(config, time, true, false),
0,
new StreamsMetricsImpl(new Metrics(), "test-client", time),
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time),
time,
"clientId",
stateRestoreListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@
public class MockStreamsMetrics extends StreamsMetricsImpl {

public MockStreamsMetrics(final Metrics metrics) {
super(metrics, "test", new MockTime());
super(metrics, "test", "processId", new MockTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public Set<TopicPartition> partitions() {
public void testMetricsWithBuiltInMetricsVersionLatest() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet());
Expand Down Expand Up @@ -301,7 +301,7 @@ public void process(final Record<Object, Object> record) {
public void testTopologyLevelClassCastExceptionDirect() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
Expand All @@ -321,7 +321,7 @@ private InternalProcessorContext<Object, Object> mockInternalProcessorContext()
final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT));

when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()));
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()));
when(internalProcessorContext.topic()).thenReturn(TOPIC);
when(internalProcessorContext.partition()).thenReturn(PARTITION);
when(internalProcessorContext.offset()).thenReturn(OFFSET);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class RecordQueueTest {

private final Metrics metrics = new Metrics();
private final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "mock", new MockTime());
new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime());

@SuppressWarnings("rawtypes")
final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public String deserialize(final String topic, final byte[] data) {
public void shouldExposeProcessMetrics() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
final InternalMockProcessorContext<String, String> context = new InternalMockProcessorContext<>(streamsMetrics);
final SourceNode<String, String> node =
new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public class StandbyTaskTest {

private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, time);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, "processId", time);

private File baseDir;
private StreamsConfig config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ private StateStore initializeStartupTasks(final TaskId taskId, final boolean cre
Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology);
Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig());

directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", time), new LogContext("test"));
directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", time), new LogContext("test"));

return store;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2487,7 +2487,7 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
streamsMetrics,
null
);
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", time);
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", "processId", time);

// The processor topology is missing the topics
final ProcessorTopology topology = withSources(emptyList(), mkMap());
Expand Down Expand Up @@ -3043,7 +3043,7 @@ private StreamTask createSingleSourceStateless(final StreamsConfig config) {
topology,
consumer,
new TopologyConfig(null, config, new Properties()).getTaskConfig(),
new StreamsMetricsImpl(metrics, "test", time),
new StreamsMetricsImpl(metrics, "test", "processId", time),
stateDirectory,
cache,
time,
Expand Down Expand Up @@ -3080,7 +3080,7 @@ private StreamTask createStatelessTask(final StreamsConfig config) {
topology,
consumer,
new TopologyConfig(null, config, new Properties()).getTaskConfig(),
new StreamsMetricsImpl(metrics, "test", time),
new StreamsMetricsImpl(metrics, "test", "processId", time),
stateDirectory,
cache,
time,
Expand Down Expand Up @@ -3116,7 +3116,7 @@ private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode<In
topology,
consumer,
new TopologyConfig(null, config, new Properties()).getTaskConfig(),
new StreamsMetricsImpl(metrics, "test", time),
new StreamsMetricsImpl(metrics, "test", "processId", time),
stateDirectory,
cache,
time,
Expand Down
Loading

0 comments on commit c69a6b0

Please sign in to comment.