Skip to content

Commit

Permalink
fixed Update Product
Browse files Browse the repository at this point in the history
  • Loading branch information
DaoudAA committed Dec 10, 2024
1 parent bec8665 commit f58bec9
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.ecommerce.dto;
package org.ecommerce.domain.events;


public class InventoryEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.ecommerce.domain.Product;
import org.ecommerce.domain.ProductCategory;
import org.ecommerce.domain.events.InventoryEvent;
import org.ecommerce.domain.events.ProductAvailabilityEvent;
import org.ecommerce.dto.InventoryEvent;
import org.ecommerce.exceptions.EntityAlreadyExistsException;
import org.ecommerce.exceptions.EntityNotFoundException;
import org.ecommerce.repository.ProductRepository;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.ecommerce.domain.Outbox;
import org.ecommerce.service.OutboxService;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
import io.quarkus.scheduler.Scheduled;
Expand All @@ -23,7 +22,7 @@ public class OutboxProcessor {
@Channel("products-out")
Emitter<String> productsEmitter;

@Scheduled(every = "5s")
@Scheduled(every = "60s")
public void processOutbox() {
List<Outbox> messages = outboxService.getPendingMessages();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.ecommerce.domain.ProductCategory;
import org.ecommerce.domain.events.Event;
import org.ecommerce.domain.events.ProductListed;
import org.ecommerce.domain.events.ProductUpdated;
import org.ecommerce.exceptions.EntityAlreadyExistsException;
import org.ecommerce.exceptions.EntityNotFoundException;
import org.ecommerce.repository.ProductRepository;
Expand All @@ -20,7 +21,6 @@

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,7 +35,9 @@ public class ProductService {
PricingService pricingService;
@Inject
@Channel("products-out")
Emitter<Event> productsEmitter;
Emitter<Event> productsListedEmitter;
@Channel("products-updated")
Emitter<Event> productsUpdatedEmitter;
@Inject
OutboxService outboxService;
private final Logger logger = LoggerFactory.getLogger(ProductService.class);
Expand Down Expand Up @@ -65,7 +67,7 @@ public Product add(Product product, String categoryName) throws EntityAlreadyExi
ProductListed productListed = new ProductListed(product);


Outbox outboxMessage = outboxService.createOutboxMessage(productListed.toString());
Outbox outboxMessage = outboxService.createOutboxMessage(productListed.getEventId().toString());

if (outboxMessage == null) {
logger.error("Failed to create outbox message for product: " + product.getProductName());
Expand All @@ -74,7 +76,7 @@ public Product add(Product product, String categoryName) throws EntityAlreadyExi


try {
CompletionStage<Void> ack = productsEmitter.send(productListed);
CompletionStage<Void> ack = productsListedEmitter.send(productListed);
ack.thenAccept(result -> {
logger.info("Product listed and sent via Kafka: " + productListed);
outboxService.markAsSent(outboxMessage.getId());
Expand All @@ -93,7 +95,46 @@ public Product add(Product product, String categoryName) throws EntityAlreadyExi
}

public Product updateProduct(Product product) throws EntityNotFoundException {
return productRepo.update(product);
Product existingProduct = productRepo.findById(product.getId());

if (product.getCategory() != null) {
ProductCategory newCategory = categoryService.getCategoryByName(product.getCategory().getCategoryName());
if (newCategory != null) {
existingProduct.setCategory(newCategory);
} else {
throw new EntityNotFoundException("Category not found: " + product.getCategory().getCategoryName());
}
}
if (product.getProductName() != null) {
existingProduct.setProductName(product.getProductName());
}
if (product.getDescription() != null) {
existingProduct.setDescription(product.getDescription());
}
if (product.getShownPrice() != null) {
existingProduct.setShownPrice(product.getShownPrice());
}
existingProduct.setDisponibility(product.isDisponibility());

ProductUpdated productUpdated = new ProductUpdated(existingProduct);
Outbox outboxMessage = outboxService.createOutboxMessage(productUpdated.getEventId().toString());

try {
CompletionStage<Void> ack = productsUpdatedEmitter.send(productUpdated);
ack.thenAccept(result -> {
logger.info("Product listed and sent via Kafka: " + productUpdated);
outboxService.markAsSent(outboxMessage.getId());
}).exceptionally(error -> {
logger.error("Error when sending the product listed event: " + error.getMessage());
outboxService.markAsFailed(outboxMessage.getId());
return null;
});
} catch (Exception e) {
logger.error("Error when serializing JSON: " + e.getMessage());
outboxService.markAsFailed(outboxMessage.getId());
}

return productRepo.update(existingProduct);
}

public void removeProduct(UUID id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ quarkus.http.access-log.enabled=true
# Kafka configuration for outgoing messages (Producer)
mp.messaging.outgoing.products-out.connector=smallrye-kafka
mp.messaging.outgoing.products-out.topic=products-listed
mp.messaging.outgoing.products-updated.connector=smallrye-kafka
mp.messaging.outgoing.products-updated.topic=products-updated

#mp.messaging.outgoing.products-out.topic=products-updated
#testing
mp.messaging.incoming.products-in.connector=smallrye-kafka
Expand All @@ -34,8 +37,8 @@ mp.messaging.incoming.products-in.topic=products-listed
# Kafka configuration for incoming messages (Consumer for testing)
# mp.messaging.incoming.products-in.connector=smallrye-kafka
# mp.messaging.incoming.products-in.topic=products-created
mp.messaging.incoming.products-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.products-in.group.id=products-consumer-group
# mp.messaging.incoming.products-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# mp.messaging.incoming.products-in.group.id=products-consumer-group



0 comments on commit f58bec9

Please sign in to comment.