Skip to content

Commit

Permalink
Add Kafka module
Browse files Browse the repository at this point in the history
  • Loading branch information
Silvio D'Alessandro committed Aug 25, 2024
1 parent 96a1a9c commit fe21e86
Show file tree
Hide file tree
Showing 14 changed files with 278 additions and 4 deletions.
26 changes: 22 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ jobs:
needs:
- basics
- caching
- http-server
- graphql
- http-server
- kafka
- rabbitmq
- data-jpa
- security
Expand Down Expand Up @@ -57,6 +58,23 @@ jobs:
with:
name: test-results_caching
path: "**/build/reports/tests"
graphql:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 17
- name: Build
run: ./gradlew :examples:graphql:build
- name: Upload Test Results
uses: actions/upload-artifact@v3
if: always()
with:
name: test-results_graphql
path: "**/build/reports/tests"
http-server:
runs-on: ubuntu-latest
steps:
Expand All @@ -74,7 +92,7 @@ jobs:
with:
name: test-results_http-server
path: "**/build/reports/tests"
graphql:
kafka:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
Expand All @@ -84,12 +102,12 @@ jobs:
distribution: temurin
java-version: 17
- name: Build
run: ./gradlew :examples:graphql:build
run: ./gradlew :examples:kafka:build
- name: Upload Test Results
uses: actions/upload-artifact@v3
if: always()
with:
name: test-results_graphql
name: test-results_kafka
path: "**/build/reports/tests"
rabbitmq:
runs-on: ubuntu-latest
Expand Down
12 changes: 12 additions & 0 deletions examples/kafka/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
plugins {
id("cnt-micronaut.micronaut-conventions")
id("io.micronaut.test-resources")
}

dependencies {
implementation("io.micronaut.kafka:micronaut-kafka")
}

application {
mainClass.set("example.micronaut.ApplicationKt")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package example.micronaut.kafka

import io.micronaut.runtime.Micronaut


@Suppress("SpreadOperator")
fun main(args: Array<String>) {
Micronaut.run(*args)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package example.micronaut.kafka.business

import java.util.UUID

sealed class BookEvent {
abstract val type: String
abstract val eventId: UUID
abstract val bookId: UUID
}

data class BookCreatedEvent(
override val eventId: UUID,
override val bookId: UUID,
val book: Book
) : BookEvent() {
override val type: String = "book-created"
}

data class BookDeletedEvent(
override val eventId: UUID,
override val bookId: UUID
) : BookEvent() {
override val type: String = "book-deleted"
}

fun BookRecord.createdEvent() =
BookCreatedEvent(
eventId = UUID.randomUUID(),
bookId = this.id,
book = this.book
)

fun BookRecord.deletedEvent() =
BookDeletedEvent(
eventId = UUID.randomUUID(),
bookId = this.id
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package example.micronaut.kafka.business

import java.util.UUID

data class Book(
val isbn: String,
val title: String
)

data class BookRecord(
val id: UUID,
val book: Book
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package example.micronaut.kafka.events

import example.micronaut.kafka.business.BookCreatedEvent
import example.micronaut.kafka.business.BookDeletedEvent
import example.micronaut.kafka.business.BookEvent
import io.micronaut.configuration.kafka.annotation.ErrorStrategy
import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.configuration.kafka.exceptions.DefaultKafkaListenerExceptionHandler
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler
import io.micronaut.context.annotation.Replaces
import jakarta.inject.Singleton
import org.slf4j.LoggerFactory.getLogger

@KafkaListener(
offsetReset = EARLIEST,
errorStrategy = ErrorStrategy(
value = ErrorStrategyValue.RESUME_AT_NEXT_RECORD,
)
)
open class BookEventHandler {

private val log = getLogger(javaClass)

@Topic("book-created")
open fun handleCreatedEvent(event: BookCreatedEvent) {
log.info("”Book was created: $event")
}

@Topic("book-deleted")
open fun handleDeletedEvent(event: BookDeletedEvent) {
log.info("Book was deleted: $event")
}
}

@Singleton
@Replaces(DefaultKafkaListenerExceptionHandler::class)
class KafkaListenerExceptionHandlerFactory(
private val publishEvent: PublishEventFunction
) : KafkaListenerExceptionHandler {

private val log = getLogger(javaClass)

override fun handle(exception: KafkaListenerException?) {
log.error("Failed to publish record due to: {}", exception?.message, exception)

exception?.consumerRecord?.ifPresent { record ->
when (val event = record.value()) {
is BookEvent -> publishEvent.bookDeadLetter(event)
else -> log.info("Cannot process events that are not of type ${BookEvent::class.simpleName}.")
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package example.micronaut.kafka.events

import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory.getLogger

@KafkaListener
open class DeadLetterHandler {

private val log = getLogger(javaClass)

@Topic("book-dead-letter")
open fun handleDeadLetterEvent(record: ConsumerRecord<String, *>) {
log.error("Dead letter message arrived: ${record.value()}")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package example.micronaut.kafka.events

import example.micronaut.kafka.business.BookCreatedEvent
import example.micronaut.kafka.business.BookDeletedEvent
import example.micronaut.kafka.business.BookEvent
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic

@KafkaClient
interface PublishEventFunction {

@Topic("book-created")
operator fun invoke(event: BookCreatedEvent)

@Topic("book-deleted")
operator fun invoke(event: BookDeletedEvent)

@Topic("book-dead-letter")
fun bookDeadLetter(event: BookEvent)
}
1 change: 1 addition & 0 deletions examples/kafka/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

10 changes: 10 additions & 0 deletions examples/kafka/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%cyan(%d{HH:mm:ss.SSS}) %gray([%thread]) %highlight(%-5level) %magenta(%logger{36}) - %msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package example.micronaut.kafka

import example.micronaut.kafka.business.createdEvent
import example.micronaut.kafka.business.deletedEvent
import example.micronaut.kafka.events.DeadLetterHandler
import example.micronaut.kafka.events.BookEventHandler
import example.micronaut.kafka.events.PublishEventFunction
import example.spring.boot.kafka.business.Examples.cleanCode
import io.micronaut.test.annotation.MockBean
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import io.mockk.every
import io.mockk.mockk
import io.mockk.spyk
import io.mockk.verify
import org.junit.jupiter.api.Test

@MicronautTest
class MessagingIntegrationTests(
private val publishEvent: PublishEventFunction,
private val bookEventHandler: BookEventHandler,
private val deadLetterHandler: DeadLetterHandler
) {

@MockBean(BookEventHandler::class)
fun mockEventHandler() = mockk<BookEventHandler>(relaxed = true)

@MockBean(DeadLetterHandler::class)
fun mockDeadLetterHandler() = spyk<DeadLetterHandler>()

@Test
fun `handles BookCreatedEvent`() {
val event = cleanCode.createdEvent()

publishEvent(event)

verify(timeout = 1_000) { bookEventHandler.handleCreatedEvent(event) }
}

@Test
fun `handles BookDeletedEvent`() {
val event = cleanCode.deletedEvent()

publishEvent(event)

verify(timeout = 1_000) { bookEventHandler.handleDeletedEvent(event) }
}

@Test
fun `puts failure messages to dead letter queue`(){
every { bookEventHandler.handleCreatedEvent(any()) } throws IllegalArgumentException("upsi")
val event = cleanCode.createdEvent() // used as representation

publishEvent(event)

verify(timeout = 3_000) { deadLetterHandler.handleDeadLetterEvent(any()) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package example.spring.boot.kafka.business

import example.micronaut.kafka.business.Book
import example.micronaut.kafka.business.BookRecord
import java.util.UUID

object Examples {

val cleanCode = BookRecord(
id = UUID.fromString("b3fc0be8-463e-4875-9629-67921a1e00f4"),
book = Book(
isbn = "9780132350884",
title = "Clean Code"
)
)

}
5 changes: 5 additions & 0 deletions examples/kafka/src/test/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
test-resources:
containers:
kafka:
image-name: confluentinc/cp-kafka:7.0.4

1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ include("examples:basics")
include("examples:http-server")
include("examples:caching")
include("examples:graphql")
include("examples:kafka")
include("examples:rabbitmq")
include("examples:data-jpa")
include("examples:security")

0 comments on commit fe21e86

Please sign in to comment.