diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 41702835d4680..27dcdad3ccb9e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.MetadataRecoveryStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -506,8 +507,8 @@ public class ConsumerConfig extends AbstractConfig { ENABLE_METRICS_PUSH_DOC) .define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, - OffsetResetStrategy.LATEST.toString(), - in(Utils.enumOptions(OffsetResetStrategy.class)), + AutoOffsetResetStrategy.LATEST.name(), + new AutoOffsetResetStrategy.Validator(), Importance.MEDIUM, AUTO_OFFSET_RESET_DOC) .define(CHECK_CRCS_CONFIG, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index a5143749b8943..f60c97ad6fe50 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -79,7 +80,23 @@ public class MockConsumer implements Consumer { private final List addedMetrics = new ArrayList<>(); + /** + * @deprecated Since 4.0. Use {@link #MockConsumer(String)}. + */ + @Deprecated public MockConsumer(OffsetResetStrategy offsetResetStrategy) { + this(AutoOffsetResetStrategy.fromString(offsetResetStrategy.toString())); + } + + /** + * A mock consumer is instantiated by providing ConsumerConfig.AUTO_OFFSET_RESET_CONFIG value as the input. + * @param offsetResetStrategy the offset reset strategy to use + */ + public MockConsumer(String offsetResetStrategy) { + this(AutoOffsetResetStrategy.fromString(offsetResetStrategy)); + } + + private MockConsumer(AutoOffsetResetStrategy offsetResetStrategy) { this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy); this.partitions = new HashMap<>(); this.records = new HashMap<>(); @@ -389,7 +406,7 @@ public synchronized long position(TopicPartition partition, final Duration timeo @Override public synchronized void seekToBeginning(Collection partitions) { ensureNotClosed(); - subscriptions.requestOffsetReset(partitions, OffsetResetStrategy.EARLIEST); + subscriptions.requestOffsetReset(partitions, AutoOffsetResetStrategy.EARLIEST); } public synchronized void updateBeginningOffsets(Map newOffsets) { @@ -399,7 +416,7 @@ public synchronized void updateBeginningOffsets(Map newOff @Override public synchronized void seekToEnd(Collection partitions) { ensureNotClosed(); - subscriptions.requestOffsetReset(partitions, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(partitions, AutoOffsetResetStrategy.LATEST); } public synchronized void updateEndOffsets(final Map newOffsets) { @@ -573,13 +590,13 @@ private void updateFetchPosition(TopicPartition tp) { } private void resetOffsetPosition(TopicPartition tp) { - OffsetResetStrategy strategy = subscriptions.resetStrategy(tp); + AutoOffsetResetStrategy strategy = subscriptions.resetStrategy(tp); Long offset; - if (strategy == OffsetResetStrategy.EARLIEST) { + if (strategy == AutoOffsetResetStrategy.EARLIEST) { offset = beginningOffsets.get(tp); if (offset == null) throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning"); - } else if (strategy == OffsetResetStrategy.LATEST) { + } else if (strategy == AutoOffsetResetStrategy.LATEST) { offset = endOffsets.get(tp); if (offset == null) throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end"); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java index bf8404f45597d..7c04cc30abdf8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -54,7 +55,7 @@ public class MockShareConsumer implements ShareConsumer { private Uuid clientInstanceId; public MockShareConsumer() { - this.subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + this.subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); this.records = new HashMap<>(); this.closed = false; this.wakeup = new AtomicBoolean(false); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java index 8b2297c96865e..8c84863f60ff8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java @@ -18,6 +18,10 @@ import java.util.Locale; +/** + * @deprecated Since 4.0. Use {@link org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy instead.} + */ +@Deprecated public enum OffsetResetStrategy { LATEST, EARLIEST, NONE; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 646087ee58d9b..18615babaa64a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -33,7 +33,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; @@ -864,15 +863,15 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) @Override public void seekToBeginning(Collection partitions) { - seek(partitions, OffsetResetStrategy.EARLIEST); + seek(partitions, AutoOffsetResetStrategy.EARLIEST); } @Override public void seekToEnd(Collection partitions) { - seek(partitions, OffsetResetStrategy.LATEST); + seek(partitions, AutoOffsetResetStrategy.LATEST); } - private void seek(Collection partitions, OffsetResetStrategy offsetResetStrategy) { + private void seek(Collection partitions, AutoOffsetResetStrategy offsetResetStrategy) { if (partitions == null) throw new IllegalArgumentException("Partitions collection cannot be null"); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java new file mode 100644 index 0000000000000..a94eb4585a320 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Objects; + +public class AutoOffsetResetStrategy { + private enum StrategyType { + LATEST, EARLIEST, NONE; + + @Override + public String toString() { + return super.toString().toLowerCase(Locale.ROOT); + } + } + + public static final AutoOffsetResetStrategy EARLIEST = new AutoOffsetResetStrategy(StrategyType.EARLIEST); + public static final AutoOffsetResetStrategy LATEST = new AutoOffsetResetStrategy(StrategyType.LATEST); + public static final AutoOffsetResetStrategy NONE = new AutoOffsetResetStrategy(StrategyType.NONE); + + private final StrategyType type; + + private AutoOffsetResetStrategy(StrategyType type) { + this.type = type; + } + + public static boolean isValid(String offsetStrategy) { + return Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy); + } + + public static AutoOffsetResetStrategy fromString(String offsetStrategy) { + if (offsetStrategy == null || !isValid(offsetStrategy)) { + throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy); + } + StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT)); + switch (type) { + case EARLIEST: + return EARLIEST; + case LATEST: + return LATEST; + case NONE: + return NONE; + default: + throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy); + } + } + + /** + * Returns the name of the offset reset strategy. + */ + public String name() { + return type.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AutoOffsetResetStrategy that = (AutoOffsetResetStrategy) o; + return Objects.equals(type, that.type); + } + + @Override + public int hashCode() { + return Objects.hashCode(type); + } + + @Override + public String toString() { + return "AutoOffsetResetStrategy{" + + "type='" + type + '\'' + + '}'; + } + + public static class Validator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + String strategy = (String) value; + if (!AutoOffsetResetStrategy.isValid(strategy)) { + throw new ConfigException(name, value, "Invalid value " + strategy + " for configuration " + + name + ": the value must be either 'earliest', 'latest', or 'none'."); + } + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index f5a672631ff56..28b82f3b3adc3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -34,7 +34,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics; import org.apache.kafka.common.Cluster; @@ -828,7 +827,7 @@ public void seekToBeginning(Collection partitions) { acquireAndEnsureOpen(); try { Collection parts = partitions.isEmpty() ? this.subscriptions.assignedPartitions() : partitions; - subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST); + subscriptions.requestOffsetReset(parts, AutoOffsetResetStrategy.EARLIEST); } finally { release(); } @@ -842,7 +841,7 @@ public void seekToEnd(Collection partitions) { acquireAndEnsureOpen(); try { Collection parts = partitions.isEmpty() ? this.subscriptions.assignedPartitions() : partitions; - subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(parts, AutoOffsetResetStrategy.LATEST); } finally { release(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java index 113b8e0b9d6df..d803b66780bfb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java @@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; @@ -130,8 +129,8 @@ public static IsolationLevel configuredIsolationLevel(ConsumerConfig config) { } public static SubscriptionState createSubscriptionState(ConsumerConfig config, LogContext logContext) { - String s = config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT); - OffsetResetStrategy strategy = OffsetResetStrategy.valueOf(s); + String s = config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + AutoOffsetResetStrategy strategy = AutoOffsetResetStrategy.fromString(s); return new SubscriptionState(logContext, strategy); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java index 89940087611f4..2e9d48ebda5d1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java @@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -285,10 +284,10 @@ static Map buildOffsetsForTimeIntern } private long offsetResetStrategyTimestamp(final TopicPartition partition) { - OffsetResetStrategy strategy = subscriptionState.resetStrategy(partition); - if (strategy == OffsetResetStrategy.EARLIEST) + AutoOffsetResetStrategy strategy = subscriptionState.resetStrategy(partition); + if (strategy == AutoOffsetResetStrategy.EARLIEST) return ListOffsetsRequest.EARLIEST_TIMESTAMP; - else if (strategy == OffsetResetStrategy.LATEST) + else if (strategy == AutoOffsetResetStrategy.LATEST) return ListOffsetsRequest.LATEST_TIMESTAMP; else throw new NoOffsetForPartitionException(partition); @@ -320,11 +319,11 @@ void updateSubscriptionState(Map assignment; /* Default offset reset strategy */ - private final OffsetResetStrategy defaultResetStrategy; + private final AutoOffsetResetStrategy defaultResetStrategy; /* User-provided listener to be invoked when assignment changes */ private Optional rebalanceListener = Optional.empty(); @@ -132,7 +131,7 @@ public synchronized String prettyString() { } } - public SubscriptionState(LogContext logContext, OffsetResetStrategy defaultResetStrategy) { + public SubscriptionState(LogContext logContext, AutoOffsetResetStrategy defaultResetStrategy) { this.log = logContext.logger(this.getClass()); this.defaultResetStrategy = defaultResetStrategy; this.subscription = new TreeSet<>(); // use a sorted set for better logging @@ -393,7 +392,7 @@ public void seekUnvalidated(TopicPartition tp, FetchPosition position) { assignedState(tp).seekUnvalidated(position); } - synchronized void maybeSeekUnvalidated(TopicPartition tp, FetchPosition position, OffsetResetStrategy requestedResetStrategy) { + synchronized void maybeSeekUnvalidated(TopicPartition tp, FetchPosition position, AutoOffsetResetStrategy requestedResetStrategy) { TopicPartitionState state = assignedStateOrNull(tp); if (state == null) { log.debug("Skipping reset of partition {} since it is no longer assigned", tp); @@ -704,11 +703,11 @@ public synchronized Map allConsumed() { return allConsumed; } - public synchronized void requestOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) { + public synchronized void requestOffsetReset(TopicPartition partition, AutoOffsetResetStrategy offsetResetStrategy) { assignedState(partition).reset(offsetResetStrategy); } - public synchronized void requestOffsetReset(Collection partitions, OffsetResetStrategy offsetResetStrategy) { + public synchronized void requestOffsetReset(Collection partitions, AutoOffsetResetStrategy offsetResetStrategy) { partitions.forEach(tp -> { log.info("Seeking to {} offset of partition {}", offsetResetStrategy, tp); assignedState(tp).reset(offsetResetStrategy); @@ -734,14 +733,14 @@ synchronized void setNextAllowedRetry(Set partitions, long nextA } boolean hasDefaultOffsetResetPolicy() { - return defaultResetStrategy != OffsetResetStrategy.NONE; + return defaultResetStrategy != AutoOffsetResetStrategy.NONE; } public synchronized boolean isOffsetResetNeeded(TopicPartition partition) { return assignedState(partition).awaitingReset(); } - public synchronized OffsetResetStrategy resetStrategy(TopicPartition partition) { + public synchronized AutoOffsetResetStrategy resetStrategy(TopicPartition partition) { return assignedState(partition).resetStrategy(); } @@ -782,7 +781,7 @@ public synchronized void resetInitializingPositions(Predicate in final Set partitionsWithNoOffsets = new HashSet<>(); assignment.forEach((tp, partitionState) -> { if (partitionState.shouldInitialize() && initPartitionsToInclude.test(tp)) { - if (defaultResetStrategy == OffsetResetStrategy.NONE) + if (defaultResetStrategy == AutoOffsetResetStrategy.NONE) partitionsWithNoOffsets.add(tp); else requestOffsetReset(tp); @@ -897,7 +896,7 @@ private static class TopicPartitionState { private boolean paused; // whether this partition has been paused by the user private boolean pendingRevocation; private boolean pendingOnAssignedCallback; - private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting + private AutoOffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting private Long nextRetryTimeMs; private Integer preferredReadReplica; private Long preferredReadReplicaExpireTimeMs; @@ -966,7 +965,7 @@ private Optional clearPreferredReadReplica() { } } - private void reset(OffsetResetStrategy strategy) { + private void reset(AutoOffsetResetStrategy strategy) { transitionState(FetchStates.AWAIT_RESET, () -> { this.resetStrategy = strategy; this.nextRetryTimeMs = null; @@ -1137,7 +1136,7 @@ private void lastStableOffset(Long lastStableOffset) { this.endOffsetRequested = false; } - private OffsetResetStrategy resetStrategy() { + private AutoOffsetResetStrategy resetStrategy() { return resetStrategy; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java index 145b8643d356f..c7b4ff9641db1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetOffsetEvent.java @@ -18,8 +18,8 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import java.time.Duration; @@ -37,9 +37,9 @@ public class ResetOffsetEvent extends CompletableApplicationEvent { private final Collection topicPartitions; - private final OffsetResetStrategy offsetResetStrategy; + private final AutoOffsetResetStrategy offsetResetStrategy; - public ResetOffsetEvent(Collection topicPartitions, OffsetResetStrategy offsetResetStrategy, long deadline) { + public ResetOffsetEvent(Collection topicPartitions, AutoOffsetResetStrategy offsetResetStrategy, long deadline) { super(Type.RESET_OFFSET, deadline); this.topicPartitions = Collections.unmodifiableCollection(topicPartitions); this.offsetResetStrategy = Objects.requireNonNull(offsetResetStrategy); @@ -49,7 +49,7 @@ public Collection topicPartitions() { return topicPartitions; } - public OffsetResetStrategy offsetResetStrategy() { + public AutoOffsetResetStrategy offsetResetStrategy() { return offsetResetStrategy; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index f39d11a5cb5c7..50f28fb549eb5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; @@ -224,7 +225,7 @@ public class KafkaConsumerTest { private final Collection singleTopicPartition = Collections.singleton(new TopicPartition(topic, 0)); private final Time time = new MockTime(); - private final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + private final SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST); private final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); private KafkaConsumer consumer; @@ -1029,7 +1030,7 @@ private void initMetadata(MockClient mockClient, Map partitionC @ParameterizedTest @EnumSource(value = GroupProtocol.class) public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) throws InterruptedException { - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1058,7 +1059,7 @@ public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) throws I @ParameterizedTest @EnumSource(GroupProtocol.class) public void testResetToCommittedOffset(GroupProtocol groupProtocol) { - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1081,7 +1082,7 @@ public void testResetToCommittedOffset(GroupProtocol groupProtocol) { @ParameterizedTest @EnumSource(GroupProtocol.class) public void testResetUsingAutoResetPolicy(GroupProtocol groupProtocol) { - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST); + SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.LATEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1106,7 +1107,7 @@ public void testResetUsingAutoResetPolicy(GroupProtocol groupProtocol) { @ParameterizedTest @EnumSource(GroupProtocol.class) public void testOffsetIsValidAfterSeek(GroupProtocol groupProtocol) { - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST); + SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.LATEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -2283,7 +2284,7 @@ public void testMeasureCommitSyncDurationOnFailure(GroupProtocol groupProtocol) public void testMeasureCommitSyncDuration(GroupProtocol groupProtocol) { Time time = new MockTime(Duration.ofSeconds(1).toMillis()); SubscriptionState subscription = new SubscriptionState(new LogContext(), - OffsetResetStrategy.EARLIEST); + AutoOffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 2)); @@ -2329,7 +2330,7 @@ public void testMeasureCommittedDuration(GroupProtocol groupProtocol) { long offset1 = 10000; Time time = new MockTime(Duration.ofSeconds(1).toMillis()); SubscriptionState subscription = new SubscriptionState(new LogContext(), - OffsetResetStrategy.EARLIEST); + AutoOffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 2)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java index 2742b4d8410a3..b69064905eca3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.MetricName; @@ -56,7 +57,7 @@ public class KafkaShareConsumerMetricsTest { new AbstractMap.SimpleEntry<>(topic, topicId)) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); private final Time time = new MockTime(); - private final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + private final SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST); private final String groupId = "mock-group"; @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index c8b644e718931..b69a3e56955ec 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; @@ -38,7 +39,7 @@ public class MockConsumerTest { - private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + private final MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); @Test public void testSimpleMock() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 8b5c2d418df36..69f569be9cd6f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -19,7 +19,6 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.MockClient; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; @@ -132,7 +131,7 @@ private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, int reb LogContext logContext = new LogContext(); this.mockTime = new MockTime(); ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, retryBackoffMaxMs, 60 * 60 * 1000L, - false, false, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST), + false, false, new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST), logContext, new ClusterResourceListeners()); this.mockClient = new MockClient(mockTime, metadata); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 940d9cbaef3fc..93424846b9904 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; @@ -736,7 +735,7 @@ public void testCloseLeavesGroupDespiteInterrupt(long timeoutMs) { @Test public void testCommitSyncAllConsumed() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer( mock(FetchBuffer.class), mock(ConsumerInterceptors.class), @@ -760,7 +759,7 @@ public void testCommitSyncAllConsumed() { @Test public void testAutoCommitSyncDisabled() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer( mock(FetchBuffer.class), mock(ConsumerInterceptors.class), @@ -1572,7 +1571,7 @@ public void testGroupIdNotNullAndValid() { @Test public void testEnsurePollEventSentOnConsumerPoll() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer( mock(FetchBuffer.class), new ConsumerInterceptors<>(Collections.emptyList()), @@ -1801,7 +1800,7 @@ public void testSeekToBeginning() { CompletableApplicationEvent event = addAndGetLastEnqueuedEvent(); ResetOffsetEvent resetOffsetEvent = assertInstanceOf(ResetOffsetEvent.class, event); assertEquals(topics, new HashSet<>(resetOffsetEvent.topicPartitions())); - assertEquals(OffsetResetStrategy.EARLIEST, resetOffsetEvent.offsetResetStrategy()); + assertEquals(AutoOffsetResetStrategy.EARLIEST, resetOffsetEvent.offsetResetStrategy()); } @Test @@ -1828,7 +1827,7 @@ public void testSeekToEnd() { CompletableApplicationEvent event = addAndGetLastEnqueuedEvent(); ResetOffsetEvent resetOffsetEvent = assertInstanceOf(ResetOffsetEvent.class, event); assertEquals(topics, new HashSet<>(resetOffsetEvent.topicPartitions())); - assertEquals(OffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy()); + assertEquals(AutoOffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy()); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java new file mode 100644 index 0000000000000..780319d610f5d --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.config.ConfigException; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AutoOffsetResetStrategyTest { + + @Test + public void testIsValid() { + assertTrue(AutoOffsetResetStrategy.isValid("earliest")); + assertTrue(AutoOffsetResetStrategy.isValid("latest")); + assertTrue(AutoOffsetResetStrategy.isValid("none")); + assertFalse(AutoOffsetResetStrategy.isValid("invalid")); + assertFalse(AutoOffsetResetStrategy.isValid("LATEST")); + assertFalse(AutoOffsetResetStrategy.isValid("")); + assertFalse(AutoOffsetResetStrategy.isValid(null)); + } + + @Test + public void testFromString() { + assertEquals(AutoOffsetResetStrategy.EARLIEST, AutoOffsetResetStrategy.fromString("earliest")); + assertEquals(AutoOffsetResetStrategy.LATEST, AutoOffsetResetStrategy.fromString("latest")); + assertEquals(AutoOffsetResetStrategy.NONE, AutoOffsetResetStrategy.fromString("none")); + assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("invalid")); + assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("LATEST")); + assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("")); + assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString(null)); + } + + @Test + public void testValidator() { + AutoOffsetResetStrategy.Validator validator = new AutoOffsetResetStrategy.Validator(); + assertDoesNotThrow(() -> validator.ensureValid("test", "earliest")); + assertDoesNotThrow(() -> validator.ensureValid("test", "latest")); + assertDoesNotThrow(() -> validator.ensureValid("test", "none")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "invalid")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "LATEST")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", null)); + } + + @Test + public void testEqualsAndHashCode() { + AutoOffsetResetStrategy strategy1 = AutoOffsetResetStrategy.fromString("earliest"); + AutoOffsetResetStrategy strategy2 = AutoOffsetResetStrategy.fromString("earliest"); + AutoOffsetResetStrategy strategy3 = AutoOffsetResetStrategy.fromString("latest"); + + assertEquals(strategy1, strategy2); + assertNotEquals(strategy1, strategy3); + assertEquals(strategy1.hashCode(), strategy2.hashCode()); + assertNotEquals(strategy1.hashCode(), strategy3.hashCode()); + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 0ecd99afbd443..8e98c14b59811 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; @@ -119,7 +118,7 @@ public class CommitRequestManagerTest { public void setup() { this.logContext = new LogContext(); this.time = new MockTime(0); - this.subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + this.subscriptionState = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST); this.metadata = mock(ConsumerMetadata.class); this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); this.offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java index f7be3a58ffd7d..e12b0121fd493 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.compress.Compression; @@ -216,7 +215,7 @@ public void testCorruptedMessage() { private CompletedFetch newCompletedFetch(long fetchOffset, FetchResponseData.PartitionData partitionData) { LogContext logContext = new LogContext(); - SubscriptionState subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(logContext, AutoOffsetResetStrategy.NONE); FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry(); FetchMetricsManager metrics = new FetchMetricsManager(new Metrics(), metricsRegistry); FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metrics, Collections.singleton(TP)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 7a39356a8c930..8a6080ffc1f6d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -197,7 +196,7 @@ public ConsumerCoordinatorTest(final ConsumerPartitionAssignor.RebalanceProtocol @BeforeEach public void setup() { LogContext logContext = new LogContext(); - this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST); + this.subscriptions = new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST); this.metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE, false, false, subscriptions, logContext, new ClusterResourceListeners()); this.client = new MockClient(time, metadata); @@ -3249,13 +3248,13 @@ public void testNoCoordinatorDiscoveryIfPartitionAwaitingReset() { assertTrue(coordinator.coordinatorUnknown()); subscriptions.assignFromUser(singleton(t1p)); - subscriptions.requestOffsetReset(t1p, OffsetResetStrategy.EARLIEST); + subscriptions.requestOffsetReset(t1p, AutoOffsetResetStrategy.EARLIEST); coordinator.initWithCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE)); assertEquals(Collections.emptySet(), subscriptions.initializingPartitions()); assertFalse(subscriptions.hasAllFetchPositions()); assertEquals(Collections.singleton(t1p), subscriptions.partitionsNeedingReset(time.milliseconds())); - assertEquals(OffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(t1p)); + assertEquals(AutoOffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(t1p)); assertTrue(coordinator.coordinatorUnknown()); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java index 1f8492f8e5934..949bdc9aa727d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.Metadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.ClusterResourceListener; import org.apache.kafka.common.Node; @@ -65,7 +64,7 @@ public class ConsumerMetadataTest { private final Node node = new Node(1, "localhost", 9092); - private final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + private final SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST); private final Time time = new MockTime(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index c2b6ad8b1680a..f1db099203a02 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -28,7 +28,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.IsolationLevel; @@ -1690,7 +1689,7 @@ public void testStaleOutOfRangeError() { @Test public void testFetchedRecordsAfterSeek() { - buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED); assignFromUser(singleton(tp0)); @@ -1711,7 +1710,7 @@ public void testFetchedRecordsAfterSeek() { @Test public void testFetchOffsetOutOfRangeException() { - buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED); assignFromUser(singleton(tp0)); @@ -1733,7 +1732,7 @@ public void testFetchOffsetOutOfRangeException() { public void testFetchPositionAfterException() { // verify the advancement in the next fetch offset equals to the number of fetched records when // some fetched partitions cause Exception. This ensures that consumer won't lose record upon exception - buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED); assignFromUser(Set.of(tp0, tp1)); subscriptions.seek(tp0, 1); @@ -1779,7 +1778,7 @@ private void fetchRecordsInto(List> allFetchedRec @Test public void testCompletedFetchRemoval() { // Ensure the removal of completed fetches that cause an Exception if and only if they contain empty records. - buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED); assignFromUser(Set.of(tp0, tp1, tp2, tp3)); @@ -1855,7 +1854,7 @@ public void testCompletedFetchRemoval() { @Test public void testSeekBeforeException() { - buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED); assignFromUser(Set.of(tp0)); @@ -2045,7 +2044,7 @@ public void testFetcherLeadMetric() { @Test public void testReadCommittedLagMetric() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); assignFromUser(singleton(tp0)); @@ -2262,7 +2261,7 @@ public void testFetchResponseMetricsWithOnePartitionAtTheWrongOffset() { @Test public void testFetcherMetricsTemplates() { Map clientTags = Collections.singletonMap("client-id", "clientA"); - buildFetcher(new MetricConfig().tags(clientTags), OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(new MetricConfig().tags(clientTags), AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED); // Fetch from topic to generate topic metrics @@ -2308,7 +2307,7 @@ private Map>> fetchRecords( @Test public void testSkippingAbortedTransactions() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); int currentOffset = 0; @@ -2343,7 +2342,7 @@ public void testSkippingAbortedTransactions() { @Test public void testReturnCommittedTransactions() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); int currentOffset = 0; @@ -2379,7 +2378,7 @@ public void testReturnCommittedTransactions() { @Test public void testReadCommittedWithCommittedAndAbortedTransactions() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); @@ -2455,7 +2454,7 @@ public void testReadCommittedWithCommittedAndAbortedTransactions() { @Test public void testMultipleAbortMarkers() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); int currentOffset = 0; @@ -2504,7 +2503,7 @@ public void testMultipleAbortMarkers() { @Test public void testReadCommittedAbortMarkerWithNoData() { - buildFetcher(OffsetResetStrategy.EARLIEST, new StringDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new StringDeserializer(), new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); @@ -2621,7 +2620,7 @@ public void testUpdatePositionOnEmptyBatch() { @Test public void testReadCommittedWithCompactedTopic() { - buildFetcher(OffsetResetStrategy.EARLIEST, new StringDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new StringDeserializer(), new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); @@ -2684,7 +2683,7 @@ public void testReadCommittedWithCompactedTopic() { @Test public void testReturnAbortedTransactionsInUncommittedMode() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); int currentOffset = 0; @@ -2718,7 +2717,7 @@ public void testReturnAbortedTransactionsInUncommittedMode() { @Test public void testConsumerPositionUpdatedWhenSkippingAbortedTransactions() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); long currentOffset = 0; @@ -2832,7 +2831,7 @@ public void testConsumingViaIncrementalFetchRequests() { @Test public void testEmptyControlBatch() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); int currentOffset = 1; @@ -2960,7 +2959,7 @@ public void testSubscriptionPositionUpdatedWithEpoch() { @Test public void testPreferredReadReplica() { - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis()); subscriptions.assignFromUser(singleton(tp0)); @@ -3003,7 +3002,7 @@ public void testPreferredReadReplica() { @Test public void testFetchDisconnectedShouldClearPreferredReadReplica() { - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis()); subscriptions.assignFromUser(singleton(tp0)); @@ -3036,7 +3035,7 @@ public void testFetchDisconnectedShouldClearPreferredReadReplica() { @Test public void testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned() { - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis()); subscriptions.assignFromUser(singleton(tp0)); @@ -3071,7 +3070,7 @@ public void testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned( @Test public void testFetchErrorShouldClearPreferredReadReplica() { - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis()); subscriptions.assignFromUser(singleton(tp0)); @@ -3106,7 +3105,7 @@ public void testFetchErrorShouldClearPreferredReadReplica() { @Test public void testPreferredReadReplicaOffsetError() { - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis()); subscriptions.assignFromUser(singleton(tp0)); @@ -3204,7 +3203,7 @@ public void testCorruptMessageError() { public void testWhenFetchResponseReturnsALeaderShipChangeErrorButNoNewLeaderInformation(Errors error) { // The test runs with 2 partitions where 1 partition is fetched without errors, and // 2nd partition faces errors due to leadership changes. - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED, Duration.ofMinutes(5).toMillis()); @@ -3297,7 +3296,7 @@ public void testWhenFetchResponseReturnsALeaderShipChangeErrorButNoNewLeaderInfo public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInformation(Errors error) { // The test runs with 2 partitions where 1 partition is fetched without errors, and // 2nd partition faces errors due to leadership changes. - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED, Duration.ofMinutes(5).toMillis()); @@ -3580,7 +3579,7 @@ private Fetch collectFetch() { } private void buildFetcher(int maxPollRecords) { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), maxPollRecords, IsolationLevel.READ_UNCOMMITTED); } @@ -3590,11 +3589,11 @@ private void buildFetcher() { private void buildFetcher(Deserializer keyDeserializer, Deserializer valueDeserializer) { - buildFetcher(OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, + buildFetcher(AutoOffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED); } - private void buildFetcher(OffsetResetStrategy offsetResetStrategy, + private void buildFetcher(AutoOffsetResetStrategy offsetResetStrategy, Deserializer keyDeserializer, Deserializer valueDeserializer, int maxPollRecords, @@ -3604,7 +3603,7 @@ private void buildFetcher(OffsetResetStrategy offsetResetStrategy, } private void buildFetcher(MetricConfig metricConfig, - OffsetResetStrategy offsetResetStrategy, + AutoOffsetResetStrategy offsetResetStrategy, Deserializer keyDeserializer, Deserializer valueDeserializer, int maxPollRecords, @@ -3613,7 +3612,7 @@ private void buildFetcher(MetricConfig metricConfig, } private void buildFetcher(MetricConfig metricConfig, - OffsetResetStrategy offsetResetStrategy, + AutoOffsetResetStrategy offsetResetStrategy, Deserializer keyDeserializer, Deserializer valueDeserializer, int maxPollRecords, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index c554f1c8e7a20..707d49595c980 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.KafkaException; @@ -1676,7 +1675,7 @@ public void testStaleOutOfRangeError() { @Test public void testFetchedRecordsAfterSeek() { - buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED); assignFromUser(singleton(tp0)); @@ -1697,7 +1696,7 @@ public void testFetchedRecordsAfterSeek() { @Test public void testFetchOffsetOutOfRangeException() { - buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED); assignFromUser(singleton(tp0)); @@ -1719,7 +1718,7 @@ public void testFetchOffsetOutOfRangeException() { public void testFetchPositionAfterException() { // verify the advancement in the next fetch offset equals to the number of fetched records when // some fetched partitions cause Exception. This ensures that consumer won't lose record upon exception - buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED); assignFromUser(Set.of(tp0, tp1)); subscriptions.seek(tp0, 1); @@ -1765,7 +1764,7 @@ private void fetchRecordsInto(List> allFetchedRec @Test public void testCompletedFetchRemoval() { // Ensure the removal of completed fetches that cause an Exception if and only if they contain empty records. - buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED); assignFromUser(Set.of(tp0, tp1, tp2, tp3)); @@ -1841,7 +1840,7 @@ public void testCompletedFetchRemoval() { @Test public void testSeekBeforeException() { - buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED); assignFromUser(Set.of(tp0)); @@ -2031,7 +2030,7 @@ public void testFetcherLeadMetric() { @Test public void testReadCommittedLagMetric() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); assignFromUser(singleton(tp0)); @@ -2248,7 +2247,7 @@ public void testFetchResponseMetricsWithOnePartitionAtTheWrongOffset() { @Test public void testFetcherMetricsTemplates() { Map clientTags = Collections.singletonMap("client-id", "clientA"); - buildFetcher(new MetricConfig().tags(clientTags), OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(new MetricConfig().tags(clientTags), AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED); // Fetch from topic to generate topic metrics @@ -2294,7 +2293,7 @@ private Map>> fetchRecords( @Test public void testSkippingAbortedTransactions() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); int currentOffset = 0; @@ -2329,7 +2328,7 @@ public void testSkippingAbortedTransactions() { @Test public void testReturnCommittedTransactions() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); int currentOffset = 0; @@ -2365,7 +2364,7 @@ public void testReturnCommittedTransactions() { @Test public void testReadCommittedWithCommittedAndAbortedTransactions() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); @@ -2441,7 +2440,7 @@ public void testReadCommittedWithCommittedAndAbortedTransactions() { @Test public void testMultipleAbortMarkers() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); int currentOffset = 0; @@ -2490,7 +2489,7 @@ public void testMultipleAbortMarkers() { @Test public void testReadCommittedAbortMarkerWithNoData() { - buildFetcher(OffsetResetStrategy.EARLIEST, new StringDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new StringDeserializer(), new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); @@ -2607,7 +2606,7 @@ public void testUpdatePositionOnEmptyBatch() { @Test public void testReadCommittedWithCompactedTopic() { - buildFetcher(OffsetResetStrategy.EARLIEST, new StringDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new StringDeserializer(), new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); @@ -2670,7 +2669,7 @@ public void testReadCommittedWithCompactedTopic() { @Test public void testReturnAbortedTransactionsInUncommittedMode() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); int currentOffset = 0; @@ -2704,7 +2703,7 @@ public void testReturnAbortedTransactionsInUncommittedMode() { @Test public void testConsumerPositionUpdatedWhenSkippingAbortedTransactions() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); long currentOffset = 0; @@ -2824,7 +2823,7 @@ public void testFetcherConcurrency() throws Exception { topicPartitions.add(new TopicPartition(topicName, i)); LogContext logContext = new LogContext(); - buildDependencies(new MetricConfig(), Long.MAX_VALUE, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST), logContext); + buildDependencies(new MetricConfig(), Long.MAX_VALUE, new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST), logContext); IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED; @@ -3031,7 +3030,7 @@ public void testFetcherSessionEpochUpdate() throws Exception { @Test public void testEmptyControlBatch() { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); ByteBuffer buffer = ByteBuffer.allocate(1024); int currentOffset = 1; @@ -3178,7 +3177,7 @@ public void testTruncationDetected() { builder.appendWithOffset(2L, 0L, "key".getBytes(), "value-3".getBytes()); MemoryRecords records = builder.build(); - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED); assignFromUser(singleton(tp0)); @@ -3237,7 +3236,7 @@ public void testTruncationDetected() { @Test public void testPreferredReadReplica() { - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis()); subscriptions.assignFromUser(singleton(tp0)); @@ -3280,7 +3279,7 @@ public void testPreferredReadReplica() { @Test public void testFetchDisconnectedShouldClearPreferredReadReplica() { - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis()); subscriptions.assignFromUser(singleton(tp0)); @@ -3313,7 +3312,7 @@ public void testFetchDisconnectedShouldClearPreferredReadReplica() { @Test public void testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned() { - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis()); subscriptions.assignFromUser(singleton(tp0)); @@ -3348,7 +3347,7 @@ public void testFetchDisconnectedShouldNotClearPreferredReadReplicaIfUnassigned( @Test public void testFetchErrorShouldClearPreferredReadReplica() { - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis()); subscriptions.assignFromUser(singleton(tp0)); @@ -3383,7 +3382,7 @@ public void testFetchErrorShouldClearPreferredReadReplica() { @Test public void testPreferredReadReplicaOffsetError() { - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis()); subscriptions.assignFromUser(singleton(tp0)); @@ -3480,7 +3479,7 @@ public void testCorruptMessageError() { public void testWhenFetchResponseReturnsALeaderShipChangeErrorButNoNewLeaderInformation(Errors error) { // The test runs with 2 partitions where 1 partition is fetched without errors, and // 2nd partition faces errors due to leadership changes. - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED, Duration.ofMinutes(5).toMillis()); @@ -3573,7 +3572,7 @@ public void testWhenFetchResponseReturnsALeaderShipChangeErrorButNoNewLeaderInfo public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInformation(Errors error) { // The test runs with 2 partitions where 1 partition is fetched without errors, and // 2nd partition faces errors due to leadership changes. - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED, Duration.ofMinutes(5).toMillis()); @@ -3825,7 +3824,7 @@ private Fetch collectFetch() { } private void buildFetcher(int maxPollRecords) { - buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), + buildFetcher(AutoOffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), maxPollRecords, IsolationLevel.READ_UNCOMMITTED); } @@ -3835,11 +3834,11 @@ private void buildFetcher() { private void buildFetcher(Deserializer keyDeserializer, Deserializer valueDeserializer) { - buildFetcher(OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, + buildFetcher(AutoOffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED); } - private void buildFetcher(OffsetResetStrategy offsetResetStrategy, + private void buildFetcher(AutoOffsetResetStrategy offsetResetStrategy, Deserializer keyDeserializer, Deserializer valueDeserializer, int maxPollRecords, @@ -3849,7 +3848,7 @@ private void buildFetcher(OffsetResetStrategy offsetResetStrategy, } private void buildFetcher(MetricConfig metricConfig, - OffsetResetStrategy offsetResetStrategy, + AutoOffsetResetStrategy offsetResetStrategy, Deserializer keyDeserializer, Deserializer valueDeserializer, int maxPollRecords, @@ -3858,7 +3857,7 @@ private void buildFetcher(MetricConfig metricConfig, } private void buildFetcher(MetricConfig metricConfig, - OffsetResetStrategy offsetResetStrategy, + AutoOffsetResetStrategy offsetResetStrategy, Deserializer keyDeserializer, Deserializer valueDeserializer, int maxPollRecords, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java index bcbd041165637..d5ff67359c465 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.clients.consumer.LogTruncationException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Node; @@ -174,7 +173,7 @@ public void testUpdateFetchPositionResetToDefaultOffset() { public void testUpdateFetchPositionResetToLatestOffset() { buildFetcher(); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); client.updateMetadata(initialUpdateResponse); @@ -194,7 +193,7 @@ public void testUpdateFetchPositionResetToLatestOffset() { public void testFetchOffsetErrors() { buildFetcher(); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); // Fail with OFFSET_NOT_AVAILABLE client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.LATEST_TIMESTAMP, @@ -241,7 +240,7 @@ private void testListOffsetsSendsIsolationLevel(IsolationLevel isolationLevel) { buildFetcher(isolationLevel); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); client.prepareResponse(body -> { ListOffsetsRequest request = (ListOffsetsRequest) body; @@ -260,7 +259,7 @@ private void testListOffsetsSendsIsolationLevel(IsolationLevel isolationLevel) { public void testresetPositionsSkipsBlackedOutConnections() { buildFetcher(); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.EARLIEST); // Check that we skip sending the ListOffset request when the node is blacked out client.updateMetadata(initialUpdateResponse); @@ -270,7 +269,7 @@ public void testresetPositionsSkipsBlackedOutConnections() { assertEquals(0, consumerClient.pendingRequestCount()); consumerClient.pollNoWakeup(); assertTrue(subscriptions.isOffsetResetNeeded(tp0)); - assertEquals(OffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(tp0)); + assertEquals(AutoOffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(tp0)); time.sleep(500); client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.EARLIEST_TIMESTAMP), @@ -287,7 +286,7 @@ public void testresetPositionsSkipsBlackedOutConnections() { public void testUpdateFetchPositionResetToEarliestOffset() { buildFetcher(); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.EARLIEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.EARLIEST_TIMESTAMP, validLeaderEpoch), listOffsetResponse(Errors.NONE, 1L, 5L)); @@ -303,7 +302,7 @@ public void testUpdateFetchPositionResetToEarliestOffset() { public void testresetPositionsMetadataRefresh() { buildFetcher(); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); // First fetch fails with stale metadata client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.LATEST_TIMESTAMP, @@ -340,7 +339,7 @@ public void testListOffsetNoUpdateMissingEpoch() { client.updateMetadata(metadataWithNoLeaderEpochs); // Return a ListOffsets response with leaderEpoch=1, we should ignore it - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.LATEST_TIMESTAMP), listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 1)); offsetFetcher.resetPositionsIfNeeded(); @@ -363,7 +362,7 @@ public void testListOffsetUpdateEpoch() { client.updateMetadata(metadataWithLeaderEpochs); // Reset offsets to trigger ListOffsets call - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); // Now we see a ListOffsets with leaderEpoch=2 epoch, we trigger a metadata update client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.LATEST_TIMESTAMP, 1), @@ -380,7 +379,7 @@ public void testListOffsetUpdateEpoch() { public void testUpdateFetchPositionDisconnect() { buildFetcher(); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); // First request gets a disconnect client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.LATEST_TIMESTAMP, @@ -416,7 +415,7 @@ public void testUpdateFetchPositionDisconnect() { public void testAssignmentChangeWithInFlightReset() { buildFetcher(); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); // Send the ListOffsets request to reset the position offsetFetcher.resetPositionsIfNeeded(); @@ -440,7 +439,7 @@ public void testAssignmentChangeWithInFlightReset() { public void testSeekWithInFlightReset() { buildFetcher(); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); // Send the ListOffsets request to reset the position offsetFetcher.resetPositionsIfNeeded(); @@ -462,7 +461,7 @@ public void testSeekWithInFlightReset() { private boolean listOffsetMatchesExpectedReset( TopicPartition tp, - OffsetResetStrategy strategy, + AutoOffsetResetStrategy strategy, AbstractRequest request ) { assertInstanceOf(ListOffsetsRequest.class, request); @@ -476,9 +475,9 @@ private boolean listOffsetMatchesExpectedReset( .map(ListOffsetsPartition::partitionIndex).collect(Collectors.toSet())); ListOffsetsPartition listPartition = listTopic.partitions().get(0); - if (strategy == OffsetResetStrategy.EARLIEST) { + if (strategy == AutoOffsetResetStrategy.EARLIEST) { assertEquals(ListOffsetsRequest.EARLIEST_TIMESTAMP, listPartition.timestamp()); - } else if (strategy == OffsetResetStrategy.LATEST) { + } else if (strategy == AutoOffsetResetStrategy.LATEST) { assertEquals(ListOffsetsRequest.LATEST_TIMESTAMP, listPartition.timestamp()); } return true; @@ -489,13 +488,13 @@ public void testEarlierOffsetResetArrivesLate() { buildFetcher(); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.EARLIEST); offsetFetcher.resetPositionsIfNeeded(); client.prepareResponse(req -> { - if (listOffsetMatchesExpectedReset(tp0, OffsetResetStrategy.EARLIEST, req)) { + if (listOffsetMatchesExpectedReset(tp0, AutoOffsetResetStrategy.EARLIEST, req)) { // Before the response is handled, we get a request to reset to the latest offset - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); return true; } else { return false; @@ -505,11 +504,11 @@ public void testEarlierOffsetResetArrivesLate() { // The list offset result should be ignored assertTrue(subscriptions.isOffsetResetNeeded(tp0)); - assertEquals(OffsetResetStrategy.LATEST, subscriptions.resetStrategy(tp0)); + assertEquals(AutoOffsetResetStrategy.LATEST, subscriptions.resetStrategy(tp0)); offsetFetcher.resetPositionsIfNeeded(); client.prepareResponse( - req -> listOffsetMatchesExpectedReset(tp0, OffsetResetStrategy.LATEST, req), + req -> listOffsetMatchesExpectedReset(tp0, AutoOffsetResetStrategy.LATEST, req), listOffsetResponse(Errors.NONE, 1L, 10L) ); consumerClient.pollNoWakeup(); @@ -522,7 +521,7 @@ public void testEarlierOffsetResetArrivesLate() { public void testChangeResetWithInFlightReset() { buildFetcher(); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); // Send the ListOffsets request to reset the position offsetFetcher.resetPositionsIfNeeded(); @@ -531,7 +530,7 @@ public void testChangeResetWithInFlightReset() { assertTrue(client.hasInFlightRequests()); // Now we get a seek from the user - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.EARLIEST); // The response returns and is discarded client.respond(listOffsetResponse(Errors.NONE, 1L, 5L)); @@ -540,14 +539,14 @@ public void testChangeResetWithInFlightReset() { assertFalse(client.hasPendingResponses()); assertFalse(client.hasInFlightRequests()); assertTrue(subscriptions.isOffsetResetNeeded(tp0)); - assertEquals(OffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(tp0)); + assertEquals(AutoOffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(tp0)); } @Test public void testIdempotentResetWithInFlightReset() { buildFetcher(); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); // Send the ListOffsets request to reset the position offsetFetcher.resetPositionsIfNeeded(); @@ -556,7 +555,7 @@ public void testIdempotentResetWithInFlightReset() { assertTrue(client.hasInFlightRequests()); // Now we get a seek from the user - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); client.respond(listOffsetResponse(Errors.NONE, 1L, 5L)); consumerClient.pollNoWakeup(); @@ -570,7 +569,7 @@ public void testIdempotentResetWithInFlightReset() { public void testResetOffsetsAuthorizationFailure() { buildFetcher(); assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); // First request gets a disconnect client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.LATEST_TIMESTAMP, @@ -638,7 +637,7 @@ public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() { buildFetcher(); assignFromUser(singleton(tp0)); subscriptions.pause(tp0); // paused partition does not have a valid position - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.LATEST_TIMESTAMP, validLeaderEpoch), listOffsetResponse(Errors.NONE, 1L, 10L)); @@ -717,7 +716,7 @@ public void testGetOffsetsFencedLeaderEpoch() { subscriptions.assignFromUser(singleton(tp0)); client.updateMetadata(initialUpdateResponse); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); client.prepareResponse(listOffsetResponse(Errors.FENCED_LEADER_EPOCH, 1L, 5L)); offsetFetcher.resetPositionsIfNeeded(); @@ -846,7 +845,7 @@ public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() { public void testGetOffsetsUnknownLeaderEpoch() { buildFetcher(); subscriptions.assignFromUser(singleton(tp0)); - subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST); client.prepareResponse(listOffsetResponse(Errors.UNKNOWN_LEADER_EPOCH, 1L, 5L)); offsetFetcher.resetPositionsIfNeeded(); @@ -1236,7 +1235,7 @@ public void testOffsetValidationSkippedForOldBroker() { IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED; int maxPollRecords = Integer.MAX_VALUE; long metadataExpireMs = Long.MAX_VALUE; - OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.EARLIEST; + AutoOffsetResetStrategy offsetResetStrategy = AutoOffsetResetStrategy.EARLIEST; int minBytes = 1; int maxBytes = Integer.MAX_VALUE; int maxWaitMs = 0; @@ -1355,36 +1354,36 @@ public void testOffsetValidationSkippedForOldResponse() { @Test public void testOffsetValidationresetPositionForUndefinedEpochWithDefinedResetPolicy() { testOffsetValidationWithGivenEpochOffset( - UNDEFINED_EPOCH, 0L, OffsetResetStrategy.EARLIEST); + UNDEFINED_EPOCH, 0L, AutoOffsetResetStrategy.EARLIEST); } @Test public void testOffsetValidationresetPositionForUndefinedOffsetWithDefinedResetPolicy() { testOffsetValidationWithGivenEpochOffset( - 2, UNDEFINED_EPOCH_OFFSET, OffsetResetStrategy.EARLIEST); + 2, UNDEFINED_EPOCH_OFFSET, AutoOffsetResetStrategy.EARLIEST); } @Test public void testOffsetValidationresetPositionForUndefinedEpochWithUndefinedResetPolicy() { testOffsetValidationWithGivenEpochOffset( - UNDEFINED_EPOCH, 0L, OffsetResetStrategy.NONE); + UNDEFINED_EPOCH, 0L, AutoOffsetResetStrategy.NONE); } @Test public void testOffsetValidationresetPositionForUndefinedOffsetWithUndefinedResetPolicy() { testOffsetValidationWithGivenEpochOffset( - 2, UNDEFINED_EPOCH_OFFSET, OffsetResetStrategy.NONE); + 2, UNDEFINED_EPOCH_OFFSET, AutoOffsetResetStrategy.NONE); } @Test public void testOffsetValidationTriggerLogTruncationForBadOffsetWithUndefinedResetPolicy() { testOffsetValidationWithGivenEpochOffset( - 1, 1L, OffsetResetStrategy.NONE); + 1, 1L, AutoOffsetResetStrategy.NONE); } private void testOffsetValidationWithGivenEpochOffset(int leaderEpoch, long endOffset, - OffsetResetStrategy offsetResetStrategy) { + AutoOffsetResetStrategy offsetResetStrategy) { buildFetcher(offsetResetStrategy); assignFromUser(singleton(tp0)); @@ -1415,7 +1414,7 @@ private void testOffsetValidationWithGivenEpochOffset(int leaderEpoch, prepareOffsetsForLeaderEpochResponse(tp0, leaderEpoch, endOffset)); consumerClient.poll(time.timer(Duration.ZERO)); - if (offsetResetStrategy == OffsetResetStrategy.NONE) { + if (offsetResetStrategy == AutoOffsetResetStrategy.NONE) { LogTruncationException thrown = assertThrows(LogTruncationException.class, () -> offsetFetcher.validatePositionsIfNeeded()); assertEquals(singletonMap(tp0, initialOffset), thrown.offsetOutOfRangePartitions()); @@ -1690,16 +1689,16 @@ private void buildFetcher() { buildFetcher(IsolationLevel.READ_UNCOMMITTED); } - private void buildFetcher(OffsetResetStrategy offsetResetStrategy) { + private void buildFetcher(AutoOffsetResetStrategy offsetResetStrategy) { buildFetcher(new MetricConfig(), offsetResetStrategy, IsolationLevel.READ_UNCOMMITTED); } private void buildFetcher(IsolationLevel isolationLevel) { - buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, isolationLevel); + buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, isolationLevel); } private void buildFetcher(MetricConfig metricConfig, - OffsetResetStrategy offsetResetStrategy, + AutoOffsetResetStrategy offsetResetStrategy, IsolationLevel isolationLevel) { long metadataExpireMs = Long.MAX_VALUE; LogContext logContext = new LogContext(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java index 721f1560f562c..a48b32b43efb6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicAuthorizationException; @@ -147,11 +146,11 @@ public void testRetriableError() { } private OffsetsForLeaderEpochClient newOffsetClient() { - buildDependencies(OffsetResetStrategy.EARLIEST); + buildDependencies(AutoOffsetResetStrategy.EARLIEST); return new OffsetsForLeaderEpochClient(consumerClient, new LogContext()); } - private void buildDependencies(OffsetResetStrategy offsetResetStrategy) { + private void buildDependencies(AutoOffsetResetStrategy offsetResetStrategy) { LogContext logContext = new LogContext(); Time time = new MockTime(1); SubscriptionState subscriptions = new SubscriptionState(logContext, offsetResetStrategy); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java index 996ae05feb938..dbfcb6cd46a8c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.IsolationLevel; @@ -514,7 +513,7 @@ public void testResetPositionsSendNoRequestIfNoPartitionsNeedingReset() { public void testResetPositionsMissingLeader() { mockFailedRequest_MissingLeader(); when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1)); - when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST); + when(subscriptionState.resetStrategy(any())).thenReturn(AutoOffsetResetStrategy.EARLIEST); requestManager.resetPositionsIfNeeded(); verify(metadata).requestUpdate(true); assertEquals(0, requestManager.requestsToSend()); @@ -537,7 +536,7 @@ public void testResetPositionsSuccess_LeaderEpochInResponse() { @Test public void testResetOffsetsAuthorizationFailure() { when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1)); - when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST); + when(subscriptionState.resetStrategy(any())).thenReturn(AutoOffsetResetStrategy.EARLIEST); mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); CompletableFuture resetResult = requestManager.resetPositionsIfNeeded(); @@ -844,7 +843,7 @@ private void mockSuccessfulBuildRequestForValidatingPositions(SubscriptionState. private void testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch leaderAndEpoch) { TopicPartition tp = TEST_PARTITION_1; Node leader = LEADER_1; - OffsetResetStrategy strategy = OffsetResetStrategy.EARLIEST; + AutoOffsetResetStrategy strategy = AutoOffsetResetStrategy.EARLIEST; long offset = 5L; when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(tp)); when(subscriptionState.resetStrategy(any())).thenReturn(strategy); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 135574d4e1d5a..79a47a536b6d1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.AcknowledgeType; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent; @@ -1510,7 +1509,7 @@ private void buildRequestManager(MetricConfig metricConfig, Deserializer keyDeserializer, Deserializer valueDeserializer) { LogContext logContext = new LogContext(); - SubscriptionState subscriptionState = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST); + SubscriptionState subscriptionState = new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST); buildRequestManager(metricConfig, keyDeserializer, valueDeserializer, subscriptionState, logContext); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index dc06875f7565d..7fd0bc3fe60e5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -19,7 +19,6 @@ import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; @@ -169,7 +168,7 @@ private ShareConsumerImpl newConsumer( @Test public void testSuccessfulStartupShutdown() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer(subscriptions); completeShareAcknowledgeOnCloseApplicationEventSuccessfully(); @@ -198,7 +197,7 @@ public void testFailConstructor() { @Test public void testWakeupBeforeCallingPoll() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer(subscriptions); final String topicName = "foo"; @@ -216,7 +215,7 @@ public void testWakeupBeforeCallingPoll() { @Test public void testWakeupAfterEmptyFetch() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer(subscriptions); final String topicName = "foo"; @@ -235,7 +234,7 @@ public void testWakeupAfterEmptyFetch() { @Test public void testWakeupAfterNonEmptyFetch() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer(subscriptions); final String topicName = "foo"; @@ -262,7 +261,7 @@ public void testWakeupAfterNonEmptyFetch() { @Test public void testFailOnClosedConsumer() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer(subscriptions); completeShareAcknowledgeOnCloseApplicationEventSuccessfully(); @@ -274,7 +273,7 @@ public void testFailOnClosedConsumer() { @Test public void testVerifyApplicationEventOnShutdown() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer(subscriptions); completeShareAcknowledgeOnCloseApplicationEventSuccessfully(); @@ -336,7 +335,7 @@ public void testCompleteQuietly() { @Test public void testSubscribeGeneratesEvent() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer(subscriptions); String topic = "topic1"; @@ -349,7 +348,7 @@ public void testSubscribeGeneratesEvent() { @Test public void testUnsubscribeGeneratesUnsubscribeEvent() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer(subscriptions); completeShareUnsubscribeApplicationEventSuccessfully(subscriptions); @@ -361,7 +360,7 @@ public void testUnsubscribeGeneratesUnsubscribeEvent() { @Test public void testSubscribeToEmptyListActsAsUnsubscribe() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer(subscriptions); completeShareUnsubscribeApplicationEventSuccessfully(subscriptions); @@ -461,7 +460,7 @@ private void testInvalidGroupId(final String groupId) { @Test public void testEnsurePollEventSentOnConsumerPoll() { - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); consumer = newConsumer(subscriptions); final TopicPartition tp = new TopicPartition("topic", 0); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 3bde4367cac84..6854cbdf6ea49 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Node; @@ -53,7 +52,7 @@ public class SubscriptionStateTest { - private SubscriptionState state = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + private SubscriptionState state = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST); private final String topic = "test"; private final String topic1 = "test1"; private final TopicPartition tp0 = new TopicPartition(topic, 0); @@ -587,7 +586,7 @@ public void testOffsetResetWhileAwaitingValidation() { new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10)))); assertTrue(state.awaitingValidation(tp0)); - state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); + state.requestOffsetReset(tp0, AutoOffsetResetStrategy.EARLIEST); assertFalse(state.awaitingValidation(tp0)); assertTrue(state.isOffsetResetNeeded(tp0)); } @@ -734,7 +733,7 @@ public void testTruncationDetectionWithResetPolicy() { @Test public void testTruncationDetectionWithoutResetPolicy() { Node broker1 = new Node(1, "localhost", 9092); - state = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + state = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); state.assignFromUser(Collections.singleton(tp0)); int currentEpoch = 10; @@ -764,7 +763,7 @@ public void testTruncationDetectionWithoutResetPolicy() { @Test public void testTruncationDetectionUnknownDivergentOffsetWithResetPolicy() { Node broker1 = new Node(1, "localhost", 9092); - state = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + state = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST); state.assignFromUser(Collections.singleton(tp0)); int currentEpoch = 10; @@ -783,13 +782,13 @@ public void testTruncationDetectionUnknownDivergentOffsetWithResetPolicy() { assertEquals(Optional.empty(), truncationOpt); assertFalse(state.awaitingValidation(tp0)); assertTrue(state.isOffsetResetNeeded(tp0)); - assertEquals(OffsetResetStrategy.EARLIEST, state.resetStrategy(tp0)); + assertEquals(AutoOffsetResetStrategy.EARLIEST, state.resetStrategy(tp0)); } @Test public void testTruncationDetectionUnknownDivergentOffsetWithoutResetPolicy() { Node broker1 = new Node(1, "localhost", 9092); - state = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + state = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); state.assignFromUser(Collections.singleton(tp0)); int currentEpoch = 10; @@ -841,7 +840,7 @@ public void resetOffsetNoValidation() { state.assignFromUser(Collections.singleton(tp0)); // Reset offsets - state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); + state.requestOffsetReset(tp0, AutoOffsetResetStrategy.EARLIEST); // Attempt to validate with older API version, should do nothing ApiVersions oldApis = new ApiVersions(); @@ -866,7 +865,7 @@ public void resetOffsetNoValidation() { assertFalse(state.isOffsetResetNeeded(tp0)); // Reset again, and complete it with a seek that would normally require validation - state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); + state.requestOffsetReset(tp0, AutoOffsetResetStrategy.EARLIEST); state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(10), new Metadata.LeaderAndEpoch( Optional.of(broker1), Optional.of(2)))); // We are now in AWAIT_VALIDATION diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcherTest.java index 5f34836eaba4b..ce2aa86e3e26e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcherTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.MockClient; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -240,7 +239,7 @@ private void buildFetcher() { long retryBackoffMs = 100; long retryBackoffMaxMs = 1000; LogContext logContext = new LogContext(); - SubscriptionState subscriptionState = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST); + SubscriptionState subscriptionState = new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST); buildDependencies(metricConfig, metadataExpireMs, subscriptionState, logContext); topicMetadataFetcher = new TopicMetadataFetcher(logContext, consumerClient, retryBackoffMs, retryBackoffMaxMs); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index ea09fc2ae8be4..dfd67ce4710ac 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -19,7 +19,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager; import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager; @@ -192,7 +192,7 @@ public void testAssignmentChangeEventWithException() { @Test public void testResetOffsetEvent() { Collection tp = Collections.singleton(new TopicPartition("topic", 0)); - OffsetResetStrategy strategy = OffsetResetStrategy.LATEST; + AutoOffsetResetStrategy strategy = AutoOffsetResetStrategy.LATEST; ResetOffsetEvent event = new ResetOffsetEvent(tp, strategy, 12345); setupProcessor(false); @@ -288,7 +288,7 @@ tp2, new OffsetAndMetadata(20L, Optional.of(3), "") @Test public void testTopicSubscriptionChangeEventWithIllegalSubscriptionState() { - subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + subscriptionState = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST); Optional listener = Optional.of(new MockRebalanceListener()); TopicSubscriptionChangeEvent event = new TopicSubscriptionChangeEvent( Set.of("topic1", "topic2"), listener, 12345); @@ -335,7 +335,7 @@ public void testTopicPatternSubscriptionChangeEvent() { @Test public void testTopicPatternSubscriptionChangeEventWithIllegalSubscriptionState() { - subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + subscriptionState = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST); Optional listener = Optional.of(new MockRebalanceListener()); TopicPatternSubscriptionChangeEvent event = new TopicPatternSubscriptionChangeEvent( Pattern.compile("topic.*"), listener, 12345); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index edad9a250f05c..b3e0189f987f6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; @@ -1752,7 +1752,7 @@ public void testOriginalTopicWithTopicMutatingTransformations() { @Test public void testPartitionCountInCaseOfPartitionRevocation() { - MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer mockConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); // Setting up Worker Sink Task to check metrics workerTask = new WorkerSinkTask( taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, ClusterConfigState.EMPTY, metrics, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java index 4f849f7b3ba79..f78ab54950f4a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java @@ -18,7 +18,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Cluster; @@ -437,7 +437,7 @@ protected Consumer createConsumer() { } private MockConsumer createMockConsumer(String topic) { - MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.LATEST.name()); Node noNode = Node.noNode(); Node[] nodes = new Node[]{noNode}; consumer.updatePartitions(topic, Collections.singletonList(new PartitionInfo(topic, 0, noNode, nodes, nodes))); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index 9089e6f59c8bf..aabf894e1ea90 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -20,7 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -152,7 +152,7 @@ protected MockConsumer createConsumer() { return consumer; } }; - consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1)); Map beginningOffsets = new HashMap<>(); beginningOffsets.put(TP0, 0L); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/consumer/SubscriptionStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/consumer/SubscriptionStateBenchmark.java index 9652ce464c6d3..dc67a662b7ec7 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/consumer/SubscriptionStateBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/consumer/SubscriptionStateBenchmark.java @@ -18,7 +18,7 @@ package org.apache.kafka.jmh.consumer; import org.apache.kafka.clients.Metadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -66,7 +66,7 @@ public void setup() { assignment.add(new TopicPartition(String.format("topic-%04d", topicId), partitionId)) ) ); - subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + subscriptionState = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST); subscriptionState.assignFromUser(assignment); SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition( 0L, diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java index 66176c68477a1..4bde23dc5c98c 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java @@ -18,7 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -84,7 +84,7 @@ public class ConsumerTaskTest { public void beforeEach() { final Map offsets = remoteLogPartitions.stream() .collect(Collectors.toMap(Function.identity(), e -> 0L)); - consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); consumer.updateBeginningOffsets(offsets); consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, Time.SYSTEM); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index f01551bd7e92a..2c5e798b62dfc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.Deserializer; @@ -63,9 +63,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; -import static org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST; -import static org.apache.kafka.clients.consumer.OffsetResetStrategy.LATEST; -import static org.apache.kafka.clients.consumer.OffsetResetStrategy.NONE; import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG; public class InternalTopologyBuilder { @@ -1345,15 +1342,15 @@ public boolean hasOffsetResetOverrides() { && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty()); } - public OffsetResetStrategy offsetResetStrategy(final String topic) { + public AutoOffsetResetStrategy offsetResetStrategy(final String topic) { if (maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) || earliestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) { - return EARLIEST; + return AutoOffsetResetStrategy.EARLIEST; } else if (maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) || latestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) { - return LATEST; + return AutoOffsetResetStrategy.LATEST; } else if (containsTopic(topic)) { - return NONE; + return AutoOffsetResetStrategy.NONE; } else { throw new IllegalStateException(String.format( "Unable to lookup offset reset strategy for the following topic as it does not exist in the topology%s: %s", diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 428e98e6b1ec4..ceb2c403c3ebb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.InvalidOffsetException; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Metric; @@ -1294,29 +1294,26 @@ private void resetOffsets(final Set partitions, final Exception final Set notReset = new HashSet<>(); for (final TopicPartition partition : partitions) { - final OffsetResetStrategy offsetResetStrategy = topologyMetadata.offsetResetStrategy(partition.topic()); + final AutoOffsetResetStrategy offsetResetStrategy = topologyMetadata.offsetResetStrategy(partition.topic()); // This may be null if the task we are currently processing was apart of a named topology that was just removed. // TODO KAFKA-13713: keep the StreamThreads and TopologyMetadata view of named topologies in sync until final thread has acked if (offsetResetStrategy != null) { - switch (offsetResetStrategy) { - case EARLIEST: - addToResetList(partition, seekToBeginning, "Setting topic '{}' to consume from {} offset", "earliest", loggedTopics); - break; - case LATEST: - addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics); - break; - case NONE: - if ("earliest".equals(originalReset)) { - addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics); - } else if ("latest".equals(originalReset)) { - addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics); - } else { - notReset.add(partition); - } - break; - default: - throw new IllegalStateException("Unable to locate topic " + partition.topic() + " in the topology"); + if (offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST) { + addToResetList(partition, seekToBeginning, "Setting topic '{}' to consume from {} offset", "earliest", loggedTopics); + } else if (offsetResetStrategy == AutoOffsetResetStrategy.LATEST) { + addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics); + } else if (offsetResetStrategy == AutoOffsetResetStrategy.NONE) { + final AutoOffsetResetStrategy autoOffsetResetStrategy = AutoOffsetResetStrategy.fromString(originalReset); + if (AutoOffsetResetStrategy.EARLIEST == autoOffsetResetStrategy) { + addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics); + } else if (AutoOffsetResetStrategy.LATEST == autoOffsetResetStrategy) { + addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics); + } else { + notReset.add(partition); + } + } else { + throw new IllegalStateException("Unable to locate topic " + partition.topic() + " in the topology"); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index b1f6af32c9124..88e4ede59ada6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.KafkaFutureImpl; @@ -427,7 +427,7 @@ public boolean hasOffsetResetOverrides() { return hasNamedTopologies() || evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasOffsetResetOverrides); } - public OffsetResetStrategy offsetResetStrategy(final String topic) { + public AutoOffsetResetStrategy offsetResetStrategy(final String topic) { for (final InternalTopologyBuilder builder : builders.values()) { if (builder.containsTopic(topic)) { return builder.offsetResetStrategy(topic); diff --git a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java index 5ca74585e25fb..faf30334e73f0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java @@ -18,7 +18,7 @@ package org.apache.kafka.streams.internals.metrics; import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Measurable; @@ -62,7 +62,7 @@ public void setUp() { final Map noThreadIdTagMap = new HashMap<>(); noThreadIdTagMap.put("client-id", "foo"); - mockConsumer = new MockConsumer<>(OffsetResetStrategy.NONE); + mockConsumer = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name()); streamsThreadMetricsDelegatingReporter = new StreamsThreadMetricsDelegatingReporter(mockConsumer, threadId, stateUpdaterId); final MetricName metricNameOne = new MetricName("metric-one", "test-group-one", "foo bar baz", threadIdTagMap); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java index 755944227b7e1..3df6840ff7120 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; @@ -294,7 +294,7 @@ public void shouldAddTopicToEarliestAutoOffsetResetList() { builder.stream(Collections.singleton(topicName), consumed); builder.buildAndOptimizeTopology(); - assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), equalTo(OffsetResetStrategy.EARLIEST)); + assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), equalTo(AutoOffsetResetStrategy.EARLIEST)); } @Test @@ -304,7 +304,7 @@ public void shouldAddTopicToLatestAutoOffsetResetList() { final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)); builder.stream(Collections.singleton(topicName), consumed); builder.buildAndOptimizeTopology(); - assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), equalTo(OffsetResetStrategy.LATEST)); + assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), equalTo(AutoOffsetResetStrategy.LATEST)); } @Test @@ -312,7 +312,7 @@ public void shouldAddTableToEarliestAutoOffsetResetList() { final String topicName = "topic-1"; builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)), materialized); builder.buildAndOptimizeTopology(); - assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), equalTo(OffsetResetStrategy.EARLIEST)); + assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), equalTo(AutoOffsetResetStrategy.EARLIEST)); } @Test @@ -320,7 +320,7 @@ public void shouldAddTableToLatestAutoOffsetResetList() { final String topicName = "topic-1"; builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)), materialized); builder.buildAndOptimizeTopology(); - assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), equalTo(OffsetResetStrategy.LATEST)); + assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), equalTo(AutoOffsetResetStrategy.LATEST)); } @Test @@ -330,7 +330,7 @@ public void shouldNotAddTableToOffsetResetLists() { builder.table(topicName, consumed, materialized); builder.buildAndOptimizeTopology(); - assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), equalTo(OffsetResetStrategy.NONE)); + assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicName), equalTo(AutoOffsetResetStrategy.NONE)); } @Test @@ -341,7 +341,7 @@ public void shouldNotAddRegexTopicsToOffsetResetLists() { builder.stream(topicPattern, consumed); builder.buildAndOptimizeTopology(); - assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topic), equalTo(OffsetResetStrategy.NONE)); + assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topic), equalTo(AutoOffsetResetStrategy.NONE)); } @Test @@ -352,7 +352,7 @@ public void shouldAddRegexTopicToEarliestAutoOffsetResetList() { builder.stream(topicPattern, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST))); builder.buildAndOptimizeTopology(); - assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicTwo), equalTo(OffsetResetStrategy.EARLIEST)); + assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicTwo), equalTo(AutoOffsetResetStrategy.EARLIEST)); } @Test @@ -363,7 +363,7 @@ public void shouldAddRegexTopicToLatestAutoOffsetResetList() { builder.stream(topicPattern, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST))); builder.buildAndOptimizeTopology(); - assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicTwo), equalTo(OffsetResetStrategy.LATEST)); + assertThat(builder.internalTopologyBuilder.offsetResetStrategy(topicTwo), equalTo(AutoOffsetResetStrategy.LATEST)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index ff461ccc90b4a..e28ef673f07b7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -19,7 +19,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; @@ -161,7 +161,7 @@ public void before() { } }); stateDirectory = new StateDirectory(streamsConfig, time, true, false); - consumer = new MockConsumer<>(OffsetResetStrategy.NONE); + consumer = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name()); stateManager = new GlobalStateManagerImpl( new LogContext("test"), time, @@ -579,7 +579,7 @@ private Map readOffsetsCheckpoint() throws IOException { @Test public void shouldNotRetryWhenEndOffsetsThrowsTimeoutExceptionAndTaskTimeoutIsZero() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized Map endOffsets(final Collection partitions) { numberOfCalls.incrementAndGet(); @@ -621,7 +621,7 @@ public synchronized Map endOffsets(final Collection(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized Map endOffsets(final Collection partitions) { time.sleep(100L); @@ -662,7 +662,7 @@ public synchronized Map endOffsets(final Collection(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized Map endOffsets(final Collection partitions) { time.sleep(100L); @@ -703,7 +703,7 @@ public synchronized Map endOffsets(final Collection(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized Map endOffsets(final Collection partitions) { time.sleep(1L); @@ -745,7 +745,7 @@ public synchronized long position(final TopicPartition partition) { @Test public void shouldNotRetryWhenPartitionsForThrowsTimeoutExceptionAndTaskTimeoutIsZero() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public List partitionsFor(final String topic) { numberOfCalls.incrementAndGet(); @@ -787,7 +787,7 @@ public List partitionsFor(final String topic) { @Test public void shouldRetryAtLeastOnceWhenPartitionsForThrowsTimeoutException() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public List partitionsFor(final String topic) { time.sleep(100L); @@ -828,7 +828,7 @@ public List partitionsFor(final String topic) { @Test public void shouldRetryWhenPartitionsForThrowsTimeoutExceptionUntilTaskTimeoutExpires() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public List partitionsFor(final String topic) { time.sleep(100L); @@ -869,7 +869,7 @@ public List partitionsFor(final String topic) { @Test public void shouldNotFailOnSlowProgressWhenPartitionForThrowsTimeoutException() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public List partitionsFor(final String topic) { time.sleep(1L); @@ -911,7 +911,7 @@ public synchronized long position(final TopicPartition partition) { @Test public void shouldNotRetryWhenPositionThrowsTimeoutExceptionAndTaskTimeoutIsZero() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized long position(final TopicPartition partition) { numberOfCalls.incrementAndGet(); @@ -953,7 +953,7 @@ public synchronized long position(final TopicPartition partition) { @Test public void shouldRetryAtLeastOnceWhenPositionThrowsTimeoutException() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized long position(final TopicPartition partition) { time.sleep(100L); @@ -994,7 +994,7 @@ public synchronized long position(final TopicPartition partition) { @Test public void shouldRetryWhenPositionThrowsTimeoutExceptionUntilTaskTimeoutExpired() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized long position(final TopicPartition partition) { time.sleep(100L); @@ -1035,7 +1035,7 @@ public synchronized long position(final TopicPartition partition) { @Test public void shouldNotFailOnSlowProgressWhenPositionThrowsTimeoutException() { final AtomicInteger numberOfCalls = new AtomicInteger(0); - consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized long position(final TopicPartition partition) { time.sleep(1L); @@ -1071,7 +1071,7 @@ public synchronized long position(final TopicPartition partition) { @Test public void shouldUsePollMsPlusRequestTimeoutInPollDuringRestoreAndTimeoutWhenNoProgressDuringRestore() { - consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public synchronized ConsumerRecords poll(final Duration timeout) { time.sleep(timeout.toMillis()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index e6b409fed4c9e..244a246bd2034 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -18,7 +18,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -72,7 +72,7 @@ public class GlobalStreamThreadTest { private final InternalTopologyBuilder builder = new InternalTopologyBuilder(); - private final MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.NONE); + private final MockConsumer mockConsumer = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name()); private final MockTime time = new MockTime(); private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener(); private GlobalStreamThread globalStreamThread; @@ -160,7 +160,7 @@ public void shouldThrowStreamsExceptionOnStartupIfThereIsAStreamsException() thr @Test public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() throws Exception { - final MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + final MockConsumer mockConsumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public List partitionsFor(final String topic) { throw new RuntimeException("KABOOM!"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index a0c119c6294f1..9d46569c27b74 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.serialization.Serde; @@ -106,8 +106,8 @@ public void shouldAddSourceWithOffsetReset() { builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null, null, null, latestTopic); builder.initializeSubscription(); - assertThat(builder.offsetResetStrategy(earliestTopic), equalTo(OffsetResetStrategy.EARLIEST)); - assertThat(builder.offsetResetStrategy(latestTopic), equalTo(OffsetResetStrategy.LATEST)); + assertThat(builder.offsetResetStrategy(earliestTopic), equalTo(AutoOffsetResetStrategy.EARLIEST)); + assertThat(builder.offsetResetStrategy(latestTopic), equalTo(AutoOffsetResetStrategy.LATEST)); } @Test @@ -119,8 +119,8 @@ public void shouldAddSourcePatternWithOffsetReset() { builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null, null, null, Pattern.compile(latestTopicPattern)); builder.initializeSubscription(); - assertThat(builder.offsetResetStrategy("earliestTestTopic"), equalTo(OffsetResetStrategy.EARLIEST)); - assertThat(builder.offsetResetStrategy("latestTestTopic"), equalTo(OffsetResetStrategy.LATEST)); + assertThat(builder.offsetResetStrategy("earliestTestTopic"), equalTo(AutoOffsetResetStrategy.EARLIEST)); + assertThat(builder.offsetResetStrategy("latestTestTopic"), equalTo(AutoOffsetResetStrategy.LATEST)); } @Test @@ -130,7 +130,7 @@ public void shouldAddSourceWithoutOffsetReset() { assertEquals(Collections.singletonList("test-topic"), builder.fullSourceTopicNames()); - assertThat(builder.offsetResetStrategy("test-topic"), equalTo(OffsetResetStrategy.NONE)); + assertThat(builder.offsetResetStrategy("test-topic"), equalTo(AutoOffsetResetStrategy.NONE)); } @Test @@ -142,7 +142,7 @@ public void shouldAddPatternSourceWithoutOffsetReset() { assertThat(expectedPattern.pattern(), builder.sourceTopicPatternString(), equalTo("test-.*")); - assertThat(builder.offsetResetStrategy("test-topic"), equalTo(OffsetResetStrategy.NONE)); + assertThat(builder.offsetResetStrategy("test-topic"), equalTo(AutoOffsetResetStrategy.NONE)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java index 11271c8471afe..afe01ff433c69 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java @@ -18,7 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; @@ -39,7 +39,7 @@ public class StateConsumerTest { private final TopicPartition topicOne = new TopicPartition("topic-one", 1); private final TopicPartition topicTwo = new TopicPartition("topic-two", 1); - private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + private final MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); private final Map partitionOffsets = new HashMap<>(); private final LogContext logContext = new LogContext("test "); private GlobalStreamThread.StateConsumer stateConsumer; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 0e85e392e3188..7d3167bf32e6c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -27,7 +27,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -140,7 +140,7 @@ public void onRestoreEnd(final TopicPartition tp, final String store, final long } }; - private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + private final MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); private final MockAdminClient adminClient = new MockAdminClient(); private final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener); @@ -389,7 +389,7 @@ public void shouldTriggerRestoreListenerWithOffsetZeroIfPositionThrowsTimeoutExc adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + final MockConsumer consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public long position(final TopicPartition partition) { throw new TimeoutException("KABOOM!"); @@ -674,7 +674,7 @@ public void shouldRequestPositionAndHandleTimeoutException() { when(activeStateManager.taskId()).thenReturn(taskId); final AtomicBoolean clearException = new AtomicBoolean(false); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + final MockConsumer consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public long position(final TopicPartition partition) { if (clearException.get()) { @@ -720,7 +720,7 @@ public void shouldThrowIfPositionFail() { when(activeStateManager.taskId()).thenReturn(taskId); when(storeMetadata.offset()).thenReturn(10L); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + final MockConsumer consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public long position(final TopicPartition partition) { throw kaboom; @@ -770,7 +770,7 @@ public ListOffsetsResult listOffsets(final Map topic }; adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + final MockConsumer consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public Map committed(final Set partitions) { throw new AssertionError("Should not trigger this function"); @@ -928,7 +928,7 @@ public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(fina @Test public void shouldThrowIfUnsubscribeFail() { - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + final MockConsumer consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public void unsubscribe() { throw kaboom; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index b36f5e41e47db..8555e4d065b24 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -189,7 +189,7 @@ public void process(final Record record) { private final MockKeyValueStore stateStore = new MockKeyValueStore(storeName, false); private final TopicPartition changelogPartition = new TopicPartition("store-changelog", 1); - private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + private final MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); private final String threadId = Thread.currentThread().getName(); @@ -440,7 +440,7 @@ public void shouldAutoOffsetResetIfNoCommittedOffsetFound() { task.addPartitionsForOffsetReset(Collections.singleton(partition1)); final AtomicReference shouldNotSeek = new AtomicReference<>(); - try (final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST) { + try (final MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public void seek(final TopicPartition partition, final long offset) { final AssertionError error = shouldNotSeek.get(); @@ -1965,7 +1965,7 @@ public void shouldMaybeReturnOffsetsForRepartitionTopicsForPurging(final boolean public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - final Consumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + final Consumer consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public Map committed(final Set partitions) { throw new KafkaException("KABOOM!"); @@ -3029,7 +3029,7 @@ private StreamTask createDisconnectedTask(final StreamsConfig config) { singletonList(stateStore), emptyMap()); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + final MockConsumer consumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name()) { @Override public Map committed(final Set partitions) { throw new TimeoutException("KABOOM!"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 7af603b3239c1..8a3e6c17d8a19 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -25,7 +25,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.MockRebalanceListener; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; @@ -1393,8 +1393,8 @@ public void shouldNotReturnDataAfterTaskMigrated(final boolean stateUpdaterEnabl final InternalTopologyBuilder internalTopologyBuilder = mock(InternalTopologyBuilder.class); when(internalTopologyBuilder.fullSourceTopicNames()).thenReturn(Collections.singletonList(topic1)); - final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.LATEST); - final MockConsumer restoreConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + final MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.LATEST.name()); + final MockConsumer restoreConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); consumer.subscribe(Collections.singletonList(topic1), new MockRebalanceListener()); consumer.rebalance(Collections.singletonList(t1p1)); @@ -2581,7 +2581,7 @@ public void shouldThrowTaskMigratedExceptionHandlingTaskLost(final boolean state final Set assignedPartitions = Collections.singleton(t1p1); final TaskManager taskManager = mock(TaskManager.class); - final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + final MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.LATEST.name()); consumer.assign(assignedPartitions); consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); consumer.updateEndOffsets(Collections.singletonMap(t1p1, 10L)); @@ -2611,7 +2611,7 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation(final boolean sta final Set assignedPartitions = Collections.singleton(t1p1); final TaskManager taskManager = mock(TaskManager.class); - final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + final MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.LATEST.name()); consumer.assign(assignedPartitions); consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); consumer.updateEndOffsets(Collections.singletonMap(t1p1, 10L)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index d1cdf53c3e4f1..212a65eacddf9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.PartitionInfo; @@ -165,8 +165,8 @@ public void before() { properties.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath()); final StreamsConfig streamsConfig = new StreamsConfig(properties); - final MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - final MockConsumer mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + final MockConsumer mockConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); + final MockConsumer mockRestoreConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); final MockProducer mockProducer = new MockProducer<>(); final MockAdminClient mockAdminClient = MockAdminClient.create().build(); configureClients(mockRestoreConsumer, mockAdminClient, "applicationId-kv-store-changelog"); diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index 84ad3e3f80071..37723f1e7cc47 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -20,7 +20,7 @@ import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -45,8 +45,8 @@ public class MockClientSupplier implements KafkaClientSupplier { public MockAdminClient adminClient = new MockAdminClient(); private final List> preparedProducers = new LinkedList<>(); public final List> producers = new LinkedList<>(); - public final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - public final MockConsumer restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + public final MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); + public final MockConsumer restoreConsumer = new MockConsumer<>(AutoOffsetResetStrategy.LATEST.name()); public void setApplicationIdForProducer(final String applicationId) { this.applicationId = applicationId; diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java index 345a5824a20e3..cb59b642db047 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java +++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java @@ -19,7 +19,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; @@ -42,7 +42,7 @@ public final class MockRestoreConsumer extends MockConsumer> recordBuffer = new ArrayList<>(); public MockRestoreConsumer(final Serializer keySerializer, final Serializer valueSerializer) { - super(OffsetResetStrategy.EARLIEST); + super(AutoOffsetResetStrategy.EARLIEST.name()); reset(); this.keySerializer = keySerializer; diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index cbf314e4426bb..525e01f151cec 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -21,7 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -335,7 +335,7 @@ private TopologyTestDriver(final InternalTopologyBuilder builder, streamsMetrics ); - consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); final Serializer bytesSerializer = new ByteArraySerializer(); producer = new MockProducer<>(true, bytesSerializer, bytesSerializer) { @Override @@ -411,7 +411,7 @@ private void setupGlobalTask(final Time mockWallClockTime, final StreamsMetricsImpl streamsMetrics, final ThreadCache cache) { if (globalTopology != null) { - final MockConsumer globalConsumer = new MockConsumer<>(OffsetResetStrategy.NONE); + final MockConsumer globalConsumer = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name()); for (final String topicName : globalTopology.sourceTopics()) { final TopicPartition partition = new TopicPartition(topicName, 0); globalPartitionsByInputTopic.put(topicName, partition); diff --git a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java index b03835aec9e3c..f8dbd687dfd2d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -47,7 +47,7 @@ public class StreamsResetterTest { private static final String TOPIC = "topic1"; private final StreamsResetter streamsResetter = new StreamsResetter(); - private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + private final MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); private final TopicPartition topicPartition = new TopicPartition(TOPIC, 0); private final Set inputTopicPartitions = new HashSet<>(Collections.singletonList(topicPartition)); @@ -81,7 +81,7 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() { public void testResetOffsetToSpecificOffsetWhenAfterEndOffset() { final long beginningOffset = 5L; final long endOffset = 10L; - final MockConsumer emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + final MockConsumer emptyConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); emptyConsumer.assign(Collections.singletonList(topicPartition)); final Map beginningOffsetsMap = new HashMap<>(); @@ -273,7 +273,7 @@ public void shouldDetermineInternalTopicBasedOnTopicName1() { @Test public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() { final long beginningAndEndOffset = 5L; // Empty partition implies beginning offset == end offset - final MockConsumer emptyConsumer = new EmptyPartitionConsumer<>(OffsetResetStrategy.EARLIEST); + final MockConsumer emptyConsumer = new EmptyPartitionConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); emptyConsumer.assign(Collections.singletonList(topicPartition)); final Map beginningOffsetsMap = new HashMap<>(); @@ -304,7 +304,7 @@ private Cluster createCluster(final int numNodes) { } private static class EmptyPartitionConsumer extends MockConsumer { - public EmptyPartitionConsumer(final OffsetResetStrategy offsetResetStrategy) { + public EmptyPartitionConsumer(final String offsetResetStrategy) { super(offsetResetStrategy); } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java index 03fffa2f236cb..fa3f401784389 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java @@ -23,8 +23,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -143,7 +143,7 @@ public void shouldResetUnConsumedOffsetsBeforeExit() throws IOException { int totalMessages = 700; long startOffset = 0L; - MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer mockConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); TopicPartition tp1 = new TopicPartition(topic, 0); TopicPartition tp2 = new TopicPartition(topic, 1);