From 105481e3d60f4c0e4cba311d52710fbf4c8a6340 Mon Sep 17 00:00:00 2001 From: david steinsland Date: Fri, 29 Nov 2024 15:03:39 +0100 Subject: [PATCH] =?UTF-8?q?stopper=20prosessering=20av=20meldinger=20n?= =?UTF-8?q?=C3=A5r=20kafka-rapid=20skal=20stoppe?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tbd_libs/rapids_and_rivers/KafkaRapid.kt | 11 +++++---- .../rapids_and_rivers/RapidIntegrationTest.kt | 23 +++++++++++++++++++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/rapids-and-rivers/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/KafkaRapid.kt b/rapids-and-rivers/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/KafkaRapid.kt index 51a280c..c17fc60 100644 --- a/rapids-and-rivers/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/KafkaRapid.kt +++ b/rapids-and-rivers/src/main/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/KafkaRapid.kt @@ -108,10 +108,13 @@ class KafkaRapid( .mapValues { it.value.minOf { it.offset() } } .toMutableMap() try { - records.onEach { record -> - onRecord(record) - currentPositions[TopicPartition(record.topic(), record.partition())] = record.offset() + 1 - } + records + .onEach { record -> + if (running.get()) { + onRecord(record) + currentPositions[TopicPartition(record.topic(), record.partition())] = record.offset() + 1 + } + } } catch (err: Exception) { log.info( "due to an error during processing, positions are reset to each next message (after each record that was processed OK):" + diff --git a/rapids-and-rivers/src/test/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/RapidIntegrationTest.kt b/rapids-and-rivers/src/test/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/RapidIntegrationTest.kt index 49fb144..91ca52a 100644 --- a/rapids-and-rivers/src/test/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/RapidIntegrationTest.kt +++ b/rapids-and-rivers/src/test/kotlin/com/github/navikt/tbd_libs/rapids_and_rivers/RapidIntegrationTest.kt @@ -24,6 +24,7 @@ import java.time.Duration import java.time.LocalDateTime import java.util.* import java.util.concurrent.TimeUnit.SECONDS +import java.util.concurrent.atomic.AtomicBoolean import kotlin.random.Random internal class RapidIntegrationTest { @@ -84,6 +85,28 @@ internal class RapidIntegrationTest { assertDoesNotThrow { rapid.stop() } } + @Test + fun `stops while handling messages`() = rapidE2E { + val handlingMessages = AtomicBoolean(false) + rapid.register { message, context, metadata, metrics -> + handlingMessages.set(true) + runBlocking { delay(1000) } + } + repeat(100) { mainTopic.send("{}") } + await() + .atMost(10, SECONDS) + .until { handlingMessages.get() } + + rapid.stop() + + await() + .atMost(10, SECONDS) + .until { + runBlocking { it.join() } + true + } + } + @Test fun `should stop on errors`() { assertThrows {