Skip to content

Commit

Permalink
fixed serialization of subtype
Browse files Browse the repository at this point in the history
  • Loading branch information
DaoudAA committed Dec 4, 2024
1 parent 4bdce03 commit c57d999
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package org.ecommerce.domain.events;

import java.math.BigDecimal;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import org.ecommerce.domain.Product;


@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class ProductListed extends Event {
private final String productName;
private final String categoryName;
private final String description;
private final BigDecimal price;
private String productName;
private String categoryName;
private String description;
private BigDecimal price;

public ProductListed(Product product) {
super("productListed", "Product", product.getId().toString());
super("ProductListed", "Product", product.getId().toString());
this.productName = product.getProductName();
this.categoryName = product.getCategory().getCategoryName(); // Assuming `getName()` returns the category name
this.description = product.getDescription();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.math.BigDecimal;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
Expand All @@ -19,7 +20,10 @@
//import io.quarkus.scheduler.Scheduled.ConcurrentExecution;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ApplicationScoped
public class ProductService {

Expand All @@ -32,6 +36,7 @@ public class ProductService {
@Inject
@Channel("products-out")
Emitter<Event> productsEmitter;
private final Logger logger= LoggerFactory.getLogger(ProductService.class);

public List<Product> findByRange(int page , int maxResults) {
return productRepo.findByRange(page , maxResults);
Expand All @@ -58,7 +63,17 @@ public Product add(Product product, String categoryName) throws EntityAlreadyExi
ProductCategory category = categoryService.getCategoryByName(categoryName);
product.setCategory(category);
ProductListed productListed = new ProductListed(product);
productsEmitter.send(productListed);
try {
CompletionStage<Void> ack = productsEmitter.send(productListed);
ack.thenAccept(result -> {
logger.info("Product listed and sent via Kafka "+ack );
}).exceptionally(error -> {
logger.error("Error when sending the productlisted event");
return null;
});
} catch (Exception e) {
logger.error("Error when Serializing JSON ");
}
System.out.println(productListed);
return productRepo.insert(product);
}
Expand Down
17 changes: 10 additions & 7 deletions catalog/catalog-service/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ quarkus.datasource.username=postgres
quarkus.datasource.password=azerty
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5434/db_catalog
# Drop and create the database at startup (use `update` to only update the schema)
quarkus.hibernate-orm.database.generation=drop-and-create
quarkus.hibernate-orm.database.generation=update

%dev.quarkus.hibernate-orm.log.sql=true
%dev.quarkus.hibernate-orm.log.format-sql=true
Expand All @@ -25,11 +25,14 @@ quarkus.http.access-log.enabled=true

# Kafka configuration for outgoing messages (Producer)
mp.messaging.outgoing.products-out.connector=smallrye-kafka
mp.messaging.outgoing.products-out.topic=products-created
mp.messaging.outgoing.products-out.topic=products-updated
mp.messaging.outgoing.products-out.topic=products-listed
#mp.messaging.outgoing.products-out.topic=products-updated
#testing
mp.messaging.incoming.products-in.connector=smallrye-kafka
mp.messaging.incoming.products-in.topic=products-listed

# Kafka configuration for incoming messages (Consumer for testing)
mp.messaging.incoming.products-in.connector=smallrye-kafka
mp.messaging.incoming.products-in.topic=products-created
mp.messaging.incoming.products-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.products-in.group.id=products-consumer-group
# mp.messaging.incoming.products-in.connector=smallrye-kafka
# mp.messaging.incoming.products-in.topic=products-created
# mp.messaging.incoming.products-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# mp.messaging.incoming.products-in.group.id=products-consumer-group

0 comments on commit c57d999

Please sign in to comment.