Skip to content

Commit

Permalink
stopper prosessering av meldinger når kafka-rapid skal stoppe
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsteinsland committed Nov 29, 2024
1 parent cba36a8 commit 105481e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,13 @@ class KafkaRapid(
.mapValues { it.value.minOf { it.offset() } }
.toMutableMap()
try {
records.onEach { record ->
onRecord(record)
currentPositions[TopicPartition(record.topic(), record.partition())] = record.offset() + 1
}
records
.onEach { record ->
if (running.get()) {
onRecord(record)
currentPositions[TopicPartition(record.topic(), record.partition())] = record.offset() + 1
}
}
} catch (err: Exception) {
log.info(
"due to an error during processing, positions are reset to each next message (after each record that was processed OK):" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.time.Duration
import java.time.LocalDateTime
import java.util.*
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.random.Random

internal class RapidIntegrationTest {
Expand Down Expand Up @@ -84,6 +85,28 @@ internal class RapidIntegrationTest {
assertDoesNotThrow { rapid.stop() }
}

@Test
fun `stops while handling messages`() = rapidE2E {
val handlingMessages = AtomicBoolean(false)
rapid.register { message, context, metadata, metrics ->
handlingMessages.set(true)
runBlocking { delay(1000) }
}
repeat(100) { mainTopic.send("{}") }
await()
.atMost(10, SECONDS)
.until { handlingMessages.get() }

rapid.stop()

await()
.atMost(10, SECONDS)
.until {
runBlocking { it.join() }
true
}
}

@Test
fun `should stop on errors`() {
assertThrows<RuntimeException> {
Expand Down

0 comments on commit 105481e

Please sign in to comment.