Commit
Add exception.on.invalid.offset.reset config to allow users to handle InvalidOffsetException (#182)

Co-authored-by: Ke Hu <kehu@kehu-mn1.linkedin.biz>
kehuum and Ke Hu authored Aug 3, 2020
1 parent b67a80d commit 7e4a138
Showing 3 changed files with 56 additions and 3 deletions.
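For context before the diff, here is a minimal sketch of how an application could opt into the new flag and handle the exception that poll() then surfaces. The bootstrap address, topic name, deserializer settings, and the direct LiKafkaConsumerImpl construction are illustrative assumptions for this sketch, not part of the commit.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;

import com.linkedin.kafka.clients.consumer.LiKafkaConsumer;
import com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl;

public class InvalidOffsetResetExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
    props.setProperty("group.id", "invalid-offset-reset-example");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("auto.offset.reset", "liclosest");
    // The new flag: surface InvalidOffsetException from poll() instead of only resetting silently.
    props.setProperty("exception.on.invalid.offset.reset", "true");

    try (LiKafkaConsumer<String, String> consumer = new LiKafkaConsumerImpl<>(props)) {
      consumer.subscribe(Collections.singleton("example-topic")); // assumed topic name
      while (true) {
        try {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
          records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
        } catch (InvalidOffsetException e) {
          // The consumer has already applied its reset and large-message bookkeeping;
          // the application can now run its own recovery or alerting logic.
          System.err.println("Invalid offset on " + e.partitions() + ": " + e.getMessage());
        }
      }
    }
  }
}

With the flag left at its default of false, the same loop would never see the exception; the consumer would just reset positions according to auto.offset.reset.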
@@ -46,6 +46,7 @@
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -1385,6 +1386,38 @@ public static Object[][] offsetResetStrategies() {
};
}

@Test
public void testThrowExceptionOnInvalidOffset() throws Exception {
createTopic(TOPIC1);
createTopic(TOPIC2);
produceRecordsWithKafkaProducer();
Properties props = new Properties();
props.setProperty("auto.offset.reset", "liclosest");
props.setProperty("group.id", "testThrowExceptionOnInvalidOffset");
props.setProperty("exception.on.invalid.offset.reset", "true");

TopicPartition tp = new TopicPartition(TOPIC1, 0);
try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) {
consumer.assign(Collections.singleton(tp));
ConsumerRecords<String, String> consumerRecords = ConsumerRecords.empty();
consumer.seek(tp, 0);
Long endPollTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5);
while (consumerRecords.isEmpty() && (System.currentTimeMillis() < endPollTime)) {
consumerRecords = consumer.poll(Duration.ofMillis(1000));
}
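// Seek well past anything produced above so the next poll() should hit an out-of-range offset.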
consumer.seek(tp, 100000L);
assertEquals(consumer.position(tp), 100000L);

boolean exceptionThrown = false;
try {
consumer.poll(Duration.ofMillis(1000));
} catch (InvalidOffsetException ee) {
exceptionThrown = true;
}
assertTrue(exceptionThrown);
}
}

@Test(dataProvider = "offsetResetStrategies")
public void testOffsetOutOfRangeForStrategy(LiOffsetResetStrategy strategy) throws Exception {
createTopic(TOPIC1);
@@ -33,6 +33,7 @@ public class LiKafkaConsumerConfig extends AbstractConfig {
public static final String MESSAGE_ASSEMBLER_EXPIRATION_OFFSET_GAP_CONFIG = "message.assembler.expiration.offset.gap";
public static final String MAX_TRACKED_MESSAGES_PER_PARTITION_CONFIG = "max.tracked.messages.per.partition";
public static final String EXCEPTION_ON_MESSAGE_DROPPED_CONFIG = "exception.on.message.dropped";
public static final String EXCEPTION_ON_INVALID_OFFSET_RESET_CONFIG = "exception.on.invalid.offset.reset";
public static final String TREAT_BAD_SEGMENTS_AS_PAYLOAD_CONFIG = "treat.bad.segments.as.payload";
public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
@@ -72,6 +73,12 @@ public class LiKafkaConsumerConfig extends AbstractConfig {
"full or the incomplete message has expired. The consumer will throw a LargeMessageDroppedException if this " +
"configuration is set to true. Otherwise the consumer will drop the message silently.";

private static final String EXCEPTION_ON_INVALID_OFFSET_RESET_DOC = "If true, an InvalidOffsetException will be thrown " +
"to the user on poll() even when auto.offset.reset is set to earliest/latest/liclosest. This lets users invoke " +
"their own callback for the InvalidOffsetException while LiKafkaConsumerImpl still handles the offset reset and the " +
"corresponding large-message internal state correctly. Once large message support is decoupled from " +
"LiKafkaConsumerImpl, this config will no longer be needed.";

private static final String TREAT_BAD_SEGMENTS_AS_PAYLOAD_DOC = "The message assembler will treat invalid message segments " +
"as payload. This can be used as a last resort when some arbitrary payloads accidentally pass as a large message segment";

@@ -132,11 +139,16 @@ public class LiKafkaConsumerConfig extends AbstractConfig {
"false",
Importance.LOW,
EXCEPTION_ON_MESSAGE_DROPPED_DOC)
.define(EXCEPTION_ON_INVALID_OFFSET_RESET_CONFIG,
Type.BOOLEAN,
"false",
Importance.LOW,
EXCEPTION_ON_INVALID_OFFSET_RESET_DOC)
.define(TREAT_BAD_SEGMENTS_AS_PAYLOAD_CONFIG,
Type.BOOLEAN,
"false",
Importance.LOW,
TREAT_BAD_SEGMENTS_AS_PAYLOAD_DOC)
.define(KEY_DESERIALIZER_CLASS_CONFIG,
Type.CLASS,
ByteArrayDeserializer.class.getName(),
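For readers unfamiliar with Kafka's config plumbing, here is a small self-contained sketch of the define/getBoolean pattern the config change above relies on. ExampleConsumerConfig and its doc text are illustrative stand-ins, not the project's actual LiKafkaConsumerConfig.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

// Illustrative stand-in showing only the key added by this commit.
public class ExampleConsumerConfig extends AbstractConfig {
  public static final String EXCEPTION_ON_INVALID_OFFSET_RESET_CONFIG = "exception.on.invalid.offset.reset";

  private static final ConfigDef CONFIG = new ConfigDef()
      .define(EXCEPTION_ON_INVALID_OFFSET_RESET_CONFIG,
          Type.BOOLEAN,
          "false", // same default as the commit: the feature is off unless opted in
          Importance.LOW,
          "Throw InvalidOffsetException from poll() instead of only resetting silently.");

  public ExampleConsumerConfig(Map<?, ?> originals) {
    super(CONFIG, originals);
  }

  public static void main(String[] args) {
    ExampleConsumerConfig cfg = new ExampleConsumerConfig(
        Collections.singletonMap(EXCEPTION_ON_INVALID_OFFSET_RESET_CONFIG, "true"));
    // ConfigDef coerces the string "true" to a Boolean because the key is declared as Type.BOOLEAN.
    System.out.println(cfg.getBoolean(EXCEPTION_ON_INVALID_OFFSET_RESET_CONFIG)); // prints: true
  }
}

The string default "false" is coerced to a Boolean by ConfigDef, which is why LiKafkaConsumerImpl can read the value with configs.getBoolean(...), as shown in the next file.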
@@ -77,6 +77,7 @@ public class LiKafkaConsumerImpl<K, V> implements LiKafkaConsumer<K, V> {
private final LiKafkaOffsetCommitCallback _offsetCommitCallback;
private final boolean _autoCommitEnabled;
private final long _autoCommitInterval;
private final boolean _throwExceptionOnInvalidOffsets;
private final LiOffsetResetStrategy _offsetResetStrategy;
private long _lastAutoCommitMs;
private final Map<MetricName, Metric> _extraMetrics = new HashMap<>(2);
@@ -116,6 +117,7 @@ private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,

_autoCommitEnabled = configs.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
_autoCommitInterval = configs.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
_throwExceptionOnInvalidOffsets = configs.getBoolean(LiKafkaConsumerConfig.EXCEPTION_ON_INVALID_OFFSET_RESET_CONFIG);
_offsetResetStrategy =
LiOffsetResetStrategy.valueOf(configs.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
_lastAutoCommitMs = System.currentTimeMillis();
@@ -319,6 +321,11 @@ private ConsumerRecords<K, V> poll(long timeout, boolean includeMetadataInTimeou
}
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
handleInvalidOffsetException(oe);

// rethrow the exception to the user if exception.on.invalid.offset.reset is set to true
if (_throwExceptionOnInvalidOffsets) {
throw oe;
}
}

_lastProcessedResult = _consumerRecordsProcessor.process(rawRecords);
@@ -336,6 +343,7 @@ private ConsumerRecords<K, V> poll(long timeout, boolean includeMetadataInTimeou
throw crpe;
}
}

now = System.currentTimeMillis();
} while (processedRecords.isEmpty() && now < deadline);
return processedRecords;
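A possible application-side companion to the change above, sketched under the assumption that the consumer has already performed its internal reset before rethrowing (handleInvalidOffsetException runs first in the catch block): the caller mainly needs to observe the exception and decide whether to keep polling. The pollOrNotify helper and its callback parameter are hypothetical names, not part of the library.

import java.time.Duration;
import java.util.function.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;

import com.linkedin.kafka.clients.consumer.LiKafkaConsumer;

public final class PollWithInvalidOffsetCallback {

  // Polls once; if the consumer (configured with exception.on.invalid.offset.reset=true)
  // surfaces an InvalidOffsetException, hand it to the supplied callback and return an
  // empty batch so the caller's loop can simply continue.
  public static <K, V> ConsumerRecords<K, V> pollOrNotify(LiKafkaConsumer<K, V> consumer,
                                                          Duration timeout,
                                                          Consumer<InvalidOffsetException> onInvalidOffset) {
    try {
      return consumer.poll(timeout);
    } catch (InvalidOffsetException e) {
      // By this point the consumer has already repositioned itself and cleaned up its
      // large-message state for the affected partitions; we only notify the caller.
      onInvalidOffset.accept(e);
      return ConsumerRecords.empty();
    }
  }
}

A polling loop would call pollOrNotify in place of consumer.poll and, for example, log the affected partitions or emit a metric from the callback before continuing.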
