Skip to content

Commit

Permalink
Implemented outbox pattern for products out
Browse files Browse the repository at this point in the history
  • Loading branch information
mahdiJ2001 committed Dec 6, 2024
1 parent 6ba1c59 commit de8b50a
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 68 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.ecommerce.domain;

import jakarta.persistence.*;
import java.util.UUID;

@Entity
@Table(name = "outbox")
public class Outbox {

@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private UUID id;

@Column(nullable = false)
private String message;

@Column(nullable = false)
private String status;

@Column(nullable = false)
private String createdAt;

@Column
private String sentAt;

public Outbox() {
}

public UUID getId() {
return id;
}

public void setId(UUID id) {
this.id = id;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

public String getCreatedAt() {
return createdAt;
}

public void setCreatedAt(String createdAt) {
this.createdAt = createdAt;
}

public String getSentAt() {
return sentAt;
}

public void setSentAt(String sentAt) {
this.sentAt = sentAt;
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package org.ecommerce.domain.events;

import java.time.LocalDateTime;
import java.util.UUID;


import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

import lombok.Data;
import lombok.ToString;

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXISTING_PROPERTY,
property = "eventType",
visible = true
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXISTING_PROPERTY,
property = "eventType",
visible = true
)
@JsonSubTypes({
@JsonSubTypes.Type(value = ProductListed.class, name = "ProductListed")
@JsonSubTypes.Type(value = ProductListed.class, name = "ProductListed")
})

@Data
Expand All @@ -27,12 +26,19 @@ public abstract class Event {
protected String aggregateType;
protected String aggregateId;
protected LocalDateTime createdAt = LocalDateTime.now();
public Event(){};

public Event() {}

public Event(String eventType, String aggregateType, String aggregateId) {
this.eventType = eventType;
this.aggregateType = aggregateType;
this.aggregateId = aggregateId;
}

public UUID getEventId() {
return eventId;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
Expand All @@ -49,14 +55,12 @@ public boolean equals(Object obj) {
return false;
return true;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((eventId == null) ? 0 : eventId.hashCode());
return result;
}



}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.ecommerce.repository;

import java.util.List;
import java.util.UUID;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.persistence.EntityExistsException;
import jakarta.persistence.TypedQuery;
import jakarta.transaction.Transactional;

import org.ecommerce.domain.Outbox;
import org.ecommerce.exceptions.EntityNotFoundException;
import org.ecommerce.exceptions.EntityAlreadyExistsException;

@ApplicationScoped
public class OutboxRepository {

@Inject
EntityManager em;

public List<Outbox> findAll() {
return em.createQuery("from Outbox", Outbox.class)
.getResultList();
}

public List<Outbox> findPendingMessages() {
TypedQuery<Outbox> query = em.createQuery(
"SELECT o FROM Outbox o WHERE o.status = 'PENDING'", Outbox.class);
return query.getResultList();
}

public Outbox findById(UUID id) throws EntityNotFoundException {
Outbox outbox = em.find(Outbox.class, id);
if (outbox != null) {
return outbox;
}
throw new EntityNotFoundException("Cannot find outbox entry with ID: " + id);
}

@Transactional
public Outbox insert(Outbox outbox) throws EntityAlreadyExistsException {
try {
outbox.setId(UUID.randomUUID());
em.persist(outbox);
return outbox;
} catch (EntityExistsException e) {
throw new EntityAlreadyExistsException("Outbox entry already exists");
}
}

@Transactional
public Outbox update(Outbox outbox) throws EntityNotFoundException {
try {
return em.merge(outbox);
} catch (IllegalArgumentException e) {
throw new EntityNotFoundException("Cannot find outbox entry with ID: " + outbox.getId());
}
}

@Transactional
public void delete(UUID id) {
Outbox outbox = em.find(Outbox.class, id);
if (outbox != null) {
em.remove(outbox);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.ecommerce.service;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.ecommerce.domain.Outbox;
import org.ecommerce.service.OutboxService;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
import io.quarkus.scheduler.Scheduled;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.List;

@ApplicationScoped
public class OutboxProcessor {

private static final Logger logger = Logger.getLogger(OutboxProcessor.class);

@Inject
OutboxService outboxService;

@Inject
@Channel("products-out")
Emitter<String> productsEmitter;

@Scheduled(every = "5s")
public void processOutbox() {
List<Outbox> messages = outboxService.getPendingMessages();

for (Outbox message : messages) {
try {
productsEmitter.send(message.getMessage()).thenRun(() -> {
outboxService.markAsSent(message.getId());
logger.info("Successfully sent outbox message with ID: " + message.getId());
}).exceptionally(e -> {
logger.error("Failed to send outbox message with ID " + message.getId() + ": " + e.getMessage());
outboxService.markAsFailed(message.getId());
return null;
});
} catch (Exception e) {
logger.error("Error processing outbox message with ID " + message.getId() + ": " + e.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.ecommerce.service;

import jakarta.enterprise.context.ApplicationScoped;
import org.ecommerce.domain.Outbox;
import org.ecommerce.repository.OutboxRepository;
import org.ecommerce.exceptions.EntityAlreadyExistsException;
import org.ecommerce.exceptions.EntityNotFoundException;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

import java.time.Instant;
import java.util.List;
import java.util.UUID;

@ApplicationScoped
public class OutboxService {

@Inject
OutboxRepository outboxRepository;

@Transactional
public Outbox createOutboxMessage(String message) {
Outbox outbox = new Outbox();
outbox.setMessage(message);
outbox.setStatus("PENDING");
outbox.setCreatedAt(Instant.now().toString());
try {
return outboxRepository.insert(outbox);
} catch (EntityAlreadyExistsException e) {
System.err.println(e.getMessage());
return null;
}
}

@Transactional
public void markAsSent(UUID id) {
try {
Outbox outbox = outboxRepository.findById(id);
outbox.setStatus("SENT");
outbox.setSentAt(Instant.now().toString());
outboxRepository.update(outbox);
} catch (EntityNotFoundException e) {
System.err.println(e.getMessage());
}
}

@Transactional
public void markAsFailed(UUID id) {
try {
Outbox outbox = outboxRepository.findById(id);
outbox.setStatus("FAILED");
outboxRepository.update(outbox);
} catch (EntityNotFoundException e) {
System.err.println(e.getMessage());
}
}

@Transactional
public List<Outbox> getPendingMessages() {
return outboxRepository.findPendingMessages();
}
}
Loading

0 comments on commit de8b50a

Please sign in to comment.