Event-driven architecture is a software architecture paradigm that promotes the production, detection, and consumption of events, and the reaction to them. It helps developers build decoupled and responsive applications, which is why it is widely used in applications that have been broken down from monoliths into microservices.
Spring Cloud Stream improves your productivity when working with Apache Kafka, RabbitMQ, Azure Event Hub, and more, providing three key abstractions to simplify your code.
Spring Cloud Function enables you to write functions once and run them anywhere (AWS, Azure, etc.), while continuing to use all the familiar and comprehensive Spring APIs. You can chain multiple functions together to create new capabilities.
The main goal of this repository is to demonstrate how to create an event-driven system that is simplified by using Spring Cloud Stream/Function. In particular, we will be using the Spring Cloud Stream Apache Kafka binder. Please visit my previous tutorial to learn the basics of the producer-consumer pattern using Apache Kafka with Spring.
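To make the idea of chaining concrete, here is a minimal sketch in plain Java using `java.util.function.Function.andThen`. In Spring Cloud Function, composition is normally declared in configuration by joining function names with `|` in the function definition; the sketch below only illustrates the underlying idea. The function names `normalize` and `label` are hypothetical and not part of this repository.

```java
import java.util.Locale;
import java.util.function.Function;

class FunctionChainingSketch {

    // Hypothetical function: trims and upper-cases a stock symbol.
    static final Function<String, String> normalize =
            s -> s.trim().toUpperCase(Locale.ROOT);

    // Hypothetical function: prefixes the symbol with a label.
    static final Function<String, String> label = s -> "STOCK:" + s;

    // Chaining: the output of normalize becomes the input of label.
    static final Function<String, String> normalizeThenLabel =
            normalize.andThen(label);

    public static void main(String[] args) {
        System.out.println(normalizeThenLabel.apply("  zoom "));
    }
}
```

Each function stays small and independently testable, and the composed function is just another `Function` that can be exposed as a bean.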
Note: We will be using Kafdrop, a web UI for viewing Kafka topics and browsing consumer groups. The tool displays information such as brokers, topics, partitions, and consumers, and lets you view messages.
- Make sure to install Docker on your machine
- Go to the root directory of the project, where the Docker Compose file is located.
- Start the containers by running:
docker compose -f kafka-docker-compose.yml up -d
Note: Make sure there are no errors in the logs, such as "connection refused". Now go to your browser and open http://localhost:9000/ to see an overview of your Kafka cluster, brokers, and topics.
To use the Apache Kafka binder, we need to add the following dependency to our application:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
The Apache Kafka binder implementation maps each destination to an Apache Kafka topic. The consumer group maps directly to the same Apache Kafka concept. Partitioning also maps directly to Apache Kafka partitions.
If you are developing on your local machine, no additional steps are needed to set up the Kafka configuration options, as long as your Apache Kafka properties match the defaults used by the Spring Cloud Stream Apache Kafka binder.
Note: The Docker Compose file already matches the defaults that the Spring Cloud Stream Apache Kafka binder expects, such as the broker listening on port 9092.
Our application consists of the main class and a configuration class that exposes a single function bean, newStock:
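import java.util.function.Consumer;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@SpringBootApplication
public class SpringcloudfuncApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringcloudfuncApplication.class, args);
    }
}

@Configuration
@RequiredArgsConstructor
@Log4j2 // generates the "log" field used inside newStock()
class StockEventConsumers {
    // Consumes messages arriving on the input binding newStock-in-0 and logs them.
    @Bean
    public Consumer<Message<StockEvent>> newStock() {
        return (o) -> {
            var event = o.getPayload();
            log.info("Received Stock ====> {} , PRICE ====> {}", event.getStock(), event.getPrice());
        };
    }
}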
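The StockEvent payload class is not shown above. As a rough sketch, it could look like the following; the field types are assumptions inferred from the `getStock()` and `getPrice()` calls and from a logged price of `12000.0`, which suggests a floating-point type. The `describe` helper is hypothetical and only mirrors the log format so the handler logic can be exercised without Kafka.

```java
// Hypothetical payload class; field names follow the getters used by the consumer.
class StockEvent {
    private String stock;
    private double price; // assumption: a floating-point price

    // No-argument constructor, typically required for JSON deserialization.
    StockEvent() {
    }

    StockEvent(String stock, double price) {
        this.stock = stock;
        this.price = price;
    }

    public String getStock() {
        return stock;
    }

    public void setStock(String stock) {
        this.stock = stock;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}

class StockEventSketch {

    // Hypothetical helper that builds the same text the consumer logs.
    static String describe(StockEvent event) {
        return "Received Stock ====> " + event.getStock()
                + " , PRICE ====> " + event.getPrice();
    }

    public static void main(String[] args) {
        System.out.println(describe(new StockEvent("ZOOM", 12000)));
    }
}
```

Keeping the payload a plain object with getters and setters lets the message converter map incoming JSON fields such as "stock" and "price" onto it.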
When we run the application, Spring must be able to identify which functions we intend to run and how we want them to be composed. For this tutorial, we will have the following configuration in our application.yml file:
spring:
  cloud:
    function:
      definition: newStock
    stream:
      bindings:
        newStock-in-0:
          destination: new-stock-topic
Based on this, we have one callable function:
- The newStock function, which consumes StockEvent messages from the Kafka topic new-stock-topic (through the input binding newStock-in-0) and logs them. Because it is a Consumer, it has no output binding.
Note: The Kafka topic mentioned above is created automatically during startup if it does not already exist. Binding names are derived from the function name with an in/out suffix and an index, for example newStock-in-0.
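The binding naming convention is `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs. The small helper below is a hypothetical illustration of that convention, not part of Spring's API; it can be handy for double-checking the keys you write under `bindings` in application.yml.

```java
class BindingNames {

    // Name of the input binding at the given index, e.g. newStock-in-0.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the output binding at the given index, e.g. newStock-out-0.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(input("newStock", 0));
        System.out.println(output("newStock", 0));
    }
}
```

Note that the binding name uses the bean's method name exactly as written (newStock), while the destination (new-stock-topic) is the actual Kafka topic name.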
- Download STS version 3.4.* (or better) from the Spring website. STS is a free Eclipse bundle with many features useful for Spring developers.
- Right-click on the project or the main application class, then select "Run As" > "Spring Boot App"
We will now test our application by producing messages to the topic and checking the results via the Kafdrop web UI or the application console (since we enabled logging).
Note: We will use the Docker interactive shell by connecting to the container running the Kafka image.
- Run the docker ps command and take note of the name or ID of the container running the Apache Kafka image
- Run the command below to open an interactive shell in the container.
winpty docker exec -it kafka-broker bash
I have used winpty above since my local setup is running on Windows. If you are running on Linux or macOS, drop winpty and run the command below instead:
docker exec -it kafka-broker bash
- Produce a JSON message to the target topic new-stock-topic
kafka-console-producer --broker-list localhost:19092 --topic new-stock-topic --property value.serializer=custom.class.serialization.JsonSerializer
- At the > prompt, enter your desired JSON message (e.g., { "stock": "ZOOM", "price": 12000} )
- Check your application console; you should see something like this:
2021-06-15 01:01:37.176 INFO 10820 --- [container-0-C-1] c.boutouil.consumer.StockEventConsumers : Received Stock ====> ZOOM , PRICE ====> 12000.0
- Alternatively, go to the Kafdrop URL (http://localhost:9000/), click on the topic new-stock-topic > View Messages, then click View Messages in the filter section. You should see something similar to the following:
Offset: 203 Key: empty Timestamp: 2021-06-15 00:01:36.170 Headers: empty
{
    "stock": "ZOOM",
    "price": 12000
}
Contact me at Mohammed Amine BOUTOUIL
Happy coding!!!