diff --git a/rapids-and-rivers/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/KafkaRapid.kt b/rapids-and-rivers/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/KafkaRapid.kt index 263ee6c..5aee7b2 100644 --- a/rapids-and-rivers/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/KafkaRapid.kt +++ b/rapids-and-rivers/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/KafkaRapid.kt @@ -10,7 +10,6 @@ import org.apache.kafka.clients.consumer.* import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.* -import org.apache.kafka.common.record.TimestampType import org.intellij.lang.annotations.Language import org.slf4j.LoggerFactory import java.time.Duration @@ -97,7 +96,6 @@ class KafkaRapid( override fun onPartitionsRevoked(partitions: Collection) { log.info("partitions revoked: $partitions") - partitions.forEach { it.commitSync() } notifyNotReady() } @@ -124,7 +122,9 @@ class KafkaRapid( currentPositions.forEach { (partition, offset) -> consumer.seek(partition, offset) } throw err } finally { - consumer.commitSync(currentPositions.mapValues { (_, offset) -> offsetMetadata(offset) }) + val offsetsToBeCommitted = currentPositions.mapValues { (_, offset) -> offsetMetadata(offset) } + log.info("committing offsets ${offsetsToBeCommitted.entries.joinToString { "topic=${it.key.topic()}, parition=${it.key.partition()} offset=${it.value.offset()}" }}") + consumer.commitSync(offsetsToBeCommitted) } } @@ -186,13 +186,6 @@ class KafkaRapid( "rapids_record_offset" to "${record.offset()}" ) - private fun TopicPartition.commitSync() { - if (autoCommit) return - val offset = consumer.position(this) - log.info("committing offset offset=$offset for partition=$this") - consumer.commitSync(mapOf(this to offsetMetadata(offset))) - } - private fun offsetMetadata(offset: Long): OffsetAndMetadata { val clientId = consumer.groupMetadata().groupInstanceId().map { "\"$it\"" }.orElse("null") diff --git a/rapids-and-rivers/src/test/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/RapidIntegrationTest.kt b/rapids-and-rivers/src/test/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/RapidIntegrationTest.kt index 91ca52a..a815d24 100644 --- a/rapids-and-rivers/src/test/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/RapidIntegrationTest.kt +++ b/rapids-and-rivers/src/test/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/RapidIntegrationTest.kt @@ -26,6 +26,8 @@ import java.util.* import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.atomic.AtomicBoolean import kotlin.random.Random +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS internal class RapidIntegrationTest { private companion object { @@ -186,6 +188,80 @@ internal class RapidIntegrationTest { assertDoesNotThrow { LocalDateTime.parse(objectMapper.readTree(metadata).path("time").asText()) } } + @Test + fun `in case of shutdown, the offset committed is the next record to be processed`() = rapidE2E { + ensureRapidIsActive() + + // stop rapid so we can queue up records + rapid.stop() + it.cancelAndJoin() + + val offsets = (0..100).map { + val key = UUID.randomUUID().toString() + mainTopic.send(key, "{\"test_message_index\": $it}").get() + } + .also { + assertEquals(1, it.distinctBy { it.partition() }.size) { "testen forutsetter én unik partisjon" } + } + .map { it.offset() } + + val stopProcessingOnMessage = 5 + val expectedOffset = offsets[stopProcessingOnMessage] + 1 + val synchronizationBarrier = Channel(RENDEZVOUS) + val rapid = createTestRapid(consumerGroupId, mainTopic, extraTopic) + + River(rapid) + .validate { it.requireKey("test_message_index") } + .onSuccess { packet: JsonMessage, _: MessageContext, _, _ -> + val index = packet["test_message_index"].asInt() + println("Read test_message_index=$index") + if (index == stopProcessingOnMessage) { + // notify test that we are ready to go forward + runBlocking { synchronizationBarrier.send(true) } + + // wait until test has signalled that shutdown has been started + await("wait until test is ready to go forward") + .atMost(20, SECONDS) + .until { runBlocking { synchronizationBarrier.receive() } } + } + } + + try { + runBlocking { + val rapidJob = launch(Dispatchers.IO) { rapid.start() } + + await("wait until test is ready to go forward") + .atMost(20, SECONDS) + .until { runBlocking { synchronizationBarrier.receive() } } + + rapid.stop() + synchronizationBarrier.send(true) + } + } catch (err: RuntimeException) { + assertEquals("an unexpected error happened", err.message) + } finally { + rapid.stop() + } + + await("wait until the rapid stops") + .atMost(20, SECONDS) + .until { !rapid.isRunning() } + + val actualOffset = await().atMost(Duration.ofSeconds(5)).until({ + val offsets = mainTopic.adminClient + .listConsumerGroupOffsets(consumerGroupId) + ?.partitionsToOffsetAndMetadata() + ?.get() + ?: fail { "was not able to fetch committed offset for consumer $consumerGroupId" } + offsets[TopicPartition(mainTopic.topicnavn, 0)] + }) { it != null } + + val metadata = actualOffset?.metadata() ?: fail { "expected metadata to be present in OffsetAndMetadata" } + assertEquals(expectedOffset, actualOffset.offset()) + assertTrue(objectMapper.readTree(metadata).has("groupInstanceId")) + assertDoesNotThrow { LocalDateTime.parse(objectMapper.readTree(metadata).path("time").asText()) } + } + private fun TestContext.ensureRapidIsActive() { val readMessages = mutableListOf() River(rapid).onSuccess { packet: JsonMessage, _: MessageContext, _, _ -> readMessages.add(packet) } @@ -318,4 +394,4 @@ private class LocalKafkaConfig(private val connectionProperties: Properties) : C putAll(connectionProperties) } } -} \ No newline at end of file +}