Skip to content

Commit

Permalink
KAFKA-18013: Add AutoOffsetResetStrategy internal class (apache#17858)
Browse files Browse the repository at this point in the history
- Deprecates OffsetResetStrategy enum
- Adds new internal class AutoOffsetResetStrategy
- Replaces all OffsetResetStrategy enum usages with AutoOffsetResetStrategy
- Deprecate old/Add new constructors to MockConsumer

 Reviewers: Andrew Schofield <aschofield@confluent.io>, Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
omkreddy authored Nov 25, 2024
1 parent 619ef63 commit 3268435
Show file tree
Hide file tree
Showing 54 changed files with 500 additions and 318 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
Expand Down Expand Up @@ -506,8 +507,8 @@ public class ConsumerConfig extends AbstractConfig {
ENABLE_METRICS_PUSH_DOC)
.define(AUTO_OFFSET_RESET_CONFIG,
Type.STRING,
OffsetResetStrategy.LATEST.toString(),
in(Utils.enumOptions(OffsetResetStrategy.class)),
AutoOffsetResetStrategy.LATEST.name(),
new AutoOffsetResetStrategy.Validator(),
Importance.MEDIUM,
AUTO_OFFSET_RESET_DOC)
.define(CHECK_CRCS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
Expand Down Expand Up @@ -79,7 +80,23 @@ public class MockConsumer<K, V> implements Consumer<K, V> {

private final List<KafkaMetric> addedMetrics = new ArrayList<>();

/**
* @deprecated Since 4.0. Use {@link #MockConsumer(String)}.
*/
@Deprecated
public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
this(AutoOffsetResetStrategy.fromString(offsetResetStrategy.toString()));
}

/**
* A mock consumer is instantiated by providing ConsumerConfig.AUTO_OFFSET_RESET_CONFIG value as the input.
* @param offsetResetStrategy the offset reset strategy to use
*/
public MockConsumer(String offsetResetStrategy) {
this(AutoOffsetResetStrategy.fromString(offsetResetStrategy));
}

private MockConsumer(AutoOffsetResetStrategy offsetResetStrategy) {
this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy);
this.partitions = new HashMap<>();
this.records = new HashMap<>();
Expand Down Expand Up @@ -389,7 +406,7 @@ public synchronized long position(TopicPartition partition, final Duration timeo
@Override
public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
ensureNotClosed();
subscriptions.requestOffsetReset(partitions, OffsetResetStrategy.EARLIEST);
subscriptions.requestOffsetReset(partitions, AutoOffsetResetStrategy.EARLIEST);
}

public synchronized void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) {
Expand All @@ -399,7 +416,7 @@ public synchronized void updateBeginningOffsets(Map<TopicPartition, Long> newOff
@Override
public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
ensureNotClosed();
subscriptions.requestOffsetReset(partitions, OffsetResetStrategy.LATEST);
subscriptions.requestOffsetReset(partitions, AutoOffsetResetStrategy.LATEST);
}

public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
Expand Down Expand Up @@ -573,13 +590,13 @@ private void updateFetchPosition(TopicPartition tp) {
}

private void resetOffsetPosition(TopicPartition tp) {
OffsetResetStrategy strategy = subscriptions.resetStrategy(tp);
AutoOffsetResetStrategy strategy = subscriptions.resetStrategy(tp);
Long offset;
if (strategy == OffsetResetStrategy.EARLIEST) {
if (strategy == AutoOffsetResetStrategy.EARLIEST) {
offset = beginningOffsets.get(tp);
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
} else if (strategy == OffsetResetStrategy.LATEST) {
} else if (strategy == AutoOffsetResetStrategy.LATEST) {
offset = endOffsets.get(tp);
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
Expand Down Expand Up @@ -54,7 +55,7 @@ public class MockShareConsumer<K, V> implements ShareConsumer<K, V> {
private Uuid clientInstanceId;

public MockShareConsumer() {
this.subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
this.subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
this.records = new HashMap<>();
this.closed = false;
this.wakeup = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

import java.util.Locale;

/**
* @deprecated Since 4.0. Use {@link org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy instead.}
*/
@Deprecated
public enum OffsetResetStrategy {
LATEST, EARLIEST, NONE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
Expand Down Expand Up @@ -864,15 +863,15 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)

@Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
seek(partitions, OffsetResetStrategy.EARLIEST);
seek(partitions, AutoOffsetResetStrategy.EARLIEST);
}

@Override
public void seekToEnd(Collection<TopicPartition> partitions) {
seek(partitions, OffsetResetStrategy.LATEST);
seek(partitions, AutoOffsetResetStrategy.LATEST);
}

private void seek(Collection<TopicPartition> partitions, OffsetResetStrategy offsetResetStrategy) {
private void seek(Collection<TopicPartition> partitions, AutoOffsetResetStrategy offsetResetStrategy) {
if (partitions == null)
throw new IllegalArgumentException("Partitions collection cannot be null");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;

import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;

public class AutoOffsetResetStrategy {
private enum StrategyType {
LATEST, EARLIEST, NONE;

@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
}
}

public static final AutoOffsetResetStrategy EARLIEST = new AutoOffsetResetStrategy(StrategyType.EARLIEST);
public static final AutoOffsetResetStrategy LATEST = new AutoOffsetResetStrategy(StrategyType.LATEST);
public static final AutoOffsetResetStrategy NONE = new AutoOffsetResetStrategy(StrategyType.NONE);

private final StrategyType type;

private AutoOffsetResetStrategy(StrategyType type) {
this.type = type;
}

public static boolean isValid(String offsetStrategy) {
return Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy);
}

public static AutoOffsetResetStrategy fromString(String offsetStrategy) {
if (offsetStrategy == null || !isValid(offsetStrategy)) {
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
}
StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
switch (type) {
case EARLIEST:
return EARLIEST;
case LATEST:
return LATEST;
case NONE:
return NONE;
default:
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
}
}

/**
* Returns the name of the offset reset strategy.
*/
public String name() {
return type.toString();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AutoOffsetResetStrategy that = (AutoOffsetResetStrategy) o;
return Objects.equals(type, that.type);
}

@Override
public int hashCode() {
return Objects.hashCode(type);
}

@Override
public String toString() {
return "AutoOffsetResetStrategy{" +
"type='" + type + '\'' +
'}';
}

public static class Validator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
String strategy = (String) value;
if (!AutoOffsetResetStrategy.isValid(strategy)) {
throw new ConfigException(name, value, "Invalid value " + strategy + " for configuration " +
name + ": the value must be either 'earliest', 'latest', or 'none'.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
import org.apache.kafka.common.Cluster;
Expand Down Expand Up @@ -828,7 +827,7 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {
acquireAndEnsureOpen();
try {
Collection<TopicPartition> parts = partitions.isEmpty() ? this.subscriptions.assignedPartitions() : partitions;
subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST);
subscriptions.requestOffsetReset(parts, AutoOffsetResetStrategy.EARLIEST);
} finally {
release();
}
Expand All @@ -842,7 +841,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
acquireAndEnsureOpen();
try {
Collection<TopicPartition> parts = partitions.isEmpty() ? this.subscriptions.assignedPartitions() : partitions;
subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
subscriptions.requestOffsetReset(parts, AutoOffsetResetStrategy.LATEST);
} finally {
release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -130,8 +129,8 @@ public static IsolationLevel configuredIsolationLevel(ConsumerConfig config) {
}

public static SubscriptionState createSubscriptionState(ConsumerConfig config, LogContext logContext) {
String s = config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT);
OffsetResetStrategy strategy = OffsetResetStrategy.valueOf(s);
String s = config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
AutoOffsetResetStrategy strategy = AutoOffsetResetStrategy.fromString(s);
return new SubscriptionState(logContext, strategy);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -285,10 +284,10 @@ static Map<TopicPartition, OffsetAndTimestampInternal> buildOffsetsForTimeIntern
}

private long offsetResetStrategyTimestamp(final TopicPartition partition) {
OffsetResetStrategy strategy = subscriptionState.resetStrategy(partition);
if (strategy == OffsetResetStrategy.EARLIEST)
AutoOffsetResetStrategy strategy = subscriptionState.resetStrategy(partition);
if (strategy == AutoOffsetResetStrategy.EARLIEST)
return ListOffsetsRequest.EARLIEST_TIMESTAMP;
else if (strategy == OffsetResetStrategy.LATEST)
else if (strategy == AutoOffsetResetStrategy.LATEST)
return ListOffsetsRequest.LATEST_TIMESTAMP;
else
throw new NoOffsetForPartitionException(partition);
Expand Down Expand Up @@ -320,11 +319,11 @@ void updateSubscriptionState(Map<TopicPartition, OffsetFetcherUtils.ListOffsetDa
}
}

static OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
static AutoOffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
return OffsetResetStrategy.EARLIEST;
return AutoOffsetResetStrategy.EARLIEST;
else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
return OffsetResetStrategy.LATEST;
return AutoOffsetResetStrategy.LATEST;
else
return null;
}
Expand Down Expand Up @@ -411,7 +410,7 @@ private LogTruncationException buildLogTruncationException(List<SubscriptionStat
}

// Visible for testing
void resetPositionIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy,
void resetPositionIfNeeded(TopicPartition partition, AutoOffsetResetStrategy requestedResetStrategy,
ListOffsetData offsetData) {
SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
offsetData.offset,
Expand Down
Loading

0 comments on commit 3268435

Please sign in to comment.