Skip to content

Commit

Permalink
committer offsets bare ved håndtering av meldinger
Browse files Browse the repository at this point in the history
! r&r har i noen tilfeller committed over meldinger!

feilen ble introdusert i 105481e.

1) podden poller, for eksempel, 500 meldinger
2) mens podden har prosessert, si 10 meldinger, så settes `running` til `false`
3) finally-blokken vil (korrekt) committe offset til `forrige prosesserte melding + 1`
4) `consumer.position()` er dog uforandret, og vil peke til `siste melding fra poll + 1`
5) i `onPartitionsRevoked` committer vi offset basert på `consumer.position()`, og vi hopper i praksis over meldinger

Alternativ løsning kunne vært å kalle på `consumer.seek()` i `finally`, men vi fjerner heller committing av offsets i `onPartitionsRevoked` da vi ikke lenger ser et behov for bevare det.

Co-authored-by: Christian Skovborg Gule <christian.skovborg.gule@nav.no>
Co-authored-by: Jakob Havstein Eriksen <jakob.havstein.eriksen@nav.no>
Co-authored-by: Svein Elgstøen <svein.elgstoen@nav.no>
  • Loading branch information
4 people committed Jan 27, 2025
1 parent 1bd9d4a commit 17a2772
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import org.apache.kafka.clients.consumer.*
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.*
import org.apache.kafka.common.record.TimestampType
import org.intellij.lang.annotations.Language
import org.slf4j.LoggerFactory
import java.time.Duration
Expand Down Expand Up @@ -97,7 +96,6 @@ class KafkaRapid(

override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) {
log.info("partitions revoked: $partitions")
partitions.forEach { it.commitSync() }
notifyNotReady()
}

Expand All @@ -124,7 +122,9 @@ class KafkaRapid(
currentPositions.forEach { (partition, offset) -> consumer.seek(partition, offset) }
throw err
} finally {
consumer.commitSync(currentPositions.mapValues { (_, offset) -> offsetMetadata(offset) })
val offsetsToBeCommitted = currentPositions.mapValues<TopicPartition, Long, OffsetAndMetadata> { (_, offset) -> offsetMetadata(offset) }
log.info("committing offsets ${offsetsToBeCommitted.entries.joinToString { "topic=${it.key.topic()}, parition=${it.key.partition()} offset=${it.value.offset()}" }}")
consumer.commitSync(offsetsToBeCommitted)
}
}

Expand Down Expand Up @@ -186,13 +186,6 @@ class KafkaRapid(
"rapids_record_offset" to "${record.offset()}"
)

private fun TopicPartition.commitSync() {
if (autoCommit) return
val offset = consumer.position(this)
log.info("committing offset offset=$offset for partition=$this")
consumer.commitSync(mapOf(this to offsetMetadata(offset)))
}

private fun offsetMetadata(offset: Long): OffsetAndMetadata {
val clientId = consumer.groupMetadata().groupInstanceId().map { "\"$it\"" }.orElse("null")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import java.util.*
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.random.Random
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS

internal class RapidIntegrationTest {
private companion object {
Expand Down Expand Up @@ -186,6 +188,80 @@ internal class RapidIntegrationTest {
assertDoesNotThrow { LocalDateTime.parse(objectMapper.readTree(metadata).path("time").asText()) }
}

@Test
fun `in case of shutdown, the offset committed is the next record to be processed`() = rapidE2E {
ensureRapidIsActive()

// stop rapid so we can queue up records
rapid.stop()
it.cancelAndJoin()

val offsets = (0..100).map {
val key = UUID.randomUUID().toString()
mainTopic.send(key, "{\"test_message_index\": $it}").get()
}
.also {
assertEquals(1, it.distinctBy { it.partition() }.size) { "testen forutsetter én unik partisjon" }
}
.map { it.offset() }

val stopProcessingOnMessage = 5
val expectedOffset = offsets[stopProcessingOnMessage] + 1
val synchronizationBarrier = Channel<Boolean>(RENDEZVOUS)
val rapid = createTestRapid(consumerGroupId, mainTopic, extraTopic)

River(rapid)
.validate { it.requireKey("test_message_index") }
.onSuccess { packet: JsonMessage, _: MessageContext, _, _ ->
val index = packet["test_message_index"].asInt()
println("Read test_message_index=$index")
if (index == stopProcessingOnMessage) {
// notify test that we are ready to go forward
runBlocking { synchronizationBarrier.send(true) }

// wait until test has signalled that shutdown has been started
await("wait until test is ready to go forward")
.atMost(20, SECONDS)
.until { runBlocking { synchronizationBarrier.receive() } }
}
}

try {
runBlocking {
val rapidJob = launch(Dispatchers.IO) { rapid.start() }

await("wait until test is ready to go forward")
.atMost(20, SECONDS)
.until { runBlocking { synchronizationBarrier.receive() } }

rapid.stop()
synchronizationBarrier.send(true)
}
} catch (err: RuntimeException) {
assertEquals("an unexpected error happened", err.message)
} finally {
rapid.stop()
}

await("wait until the rapid stops")
.atMost(20, SECONDS)
.until { !rapid.isRunning() }

val actualOffset = await().atMost(Duration.ofSeconds(5)).until({
val offsets = mainTopic.adminClient
.listConsumerGroupOffsets(consumerGroupId)
?.partitionsToOffsetAndMetadata()
?.get()
?: fail { "was not able to fetch committed offset for consumer $consumerGroupId" }
offsets[TopicPartition(mainTopic.topicnavn, 0)]
}) { it != null }

val metadata = actualOffset?.metadata() ?: fail { "expected metadata to be present in OffsetAndMetadata" }
assertEquals(expectedOffset, actualOffset.offset())
assertTrue(objectMapper.readTree(metadata).has("groupInstanceId"))
assertDoesNotThrow { LocalDateTime.parse(objectMapper.readTree(metadata).path("time").asText()) }
}

private fun TestContext.ensureRapidIsActive() {
val readMessages = mutableListOf<JsonMessage>()
River(rapid).onSuccess { packet: JsonMessage, _: MessageContext, _, _ -> readMessages.add(packet) }
Expand Down Expand Up @@ -318,4 +394,4 @@ private class LocalKafkaConfig(private val connectionProperties: Properties) : C
putAll(connectionProperties)
}
}
}
}

0 comments on commit 17a2772

Please sign in to comment.