diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 46da3a2cec76c..d03107c3756d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -583,7 +583,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config, Metrics metrics = null; String clientId = generateClientId(config); Optional clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); - + try { metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time); LogContext logContext = createLogContext(clientId); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java index 9b1854811b716..50b11cdc6b1cc 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java @@ -74,6 +74,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -167,6 +168,11 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except assertNotNull(mainConsumerInstanceId); LOG.info("Main consumer instance id {}", mainConsumerInstanceId); + final String expectedProcessId = streams.metrics().values().stream() + .filter(metric -> metric.metricName().tags().containsKey("process-id")) + .map(metric -> metric.metricName().tags().get("process-id")) + .findFirst().orElseThrow(); + TestUtils.waitForCondition(() -> !TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId).isEmpty(), 30_000, "Never received subscribed metrics"); @@ -185,6 +191,12 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except final List actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId); final List expectedInstanceMetrics = Arrays.asList("org.apache.kafka.stream.alive.stream.threads", "org.apache.kafka.stream.failed.stream.threads"); assertEquals(expectedInstanceMetrics, actualInstanceMetrics); + + TestUtils.waitForCondition(() -> TelemetryPlugin.processId != null, + 30_000, + "Never received the process id"); + + assertEquals(expectedProcessId, TelemetryPlugin.processId); } } @@ -456,6 +468,7 @@ public void unregisterMetricFromSubscription(final KafkaMetric metric) { public static class TelemetryPlugin implements ClientTelemetry, MetricsReporter, ClientTelemetryReceiver { public static final Map> SUBSCRIBED_METRICS = new ConcurrentHashMap<>(); + public static String processId; public TelemetryPlugin() { } @@ -490,10 +503,26 @@ public ClientTelemetryReceiver clientReceiver() { public void exportMetrics(final AuthorizableRequestContext context, final ClientTelemetryPayload payload) { try { final MetricsData data = MetricsData.parseFrom(payload.data()); + + final Optional processIdOption = data.getResourceMetricsList() + .stream() + .flatMap(rm -> rm.getScopeMetricsList().stream()) + .flatMap(sm -> sm.getMetricsList().stream()) + .map(metric -> metric.getGauge()) + .flatMap(gauge -> gauge.getDataPointsList().stream()) + .flatMap(numberDataPoint -> numberDataPoint.getAttributesList().stream()) + .filter(keyValue -> keyValue.getKey().equals("process_id")) + .map(keyValue -> keyValue.getValue().getStringValue()) + .findFirst(); + + processIdOption.ifPresent(pid -> processId = pid); + final Uuid clientId = payload.clientInstanceId(); final List metricNames = data.getResourceMetricsList() .stream() - .map(rm -> rm.getScopeMetricsList().get(0).getMetrics(0).getName()) + .flatMap(rm -> rm.getScopeMetricsList().stream()) + .flatMap(sm -> sm.getMetricsList().stream()) + .map(metric -> metric.getName()) .sorted() .collect(Collectors.toList()); LOG.info("Found metrics {} for clientId={}", metricNames, clientId); diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index cc3c3aa910357..584f7be307c6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -974,10 +974,11 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, metrics = createMetrics(applicationConfigs, time, clientId); final StreamsClientMetricsDelegatingReporter reporter = new StreamsClientMetricsDelegatingReporter(adminClient, clientId); metrics.addReporter(reporter); - + streamsMetrics = new StreamsMetricsImpl( metrics, clientId, + processId.toString(), time ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index f5660419b15d0..af693d14bc331 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -87,6 +87,7 @@ public int hashCode() { private final Metrics metrics; private final Map parentSensors; private final String clientId; + private final String processId; private final Version version; private final Deque clientLevelMetrics = new LinkedList<>(); @@ -114,6 +115,7 @@ public int hashCode() { private static final String SENSOR_INTERNAL_LABEL = "internal"; public static final String CLIENT_ID_TAG = "client-id"; + public static final String PROCESS_ID_TAG = "process-id"; public static final String THREAD_ID_TAG = "thread-id"; public static final String TASK_ID_TAG = "task-id"; public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id"; @@ -162,10 +164,12 @@ public int hashCode() { public StreamsMetricsImpl(final Metrics metrics, final String clientId, + final String processId, final Time time) { Objects.requireNonNull(metrics, "Metrics cannot be null"); this.metrics = metrics; this.clientId = clientId; + this.processId = processId; version = Version.LATEST; rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time); @@ -269,6 +273,7 @@ private String threadSensorPrefix(final String threadId) { public Map clientLevelTagMap() { final Map tagMap = new LinkedHashMap<>(); tagMap.put(CLIENT_ID_TAG, clientId); + tagMap.put(PROCESS_ID_TAG, processId); return tagMap; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 5d931e9fdf21c..adf7b32c70836 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest { private final MockTime time = new MockTime(); private final Metrics metrics = new Metrics(); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", time); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "processId", time); private final String threadId = Thread.currentThread().getName(); private final Initializer initializer = () -> 0L; private final Aggregator aggregator = (aggKey, value, aggregate) -> aggregate + 1; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 54e2d201c78a2..6a4339a3ed78a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest { private ChangelogReader changeLogReader; private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", new MockTime()); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", new MockTime()); private final Map properties = mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index d7d2971db3299..e6b409fed4c9e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -134,7 +134,7 @@ public void process(final Record record) { mockConsumer, new StateDirectory(config, time, true, false), 0, - new StreamsMetricsImpl(new Metrics(), "test-client", time), + new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time), time, "clientId", stateRestoreListener, @@ -173,7 +173,7 @@ public List partitionsFor(final String topic) { mockConsumer, new StateDirectory(config, time, true, false), 0, - new StreamsMetricsImpl(new Metrics(), "test-client", time), + new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time), time, "clientId", stateRestoreListener, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java index 4ed68ef81509f..a2e6820f901df 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java @@ -23,6 +23,6 @@ public class MockStreamsMetrics extends StreamsMetricsImpl { public MockStreamsMetrics(final Metrics metrics) { - super(metrics, "test", new MockTime()); + super(metrics, "test", "processId", new MockTime()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 5a786b7174a15..a16315d363b80 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -217,7 +217,7 @@ public Set partitions() { public void testMetricsWithBuiltInMetricsVersionLatest() { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-client", new MockTime()); + new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime()); final InternalMockProcessorContext context = new InternalMockProcessorContext<>(streamsMetrics); final ProcessorNode node = new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet()); @@ -301,7 +301,7 @@ public void process(final Record record) { public void testTopologyLevelClassCastExceptionDirect() { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-client", new MockTime()); + new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime()); final InternalMockProcessorContext context = new InternalMockProcessorContext<>(streamsMetrics); final ProcessorNode node = new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet()); @@ -321,7 +321,7 @@ private InternalProcessorContext mockInternalProcessorContext() final InternalProcessorContext internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT)); when(internalProcessorContext.taskId()).thenReturn(TASK_ID); - when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime())); + when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime())); when(internalProcessorContext.topic()).thenReturn(TOPIC); when(internalProcessorContext.partition()).thenReturn(PARTITION); when(internalProcessorContext.offset()).thenReturn(OFFSET); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index a55fe098608c1..1e7c12167c7c3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -72,7 +72,7 @@ public class RecordQueueTest { private final Metrics metrics = new Metrics(); private final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "mock", new MockTime()); + new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime()); @SuppressWarnings("rawtypes") final InternalMockProcessorContext context = new InternalMockProcessorContext<>( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index 90d883590423f..e0fa79fd450a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -97,7 +97,7 @@ public String deserialize(final String topic, final byte[] data) { public void shouldExposeProcessMetrics() { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-client", new MockTime()); + new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime()); final InternalMockProcessorContext context = new InternalMockProcessorContext<>(streamsMetrics); final SourceNode node = new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 320d6c44510f9..e953a61fc1f3e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -113,7 +113,7 @@ public class StandbyTaskTest { private final MockTime time = new MockTime(); private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, time); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, "processId", time); private File baseDir; private StreamsConfig config; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index ae1809c7a91f8..8e7ae80af3528 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -957,7 +957,7 @@ private StateStore initializeStartupTasks(final TaskId taskId, final boolean cre Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology); Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig()); - directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", time), new LogContext("test")); + directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", time), new LogContext("test")); return store; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 9e5ea0b909d6a..36471dcd02b1e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -2487,7 +2487,7 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() { streamsMetrics, null ); - final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", time); + final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", "processId", time); // The processor topology is missing the topics final ProcessorTopology topology = withSources(emptyList(), mkMap()); @@ -3043,7 +3043,7 @@ private StreamTask createSingleSourceStateless(final StreamsConfig config) { topology, consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), - new StreamsMetricsImpl(metrics, "test", time), + new StreamsMetricsImpl(metrics, "test", "processId", time), stateDirectory, cache, time, @@ -3080,7 +3080,7 @@ private StreamTask createStatelessTask(final StreamsConfig config) { topology, consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), - new StreamsMetricsImpl(metrics, "test", time), + new StreamsMetricsImpl(metrics, "test", "processId", time), stateDirectory, cache, time, @@ -3116,7 +3116,7 @@ private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode tasks) { } final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(props); @@ -1464,7 +1467,7 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart(final boolean stateUpda final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -1484,7 +1487,7 @@ public void shouldOnlyShutdownOnce(final boolean stateUpdaterEnabled, final bool final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -1885,6 +1888,7 @@ public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean stateU final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, + PROCESS_ID.toString(), mockTime ); @@ -2582,7 +2586,7 @@ public void shouldThrowTaskMigratedExceptionHandlingTaskLost(final boolean state doThrow(new TaskMigratedException("Task lost exception", new RuntimeException())).when(taskManager).handleLostAll(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -2612,7 +2616,7 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation(final boolean sta doThrow(new TaskMigratedException("Revocation non fatal exception", new RuntimeException())).when(taskManager).handleRevocation(assignedPartitions); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -2644,7 +2648,7 @@ public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(final boolea when(taskManager.handleCorruption(corruptedTasks)).thenReturn(true); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2702,7 +2706,7 @@ public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHan doThrow(new TimeoutException()).when(taskManager).handleCorruption(corruptedTasks); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2769,7 +2773,7 @@ public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(final doNothing().when(taskManager).handleLostAll(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2832,7 +2836,7 @@ public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveT doNothing().when(consumer).enforceRebalance("Active tasks corrupted"); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2892,7 +2896,7 @@ public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInac when(taskManager.handleCorruption(corruptedTasks)).thenReturn(false); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -3076,7 +3080,7 @@ public void shouldTransmitTaskManagerMetrics(final boolean stateUpdaterEnabled, when(taskManager.producerMetrics()).thenReturn(dummyProducerMetrics); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata); @@ -3101,7 +3105,7 @@ public void shouldConstructAdminMetrics(final boolean stateUpdaterEnabled, final final TaskManager taskManager = mock(TaskManager.class); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -3157,7 +3161,7 @@ public void runAndVerifyFailedStreamThreadRecording(final boolean shouldFail, fi when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); final TaskManager taskManager = mock(TaskManager.class); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -3549,7 +3553,7 @@ private StreamThread setUpThread(final Properties streamsConfigProps) { "", taskManager, null, - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime), topologyMetadata, "thread-id", new LogContext(), @@ -3599,7 +3603,7 @@ private Collection createStandbyTask(final StreamsConfig config) { final LogContext logContext = new LogContext("test"); final Logger log = logContext.logger(StreamThreadTest.class); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( new TopologyMetadata(internalTopologyBuilder, config), config, @@ -3658,7 +3662,7 @@ private StreamThread buildStreamThread(final Consumer consumer, final StreamsConfig config, final TopologyMetadata topologyMetadata) { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); return new StreamThread( mockTime, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index 218948cafc0e9..3f80391c48e97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -54,6 +54,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.MAX_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP; @@ -93,6 +94,7 @@ public class StreamsMetricsImplTest { private static final String SENSOR_NAME_2 = "sensor2"; private static final String INTERNAL_PREFIX = "internal"; private static final String CLIENT_ID = "test-client"; + private static final String PROCESS_ID = "test-process"; private static final String THREAD_ID1 = "test-thread-1"; private static final String TASK_ID1 = "test-task-1"; private static final String TASK_ID2 = "test-task-2"; @@ -131,13 +133,13 @@ public class StreamsMetricsImplTest { private final String metricNamePrefix = "metric"; private final String group = "group"; private final Map tags = mkMap(mkEntry("tag", "value")); - private final Map clientLevelTags = mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID)); + private final Map clientLevelTags = mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID), mkEntry(PROCESS_ID_TAG, PROCESS_ID)); private final MetricName metricName1 = new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags); private final MetricName metricName2 = new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2, clientLevelTags); private final MockTime time = new MockTime(0); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); private static MetricConfig eqMetricConfig(final MetricConfig metricConfig) { final StringBuffer message = new StringBuffer(); @@ -252,7 +254,7 @@ public void shouldGetNewThreadLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel); @@ -264,7 +266,7 @@ public void shouldGetExistingThreadLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel); @@ -276,7 +278,7 @@ public void shouldGetNewTaskLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.taskLevelSensor( THREAD_ID1, @@ -293,7 +295,7 @@ public void shouldGetExistingTaskLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.taskLevelSensor( THREAD_ID1, @@ -310,7 +312,7 @@ public void shouldGetNewTopicLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.topicLevelSensor( THREAD_ID1, @@ -329,7 +331,7 @@ public void shouldGetExistingTopicLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.topicLevelSensor( THREAD_ID1, @@ -348,7 +350,7 @@ public void shouldGetNewStoreLevelSensorIfNoneExists() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; final ArgumentCaptor sensorKeys = setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.storeLevelSensor( TASK_ID1, @@ -366,7 +368,7 @@ public void shouldGetExistingStoreLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.storeLevelSensor( TASK_ID1, @@ -382,7 +384,7 @@ public void shouldGetExistingStoreLevelSensor() { public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, INFO_RECORDING_LEVEL); @@ -394,7 +396,7 @@ public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() { public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -406,7 +408,7 @@ public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() { public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -418,7 +420,7 @@ public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() { public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws InterruptedException { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); final Thread otherThread = @@ -433,7 +435,7 @@ public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws I public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -457,7 +459,7 @@ public void shouldAddNewStoreLevelMutableMetric() { .thenReturn(metricName); when(metrics.metric(metricName)).thenReturn(null); when(metrics.addMetricIfAbsent(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); streamsMetrics.addStoreLevelMutableMetric( TASK_ID1, @@ -489,7 +491,7 @@ public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() { when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP)) .thenReturn(metricName); when(metrics.metric(metricName)).thenReturn(null); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); streamsMetrics.addStoreLevelMutableMetric( TASK_ID1, @@ -537,7 +539,7 @@ public void shouldCreateMetricOnceDuringConcurrentMetricCreationRequest() throws @Test public void shouldRemoveStateStoreLevelSensors() { final Metrics metrics = mock(Metrics.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final MetricName metricName1 = new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); final MetricName metricName2 = @@ -560,7 +562,7 @@ public void shouldGetNewNodeLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.nodeLevelSensor( THREAD_ID1, @@ -578,7 +580,7 @@ public void shouldGetExistingNodeLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.nodeLevelSensor( THREAD_ID1, @@ -597,7 +599,7 @@ public void shouldGetNewCacheLevelSensor() { final RecordingLevel recordingLevel = RecordingLevel.INFO; final String processorCacheName = "processorNodeName"; setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.cacheLevelSensor( THREAD_ID1, @@ -616,7 +618,7 @@ public void shouldGetExistingCacheLevelSensor() { final RecordingLevel recordingLevel = RecordingLevel.INFO; final String processorCacheName = "processorNodeName"; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.cacheLevelSensor( THREAD_ID1, TASK_ID1, @@ -633,7 +635,7 @@ public void shouldGetNewClientLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel); @@ -645,7 +647,7 @@ public void shouldGetExistingClientLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel); @@ -662,7 +664,7 @@ public void shouldAddClientLevelImmutableMetric() { when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) .thenReturn(metricName1); doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(immutableValue)); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, value); } @@ -676,7 +678,7 @@ public void shouldAddClientLevelMutableMetric() { when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) .thenReturn(metricName1); doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(valueProvider)); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, valueProvider); } @@ -697,7 +699,7 @@ private void setupRemoveSensorsTest(final Metrics metrics, @Test public void shouldRemoveClientLevelMetricsAndSensors() { final Metrics metrics = mock(Metrics.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); final ArgumentCaptor sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics); doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0)); @@ -710,7 +712,7 @@ public void shouldRemoveClientLevelMetricsAndSensors() { @Test public void shouldRemoveThreadLevelSensors() { final Metrics metrics = mock(Metrics.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); addSensorsOnAllLevels(metrics, streamsMetrics); setupRemoveSensorsTest(metrics, THREAD_ID1); @@ -719,7 +721,7 @@ public void shouldRemoveThreadLevelSensors() { @Test public void testNullMetrics() { - assertThrows(NullPointerException.class, () -> new StreamsMetricsImpl(null, "", time)); + assertThrows(NullPointerException.class, () -> new StreamsMetricsImpl(null, "", PROCESS_ID, time)); } @Test @@ -752,7 +754,7 @@ public void testRemoveSensor() { @Test public void testMultiLevelSensorRemoval() { final Metrics registry = new Metrics(); - final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, time); + final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, PROCESS_ID, time); for (final MetricName defaultMetric : registry.metrics().keySet()) { registry.removeMetric(defaultMetric); } @@ -858,7 +860,7 @@ public void testTotalMetricDoesntDecrease() { final MockTime time = new MockTime(1); final MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS); final Metrics metrics = new Metrics(config, time); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "", time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "", PROCESS_ID, time); final String scope = "scope"; final String entity = "entity"; @@ -892,7 +894,7 @@ public void testTotalMetricDoesntDecrease() { @Test public void shouldAddLatencyRateTotalSensor() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); shouldAddCustomSensor( streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, RecordingLevel.DEBUG), streamsMetrics, @@ -907,7 +909,7 @@ public void shouldAddLatencyRateTotalSensor() { @Test public void shouldAddRateTotalSensor() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time); shouldAddCustomSensor( streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, RecordingLevel.DEBUG), streamsMetrics, @@ -1033,8 +1035,9 @@ public void shouldThrowIfRateTotalSensorIsAddedWithOddTags() { public void shouldGetClientLevelTagMap() { final Map tagMap = streamsMetrics.clientLevelTagMap(); - assertThat(tagMap.size(), equalTo(1)); + assertThat(tagMap.size(), equalTo(2)); assertThat(tagMap.get(StreamsMetricsImpl.CLIENT_ID_TAG), equalTo(CLIENT_ID)); + assertThat(tagMap.get(StreamsMetricsImpl.PROCESS_ID_TAG), equalTo(PROCESS_ID)); } @Test @@ -1042,7 +1045,7 @@ public void shouldGetStoreLevelTagMap() { final String taskName = "test-task"; final String storeType = "remote-window"; final String storeName = "window-keeper"; - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); final Map tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName); @@ -1057,7 +1060,7 @@ public void shouldGetStoreLevelTagMap() { @Test public void shouldGetCacheLevelTagMap() { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, THREAD_ID1, time); + new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); final String taskName = "taskName"; final String storeName = "storeName"; @@ -1074,7 +1077,7 @@ public void shouldGetCacheLevelTagMap() { @Test public void shouldGetThreadLevelTagMap() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); final Map tagMap = streamsMetrics.threadLevelTagMap(THREAD_ID1); @@ -1207,7 +1210,7 @@ public void shouldAddMinAndMaxMetricsToSensor() { @Test public void shouldReturnMetricsVersionCurrent() { assertThat( - new StreamsMetricsImpl(metrics, THREAD_ID1, time).version(), + new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time).version(), equalTo(Version.LATEST) ); } @@ -1266,7 +1269,7 @@ public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() { public void shouldAddThreadLevelMutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); streamsMetrics.addThreadLevelMutableMetric( "foobar", @@ -1288,7 +1291,7 @@ public void shouldAddThreadLevelMutableMetric() { public void shouldCleanupThreadLevelMutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); streamsMetrics.addThreadLevelMutableMetric( "foobar", "test metric", @@ -1310,7 +1313,7 @@ public void shouldCleanupThreadLevelMutableMetric() { public void shouldAddThreadLevelImmutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); streamsMetrics.addThreadLevelImmutableMetric( "foobar", @@ -1332,7 +1335,7 @@ public void shouldAddThreadLevelImmutableMetric() { public void shouldCleanupThreadLevelImmutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time); streamsMetrics.addThreadLevelImmutableMetric( "foobar", "test metric", diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java index 31bea36e8c66c..f7578112da8f8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java @@ -1415,7 +1415,7 @@ public void shouldRestoreRecordsAndConsistencyVectorSingleTopic() { dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -1451,7 +1451,7 @@ public void shouldRestoreRecordsAndConsistencyVectorMultipleTopics() { dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -1490,7 +1490,7 @@ public void shouldHandleTombstoneRecords() { dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -1531,7 +1531,7 @@ public void shouldNotThrowWhenRestoringOnMissingHeaders() { dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index 52907521289bb..c659fa08417d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -572,7 +572,7 @@ public void shouldRestoreRecordsAndConsistencyVectorSingleTopic(final SegmentedB dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -612,7 +612,7 @@ public void shouldRestoreRecordsAndConsistencyVectorMultipleTopics(final Segment dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -654,7 +654,7 @@ public void shouldHandleTombstoneRecords(final SegmentedBytesStore.KeySchema sch dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), @@ -698,7 +698,7 @@ public void shouldNotThrowWhenRestoringOnMissingHeaders(final SegmentedBytesStor dir, Serdes.String(), Serdes.String(), - new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), new StreamsConfig(props), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 35efe5891a93d..34259a60e77f2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -98,7 +98,7 @@ private InternalMockProcessorContext mockContext() { TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), - new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), streamsConfig, () -> collector, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java index e2f953de2f891..7b714e781d2af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java @@ -113,7 +113,7 @@ public void before() { when(mockContext.applicationId()).thenReturn("appId"); when(mockContext.metrics()) .thenReturn( - new StreamsMetricsImpl(new Metrics(), "threadName", new MockTime()) + new StreamsMetricsImpl(new Metrics(), "threadName", "processId", new MockTime()) ); when(mockContext.taskId()).thenReturn(new TaskId(0, 0)); when(mockContext.appConfigs()).thenReturn(CONFIGS); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java index 108e6e631b090..119bda69c9f16 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java @@ -57,7 +57,7 @@ public class KeyValueSegmentTest { @BeforeEach public void setUp() { metricsRecorder.init( - new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()), new TaskId(0, 0) ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index 7e7cf77075a24..2509702dcdf1f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -124,7 +124,7 @@ private void setUp() { metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); when(context.applicationId()).thenReturn(APPLICATION_ID); when(context.metrics()).thenReturn( - new StreamsMetricsImpl(metrics, "test", mockTime) + new StreamsMetricsImpl(metrics, "test", "processId", mockTime) ); when(context.taskId()).thenReturn(taskId); when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index 92930eca91d5b..08a8b1ceaf4cc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -127,7 +127,7 @@ public void setUp() { metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); when(context.applicationId()).thenReturn(APPLICATION_ID); when(context.metrics()) - .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime)); + .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", mockTime)); when(context.taskId()).thenReturn(taskId); when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC); when(innerStore.name()).thenReturn(STORE_NAME); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 54ee48fd4bbbe..bb33ef5553202 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -128,7 +128,7 @@ private void setUp() { setUpWithoutContext(); when(context.applicationId()).thenReturn(APPLICATION_ID); when(context.metrics()) - .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime)); + .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", mockTime)); when(context.taskId()).thenReturn(taskId); when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC); when(inner.name()).thenReturn(STORE_NAME); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java index 0682a75c35b18..2826054f3d885 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java @@ -78,7 +78,7 @@ public class MeteredTimestampedWindowStoreTest { public void setUp() { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test", new MockTime()); + new StreamsMetricsImpl(metrics, "test", "processId", new MockTime()); context = new InternalMockProcessorContext<>( TestUtils.tempDirectory(), @@ -106,7 +106,7 @@ public void setUp() { public void setUpWithoutContextName() { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test", new MockTime()); + new StreamsMetricsImpl(metrics, "test", "processId", new MockTime()); context = new InternalMockProcessorContext<>( TestUtils.tempDirectory(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java index 29d43bcca55ae..5511a39fe2c3d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java @@ -111,7 +111,7 @@ public class MeteredVersionedKeyValueStoreTest { @BeforeEach public void setUp() { when(inner.name()).thenReturn(STORE_NAME); - when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime)); + when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", mockTime)); when(context.applicationId()).thenReturn(APPLICATION_ID); when(context.taskId()).thenReturn(TASK_ID); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index dd1297c52c99d..b4aeccd0c8393 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -117,7 +117,7 @@ public class MeteredWindowStoreTest { @BeforeEach public void setUp() { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test", new MockTime()); + new StreamsMetricsImpl(metrics, "test", "processId", new MockTime()); context = new InternalMockProcessorContext<>( TestUtils.tempDirectory(), Serdes.String(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index f6a7efefdc12c..2c98c3427d0d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -919,7 +919,7 @@ public void shouldVerifyThatMetricsRecordedFromStatisticsGetMeasurementsFromRock final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG)); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-application", time); + new StreamsMetricsImpl(metrics, "test-application", "processId", time); context = mock(InternalMockProcessorContext.class); when(context.metrics()).thenReturn(streamsMetrics); @@ -952,7 +952,7 @@ public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRock final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO)); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-application", time); + new StreamsMetricsImpl(metrics, "test-application", "processId", time); context = mock(InternalMockProcessorContext.class); when(context.metrics()).thenReturn(streamsMetrics); @@ -984,7 +984,7 @@ public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() { final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO)); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-application", time); + new StreamsMetricsImpl(metrics, "test-application", "processId", time); final Properties props = StreamsTestUtils.getStreamsConfig(); context = mock(InternalMockProcessorContext.class); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java index 47456fc03eef9..8fbde2f78e172 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java @@ -65,7 +65,7 @@ public void setUp() { when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde()); final Metrics metrics = new Metrics(); offset = 0; - streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", new MockTime()); + streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime()); context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java index 9ebac835b43f4..82a76ba13a605 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java @@ -57,7 +57,7 @@ public class TimestampedSegmentTest { @BeforeEach public void setUp() { metricsRecorder.init( - new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()), new TaskId(0, 0) ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java index 113de5959a4ed..75c01cef3c953 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java @@ -202,7 +202,7 @@ public void shouldGetPinnedUsageOfBlockCacheWithSingleCache() throws Exception { private void runAndVerifySumOfProperties(final String propertyName) throws Exception { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()); + new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()); final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME); recorder.init(streamsMetrics, TASK_ID); @@ -219,7 +219,7 @@ private void runAndVerifySumOfProperties(final String propertyName) throws Excep private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String propertyName) throws Exception { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()); + new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()); final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME); recorder.init(streamsMetrics, TASK_ID); @@ -236,7 +236,7 @@ private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String proper private void runAndVerifyBlockCacheMetricsWithSingleCache(final String propertyName) throws Exception { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()); + new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()); final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME); recorder.init(streamsMetrics, TASK_ID); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java index 44c7373c45bb7..7136ee66b2e2a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java @@ -178,7 +178,7 @@ public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentStreamsMetric assertThrows( IllegalStateException.class, () -> recorder.init( - new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()), TASK_ID1 ) ); diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 4ba7e565c6d9e..228df8d63a1ac 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -89,7 +89,7 @@ public InternalMockProcessorContext() { this(null, null, null, - new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), null, null, @@ -106,6 +106,7 @@ public InternalMockProcessorContext(final File stateDir, new StreamsMetricsImpl( new Metrics(), "mock", + "processId", new MockTime() ), config, @@ -138,6 +139,7 @@ public InternalMockProcessorContext(final File stateDir, new StreamsMetricsImpl( new Metrics(), "mock", + "processId", new MockTime() ), config, @@ -155,7 +157,7 @@ public InternalMockProcessorContext(final File stateDir, stateDir, keySerde, valueSerde, - new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), config, null, null, @@ -175,7 +177,7 @@ public InternalMockProcessorContext(final StateSerdes serdes, null, serdes.keySerde(), serdes.valueSerde(), - new StreamsMetricsImpl(metrics, "mock", new MockTime()), + new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime()), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), () -> collector, null, @@ -192,7 +194,7 @@ public InternalMockProcessorContext(final File stateDir, stateDir, keySerde, valueSerde, - new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()), + new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), () -> collector, cache, diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 0864880579255..271824ceeb574 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -380,9 +380,10 @@ private StreamsMetricsImpl setupMetrics(final StreamsConfig streamsConfig) { metrics = new Metrics(metricConfig, mockWallClockTime); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( - metrics, - "test-client", - mockWallClockTime + metrics, + "test-client", + "processId", + mockWallClockTime ); TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(), streamsMetrics); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index 8399233a4e48a..aa4fa139d5f0e 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -240,9 +240,10 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG); final String threadId = Thread.currentThread().getName(); this.metrics = new StreamsMetricsImpl( - new Metrics(metricConfig), - threadId, - Time.SYSTEM + new Metrics(metricConfig), + threadId, + "processId", + Time.SYSTEM ); TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics); } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java index 146359bf25e6f..5a506163bb211 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java @@ -255,6 +255,7 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final metrics = new StreamsMetricsImpl( new Metrics(metricConfig), threadId, + "processId", Time.SYSTEM ); TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics); diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java index 236ca53f79186..bb2800dcc8681 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java @@ -232,6 +232,7 @@ public void process(final String key, final Long value) { when(mockInternalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl( new Metrics(new MetricConfig()), Thread.currentThread().getName(), + "processId", Time.SYSTEM )); when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1, 1));