diff --git a/catalog/catalog-service/src/main/java/org/ecommerce/domain/Outbox.java b/catalog/catalog-service/src/main/java/org/ecommerce/domain/Outbox.java new file mode 100644 index 00000000..b0a29cfd --- /dev/null +++ b/catalog/catalog-service/src/main/java/org/ecommerce/domain/Outbox.java @@ -0,0 +1,68 @@ +package org.ecommerce.domain; + +import jakarta.persistence.*; +import java.util.UUID; + +@Entity +@Table(name = "outbox") +public class Outbox { + + @Id + @GeneratedValue(strategy = GenerationType.AUTO) + private UUID id; + + @Column(nullable = false) + private String message; + + @Column(nullable = false) + private String status; + + @Column(nullable = false) + private String createdAt; + + @Column + private String sentAt; + + public Outbox() { + } + + public UUID getId() { + return id; + } + + public void setId(UUID id) { + this.id = id; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getCreatedAt() { + return createdAt; + } + + public void setCreatedAt(String createdAt) { + this.createdAt = createdAt; + } + + public String getSentAt() { + return sentAt; + } + + public void setSentAt(String sentAt) { + this.sentAt = sentAt; + } +} diff --git a/catalog/catalog-service/src/main/java/org/ecommerce/domain/events/Event.java b/catalog/catalog-service/src/main/java/org/ecommerce/domain/events/Event.java index 43dbd1bc..00f87088 100644 --- a/catalog/catalog-service/src/main/java/org/ecommerce/domain/events/Event.java +++ b/catalog/catalog-service/src/main/java/org/ecommerce/domain/events/Event.java @@ -1,19 +1,18 @@ package org.ecommerce.domain.events; + import java.time.LocalDateTime; import java.util.UUID; - import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; - import lombok.Data; import lombok.ToString; @JsonTypeInfo( - use = JsonTypeInfo.Id.NAME, - include = JsonTypeInfo.As.EXISTING_PROPERTY, - property = "eventType", - visible = true + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.EXISTING_PROPERTY, + property = "eventType", + visible = true ) @JsonSubTypes({ @JsonSubTypes.Type(value = ProductListed.class, name = "ProductListed"), @@ -29,12 +28,19 @@ public abstract class Event { protected String aggregateType; protected String aggregateId; protected LocalDateTime createdAt = LocalDateTime.now(); - public Event(){}; + + public Event() {} + public Event(String eventType, String aggregateType, String aggregateId) { this.eventType = eventType; this.aggregateType = aggregateType; this.aggregateId = aggregateId; } + + public UUID getEventId() { + return eventId; + } + @Override public boolean equals(Object obj) { if (this == obj) @@ -51,6 +57,7 @@ public boolean equals(Object obj) { return false; return true; } + @Override public int hashCode() { final int prime = 31; @@ -58,7 +65,4 @@ public int hashCode() { result = prime * result + ((eventId == null) ? 0 : eventId.hashCode()); return result; } - - - -} \ No newline at end of file +} diff --git a/catalog/catalog-service/src/main/java/org/ecommerce/repository/OutboxRepository.java b/catalog/catalog-service/src/main/java/org/ecommerce/repository/OutboxRepository.java new file mode 100644 index 00000000..110d9031 --- /dev/null +++ b/catalog/catalog-service/src/main/java/org/ecommerce/repository/OutboxRepository.java @@ -0,0 +1,69 @@ +package org.ecommerce.repository; + +import java.util.List; +import java.util.UUID; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.persistence.EntityExistsException; +import jakarta.persistence.TypedQuery; +import jakarta.transaction.Transactional; + +import org.ecommerce.domain.Outbox; +import org.ecommerce.exceptions.EntityNotFoundException; +import org.ecommerce.exceptions.EntityAlreadyExistsException; + +@ApplicationScoped +public class OutboxRepository { + + @Inject + EntityManager em; + + public List findAll() { + return em.createQuery("from Outbox", Outbox.class) + .getResultList(); + } + + public List findPendingMessages() { + TypedQuery query = em.createQuery( + "SELECT o FROM Outbox o WHERE o.status = 'PENDING'", Outbox.class); + return query.getResultList(); + } + + public Outbox findById(UUID id) throws EntityNotFoundException { + Outbox outbox = em.find(Outbox.class, id); + if (outbox != null) { + return outbox; + } + throw new EntityNotFoundException("Cannot find outbox entry with ID: " + id); + } + + @Transactional + public Outbox insert(Outbox outbox) throws EntityAlreadyExistsException { + try { + outbox.setId(UUID.randomUUID()); + em.persist(outbox); + return outbox; + } catch (EntityExistsException e) { + throw new EntityAlreadyExistsException("Outbox entry already exists"); + } + } + + @Transactional + public Outbox update(Outbox outbox) throws EntityNotFoundException { + try { + return em.merge(outbox); + } catch (IllegalArgumentException e) { + throw new EntityNotFoundException("Cannot find outbox entry with ID: " + outbox.getId()); + } + } + + @Transactional + public void delete(UUID id) { + Outbox outbox = em.find(Outbox.class, id); + if (outbox != null) { + em.remove(outbox); + } + } +} diff --git a/catalog/catalog-service/src/main/java/org/ecommerce/service/OutboxProcessor.java b/catalog/catalog-service/src/main/java/org/ecommerce/service/OutboxProcessor.java new file mode 100644 index 00000000..af65c9bf --- /dev/null +++ b/catalog/catalog-service/src/main/java/org/ecommerce/service/OutboxProcessor.java @@ -0,0 +1,45 @@ +package org.ecommerce.service; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.ecommerce.domain.Outbox; +import org.ecommerce.service.OutboxService; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.jboss.logging.Logger; +import io.quarkus.scheduler.Scheduled; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.util.List; + +@ApplicationScoped +public class OutboxProcessor { + + private static final Logger logger = Logger.getLogger(OutboxProcessor.class); + + @Inject + OutboxService outboxService; + + @Inject + @Channel("products-out") + Emitter productsEmitter; + + @Scheduled(every = "5s") + public void processOutbox() { + List messages = outboxService.getPendingMessages(); + + for (Outbox message : messages) { + try { + productsEmitter.send(message.getMessage()).thenRun(() -> { + outboxService.markAsSent(message.getId()); + logger.info("Successfully sent outbox message with ID: " + message.getId()); + }).exceptionally(e -> { + logger.error("Failed to send outbox message with ID " + message.getId() + ": " + e.getMessage()); + outboxService.markAsFailed(message.getId()); + return null; + }); + } catch (Exception e) { + logger.error("Error processing outbox message with ID " + message.getId() + ": " + e.getMessage()); + } + } + } +} diff --git a/catalog/catalog-service/src/main/java/org/ecommerce/service/OutboxService.java b/catalog/catalog-service/src/main/java/org/ecommerce/service/OutboxService.java new file mode 100644 index 00000000..a477fd93 --- /dev/null +++ b/catalog/catalog-service/src/main/java/org/ecommerce/service/OutboxService.java @@ -0,0 +1,62 @@ +package org.ecommerce.service; + +import jakarta.enterprise.context.ApplicationScoped; +import org.ecommerce.domain.Outbox; +import org.ecommerce.repository.OutboxRepository; +import org.ecommerce.exceptions.EntityAlreadyExistsException; +import org.ecommerce.exceptions.EntityNotFoundException; +import jakarta.inject.Inject; +import jakarta.transaction.Transactional; + +import java.time.Instant; +import java.util.List; +import java.util.UUID; + +@ApplicationScoped +public class OutboxService { + + @Inject + OutboxRepository outboxRepository; + + @Transactional + public Outbox createOutboxMessage(String message) { + Outbox outbox = new Outbox(); + outbox.setMessage(message); + outbox.setStatus("PENDING"); + outbox.setCreatedAt(Instant.now().toString()); + try { + return outboxRepository.insert(outbox); + } catch (EntityAlreadyExistsException e) { + System.err.println(e.getMessage()); + return null; + } + } + + @Transactional + public void markAsSent(UUID id) { + try { + Outbox outbox = outboxRepository.findById(id); + outbox.setStatus("SENT"); + outbox.setSentAt(Instant.now().toString()); + outboxRepository.update(outbox); + } catch (EntityNotFoundException e) { + System.err.println(e.getMessage()); + } + } + + @Transactional + public void markAsFailed(UUID id) { + try { + Outbox outbox = outboxRepository.findById(id); + outbox.setStatus("FAILED"); + outboxRepository.update(outbox); + } catch (EntityNotFoundException e) { + System.err.println(e.getMessage()); + } + } + + @Transactional + public List getPendingMessages() { + return outboxRepository.findPendingMessages(); + } +} diff --git a/catalog/catalog-service/src/main/java/org/ecommerce/service/ProductService.java b/catalog/catalog-service/src/main/java/org/ecommerce/service/ProductService.java index cbf842e5..6b3fb6cf 100644 --- a/catalog/catalog-service/src/main/java/org/ecommerce/service/ProductService.java +++ b/catalog/catalog-service/src/main/java/org/ecommerce/service/ProductService.java @@ -15,15 +15,15 @@ import org.ecommerce.exceptions.EntityAlreadyExistsException; import org.ecommerce.exceptions.EntityNotFoundException; import org.ecommerce.repository.ProductRepository; +import org.ecommerce.domain.Outbox; + -//import io.quarkus.scheduler.Scheduled; -//import io.quarkus.scheduler.Scheduled.ConcurrentExecution; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.transaction.Transactional; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; + @ApplicationScoped public class ProductService { @@ -36,10 +36,12 @@ public class ProductService { @Inject @Channel("products-out") Emitter productsEmitter; - private final Logger logger= LoggerFactory.getLogger(ProductService.class); + @Inject + OutboxService outboxService; + private final Logger logger = LoggerFactory.getLogger(ProductService.class); - public List findByRange(int page , int maxResults) { - return productRepo.findByRange(page , maxResults); + public List findByRange(int page, int maxResults) { + return productRepo.findByRange(page, maxResults); } public List findAll() { @@ -51,57 +53,47 @@ public Product getProductDetails(UUID id) throws EntityNotFoundException { } public Product add(Product product, String categoryName) throws EntityAlreadyExistsException, EntityNotFoundException { - product.setId(UUID.randomUUID()); - //waiting for pricing - //product.setBasePrice(pricingService.getProductPrice(product.getId())); + product.setId(UUID.randomUUID()); BigDecimal temp = new BigDecimal("10506"); product.setBasePrice(temp); product.setShownPrice(product.getBasePrice()); - /// temporary for testing - + + ProductCategory category = categoryService.getCategoryByName(categoryName); product.setCategory(category); ProductListed productListed = new ProductListed(product); + + + Outbox outboxMessage = outboxService.createOutboxMessage(productListed.toString()); + + if (outboxMessage == null) { + logger.error("Failed to create outbox message for product: " + product.getProductName()); + throw new RuntimeException("Failed to create outbox message"); // Handle accordingly + } + + try { CompletionStage ack = productsEmitter.send(productListed); ack.thenAccept(result -> { - logger.info("Product listed and sent via Kafka "+ack ); + logger.info("Product listed and sent via Kafka: " + productListed); + outboxService.markAsSent(outboxMessage.getId()); }).exceptionally(error -> { - logger.error("Error when sending the productlisted event"); + logger.error("Error when sending the product listed event: " + error.getMessage()); + outboxService.markAsFailed(outboxMessage.getId()); return null; }); } catch (Exception e) { - logger.error("Error when Serializing JSON "); + logger.error("Error when serializing JSON: " + e.getMessage()); + outboxService.markAsFailed(outboxMessage.getId()); } - System.out.println(productListed); + + return productRepo.insert(product); } - // @Scheduled(every = "12h", concurrentExecution = ConcurrentExecution.SKIP) - // public void checkPriceUpdates(){ - // int page = 0; - // int maxResults = 10; // Adjust the range as needed - // List products; - // do { - // products = findByRange(page, maxResults); - // products.forEach(product -> { - // BigDecimal newPrice = pricingService.getProductPrice(product.getId()); - // if (product.getShownPrice() != newPrice) { - // product.setShownPrice(newPrice); - // try { - // productRepo.update(product); - // } catch (EntityNotFoundException e) { - // e.printStackTrace(); - // } - // } - // }); - // page++; - // } while (!products.isEmpty()); - // } - public Product updateProduct(Product product) throws EntityNotFoundException { - return productRepo.update(product); + return productRepo.update(product); } public void removeProduct(UUID id) { diff --git a/catalog/catalog-service/src/main/resources/application.properties b/catalog/catalog-service/src/main/resources/application.properties index 51f9acce..6f3fbb5e 100644 --- a/catalog/catalog-service/src/main/resources/application.properties +++ b/catalog/catalog-service/src/main/resources/application.properties @@ -34,5 +34,8 @@ mp.messaging.incoming.products-in.topic=products-listed # Kafka configuration for incoming messages (Consumer for testing) # mp.messaging.incoming.products-in.connector=smallrye-kafka # mp.messaging.incoming.products-in.topic=products-created -# mp.messaging.incoming.products-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -# mp.messaging.incoming.products-in.group.id=products-consumer-group + mp.messaging.incoming.products-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer + mp.messaging.incoming.products-in.group.id=products-consumer-group + + + diff --git a/docker-compose.yml b/docker-compose.yml index 7d11c612..b486a5e4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -73,29 +73,24 @@ services: POSTGRES_PASSWORD: azerty ports: - 5441:5432 -# mongo_example: -# image: mongo:4.4 -# environment: -# MONGO_INITDB_ROOT_USERNAME: root -# MONGO_INITDB_ROOT_PASSWORD: azerty -# ports: -# - 27017:27017 + + # mongo_example: + # image: mongo:4.4 + # environment: + # MONGO_INITDB_ROOT_USERNAME: root + # MONGO_INITDB_ROOT_PASSWORD: azerty + # ports: + # - 27017:27017 zookeeper: image: strimzi/kafka:0.19.0-kafka-2.5.0 - command: [ - "sh", "-c", - "bin/zookeeper-server-start.sh config/zookeeper.properties" - ] + command: [ "sh", "-c", "bin/zookeeper-server-start.sh config/zookeeper.properties" ] ports: - "2181:2181" environment: LOG_DIR: /tmp/logs kafka: image: strimzi/kafka:0.19.0-kafka-2.5.0 - command: [ - "sh", "-c", - "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" - ] + command: [ "sh", "-c", "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" ] depends_on: - zookeeper ports: @@ -105,6 +100,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + # iam: # image: quay.io/keycloak/keycloak:20.0.2 # environment: @@ -127,4 +123,4 @@ services: # - "-c" # - "precreate-core products-index; exec solr -f" # volumes: -# data: \ No newline at end of file +# data: