diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 054eb502cd8..fed03047cf1 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -224,6 +225,9 @@ public boolean advance() throws IOException { METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + pState.topicPartition.toString()); rawSizes.update(recordSize); + for (Map.Entry backlogSplit : perPartitionBacklogMetrics.entrySet()) { + backlogBytesOfSplit.set(backlogSplit.getValue()); + } return true; } else { // -- (b) @@ -341,6 +345,7 @@ public long getSplitBacklogBytes() { private final Counter bytesReadBySplit; private final Gauge backlogBytesOfSplit; private final Gauge backlogElementsOfSplit; + private HashMap perPartitionBacklogMetrics = new HashMap();; private final Counter checkpointMarkCommitsEnqueued = Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC); // Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed). @@ -491,6 +496,10 @@ Instant updateAndGetWatermark() { lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext()); return lastWatermark; } + + String name() { + return this.topicPartition.toString(); + } } KafkaUnboundedReader( @@ -528,14 +537,16 @@ Instant updateAndGetWatermark() { prevWatermark = Optional.of(new Instant(ckptMark.getWatermarkMillis())); } - states.add( - new PartitionState<>( + PartitionState state = + new PartitionState( tp, nextOffset, source .getSpec() .getTimestampPolicyFactory() - .createTimestampPolicy(tp, prevWatermark))); + .createTimestampPolicy(tp, prevWatermark)); + states.add(state); + perPartitionBacklogMetrics.put(state.name(), 0L); } partitionStates = ImmutableList.copyOf(states); @@ -717,6 +728,7 @@ private long getSplitBacklogMessageCount() { if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) { return UnboundedReader.BACKLOG_UNKNOWN; } + perPartitionBacklogMetrics.put(p.name(), pBacklog); backlogCount += pBacklog; }