diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index e8e613acdbf4b..0825de0af1765 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.LongAdder; @@ -154,7 +155,13 @@ private void registerMetrics() { StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, (config, now) -> numOpenIterators.sum()); StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics, - (config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp() + (config, now) -> { + try { + return openIterators.isEmpty() ? null : openIterators.first().startTimestamp(); + } catch (final NoSuchElementException ignored) { + return null; + } + } ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index a6546c1edb5e1..266e8a7e4bb9a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -46,6 +46,7 @@ import java.util.Comparator; import java.util.Map; import java.util.NavigableSet; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.LongAdder; @@ -124,7 +125,13 @@ private void registerMetrics() { StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, (config, now) -> numOpenIterators.sum()); StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics, - (config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp() + (config, now) -> { + try { + return openIterators.isEmpty() ? null : openIterators.first().startTimestamp(); + } catch (final NoSuchElementException ignored) { + return null; + } + } ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index e59665fb2eb04..2ed95ac9b3b82 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -49,6 +49,7 @@ import java.util.Comparator; import java.util.Map; import java.util.NavigableSet; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.LongAdder; @@ -142,7 +143,13 @@ private void registerMetrics() { StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, (config, now) -> numOpenIterators.sum()); StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics, - (config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp() + (config, now) -> { + try { + return openIterators.isEmpty() ? null : openIterators.first().startTimestamp(); + } catch (final NoSuchElementException ignored) { + return null; + } + } ); }