Skip to content

Commit

Permalink
Set backlog in gauge metric (apache#31137)
Browse files Browse the repository at this point in the history
Co-authored-by: Naireen <naireenhussain@google.com>
  • Loading branch information
Naireen and Naireen authored May 14, 2024
1 parent 7f9264a commit ee170e1
Showing 1 changed file with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -224,6 +225,9 @@ public boolean advance() throws IOException {
METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + pState.topicPartition.toString());
rawSizes.update(recordSize);

for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
backlogBytesOfSplit.set(backlogSplit.getValue());
}
return true;

} else { // -- (b)
Expand Down Expand Up @@ -341,6 +345,7 @@ public long getSplitBacklogBytes() {
private final Counter bytesReadBySplit;
private final Gauge backlogBytesOfSplit;
private final Gauge backlogElementsOfSplit;
private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();;
private final Counter checkpointMarkCommitsEnqueued =
Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC);
// Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed).
Expand Down Expand Up @@ -491,6 +496,10 @@ Instant updateAndGetWatermark() {
lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
return lastWatermark;
}

String name() {
return this.topicPartition.toString();
}
}

KafkaUnboundedReader(
Expand Down Expand Up @@ -528,14 +537,16 @@ Instant updateAndGetWatermark() {
prevWatermark = Optional.of(new Instant(ckptMark.getWatermarkMillis()));
}

states.add(
new PartitionState<>(
PartitionState<K, V> state =
new PartitionState<K, V>(
tp,
nextOffset,
source
.getSpec()
.getTimestampPolicyFactory()
.createTimestampPolicy(tp, prevWatermark)));
.createTimestampPolicy(tp, prevWatermark));
states.add(state);
perPartitionBacklogMetrics.put(state.name(), 0L);
}

partitionStates = ImmutableList.copyOf(states);
Expand Down Expand Up @@ -717,6 +728,7 @@ private long getSplitBacklogMessageCount() {
if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
return UnboundedReader.BACKLOG_UNKNOWN;
}
perPartitionBacklogMetrics.put(p.name(), pBacklog);
backlogCount += pBacklog;
}

Expand Down

0 comments on commit ee170e1

Please sign in to comment.