Skip to content

Commit

Permalink
KAFKA-17990: Deflake testShareAutoOffsetResetDefaultValue (apache#17916)
Browse files Browse the repository at this point in the history
ShareConsumerTest.testShareAutoOffsetResetDefaultValue has been tightened up by making sure that records produced have been flushed before starting consumption. A possible but unlikely race condition seems the source of the flakiness and this should now be eliminated in the previous PR to this test case.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
  • Loading branch information
AndrewJSchofield authored Nov 26, 2024
1 parent 5480d54 commit 48d60ef
Showing 1 changed file with 23 additions and 24 deletions.
47 changes: 23 additions & 24 deletions core/src/test/java/kafka/test/api/ShareConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public void testAcknowledgementSentOnSubscriptionChange(String persister) throws
ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp2.topic(), tp2.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record2).get();
producer.flush();
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));

shareConsumer.subscribe(Collections.singleton(tp.topic()));

Expand Down Expand Up @@ -328,7 +328,7 @@ public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String pe
producer.send(record);
producer.flush();

shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));

TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
Expand Down Expand Up @@ -356,7 +356,7 @@ public void testAcknowledgementCommitCallbackOnClose(String persister) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
producer.flush();
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));

ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
Expand Down Expand Up @@ -388,7 +388,7 @@ public void testAcknowledgementCommitCallbackInvalidRecordStateException(String
producer.send(record);
producer.flush();

shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));

ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
Expand All @@ -404,12 +404,12 @@ public void testAcknowledgementCommitCallbackInvalidRecordStateException(String
}
}

private static class TestableAcknowledgeCommitCallback implements AcknowledgementCommitCallback {
private static class TestableAcknowledgementCommitCallback implements AcknowledgementCommitCallback {
private final Map<TopicPartition, Set<Long>> partitionOffsetsMap;
private final Map<TopicPartition, Exception> partitionExceptionMap;

public TestableAcknowledgeCommitCallback(Map<TopicPartition, Set<Long>> partitionOffsetsMap,
Map<TopicPartition, Exception> partitionExceptionMap) {
public TestableAcknowledgementCommitCallback(Map<TopicPartition, Set<Long>> partitionOffsetsMap,
Map<TopicPartition, Exception> partitionExceptionMap) {
this.partitionOffsetsMap = partitionOffsetsMap;
this.partitionExceptionMap = partitionExceptionMap;
}
Expand Down Expand Up @@ -623,7 +623,7 @@ public void testExplicitAcknowledgementCommitAsync(String persister) throws Inte

Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap1, partitionExceptionMap1));
shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap1, partitionExceptionMap1));

ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000));
assertEquals(3, records.count());
Expand Down Expand Up @@ -678,7 +678,7 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch(String persister)

Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));

ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000));
assertEquals(3, records.count());
Expand Down Expand Up @@ -870,7 +870,7 @@ public void testImplicitAcknowledgementCommitAsync(String persister) throws Inte
Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();

shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap1, partitionExceptionMap1));
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap1, partitionExceptionMap1));

ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(3, records.count());
Expand Down Expand Up @@ -1328,11 +1328,11 @@ public void testAcquisitionLockTimeoutOnConsumer(String persister) throws Interr

/**
* Test to verify that the acknowledgement commit callback cannot invoke methods of KafkaShareConsumer.
* The exception thrown is verified in {@link TestableAcknowledgeCommitCallbackWithShareConsumer}
* The exception thrown is verified in {@link TestableAcknowledgementCommitCallbackWithShareConsumer}
*/
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testAcknowledgeCommitCallbackCallsShareConsumerDisallowed(String persister) {
public void testAcknowledgementCommitCallbackCallsShareConsumerDisallowed(String persister) {
alterShareAutoOffsetReset("group1", "earliest");
try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
Expand All @@ -1341,7 +1341,7 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerDisallowed(String per
producer.send(record);
producer.flush();

shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWithShareConsumer<>(shareConsumer));
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWithShareConsumer<>(shareConsumer));
shareConsumer.subscribe(Collections.singleton(tp.topic()));

// The acknowledgment commit callback will try to call a method of KafkaShareConsumer
Expand All @@ -1353,10 +1353,10 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerDisallowed(String per
}
}

private class TestableAcknowledgeCommitCallbackWithShareConsumer<K, V> implements AcknowledgementCommitCallback {
private class TestableAcknowledgementCommitCallbackWithShareConsumer<K, V> implements AcknowledgementCommitCallback {
private final KafkaShareConsumer<K, V> shareConsumer;

TestableAcknowledgeCommitCallbackWithShareConsumer(KafkaShareConsumer<K, V> shareConsumer) {
TestableAcknowledgementCommitCallbackWithShareConsumer(KafkaShareConsumer<K, V> shareConsumer) {
this.shareConsumer = shareConsumer;
}

Expand All @@ -1376,7 +1376,7 @@ public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception ex
@Flaky("KAFKA-18033")
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup(String persister) throws InterruptedException {
public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup(String persister) throws InterruptedException {
alterShareAutoOffsetReset("group1", "earliest");
try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
Expand All @@ -1386,7 +1386,7 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup(String persist
producer.flush();

// The acknowledgment commit callback will try to call a method of KafkaShareConsumer
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWakeup<>(shareConsumer));
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWakeup<>(shareConsumer));
shareConsumer.subscribe(Collections.singleton(tp.topic()));

TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
Expand All @@ -1409,10 +1409,10 @@ public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup(String persist
}
}

private static class TestableAcknowledgeCommitCallbackWakeup<K, V> implements AcknowledgementCommitCallback {
private static class TestableAcknowledgementCommitCallbackWakeup<K, V> implements AcknowledgementCommitCallback {
private final KafkaShareConsumer<K, V> shareConsumer;

TestableAcknowledgeCommitCallbackWakeup(KafkaShareConsumer<K, V> shareConsumer) {
TestableAcknowledgementCommitCallbackWakeup(KafkaShareConsumer<K, V> shareConsumer) {
this.shareConsumer = shareConsumer;
}

Expand All @@ -1429,7 +1429,7 @@ public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception ex
@Flaky("KAFKA-18033")
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testAcknowledgeCommitCallbackThrowsException(String persister) throws InterruptedException {
public void testAcknowledgementCommitCallbackThrowsException(String persister) throws InterruptedException {
alterShareAutoOffsetReset("group1", "earliest");
try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
Expand All @@ -1438,7 +1438,7 @@ public void testAcknowledgeCommitCallbackThrowsException(String persister) throw
producer.send(record);
producer.flush();

shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackThrows<>());
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackThrows<>());
shareConsumer.subscribe(Collections.singleton(tp.topic()));

TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
Expand All @@ -1456,10 +1456,10 @@ public void testAcknowledgeCommitCallbackThrowsException(String persister) throw
}
}

private static class TestableAcknowledgeCommitCallbackThrows<K, V> implements AcknowledgementCommitCallback {
private static class TestableAcknowledgementCommitCallbackThrows<K, V> implements AcknowledgementCommitCallback {
@Override
public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception exception) {
throw new org.apache.kafka.common.errors.OutOfOrderSequenceException("Exception thrown in TestableAcknowledgeCommitCallbackThrows.onComplete");
throw new org.apache.kafka.common.errors.OutOfOrderSequenceException("Exception thrown in TestableAcknowledgementCommitCallbackThrows.onComplete");
}
}

Expand Down Expand Up @@ -1673,7 +1673,6 @@ public void testLsoMovementByRecordsDeletion(String persister) {
}
}

@Flaky("KAFKA-18033")
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testShareAutoOffsetResetDefaultValue(String persister) {
Expand Down

0 comments on commit 48d60ef

Please sign in to comment.