Skip to content

Commit

Permalink
Fixing Some stuff about the consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
FadiFadhlaoui1212 committed Dec 29, 2024
1 parent 608351e commit 27334d2
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 19 deletions.
2 changes: 1 addition & 1 deletion order/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ services:
container_name: db_order
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: azerty
POSTGRES_PASSWORD: fadizoe1212
POSTGRES_DB: db_order
PGDATA: /var/lib/postgresql/data
ports:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public class CartConsumerConfig {

@Bean
public ConcurrentKafkaListenerContainerFactory<String, CartDTO> kafkaListenerContainerFactory() {
public ConcurrentKafkaListenerContainerFactory<String, CartDTO> CartListenerContainerFactory() {
// Create the consumer configuration
Map<String, Object> props = new LinkedHashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
@Configuration
public class DeliveryStatusMessageConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DeliveryStatusMessage> kafkaListenerContainerFactory() {
public ConcurrentKafkaListenerContainerFactory<String, DeliveryStatusMessage> DeliveryListenerContainerFactory() {
// Create the consumer configuration
Map<String, Object> props = new LinkedHashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class CartConsumer {
@Autowired
ItemRepository itemRepository;

@KafkaListener(topics = "cart-topic", groupId = "cartReceiver", containerFactory = "kafkaListenerContainerFactory")
@KafkaListener(topics = "cart-topic", groupId = "cartReceiver", containerFactory = "CartListenerContainerFactory")
public void listen(CartDTO cartDTO) {

logger.info("Received event with ID :"+cartDTO.getCartId());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.microservices.order_service.kafka;

import com.microservices.order_service.dto.DeliveryStatusMessage;
import com.microservices.order_service.events.OrderPaidEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -13,12 +14,10 @@
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;

import java.util.UUID;

@Component
public class OrderPaidEventConsumer {
public class DeliveryEventConsumer {

private static final Logger logger = LoggerFactory.getLogger(OrderPaidEventConsumer.class);
private static final Logger logger = LoggerFactory.getLogger(DeliveryEventConsumer.class);

private final RestTemplate restTemplate;

Expand All @@ -28,20 +27,27 @@ public class OrderPaidEventConsumer {
@Value("${shipping.service.url}")
private String shippingServiceUrl;

public OrderPaidEventConsumer(RestTemplate restTemplate) {


public DeliveryEventConsumer(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}

@KafkaListener(
topics = "${spring.kafka.topic.order-paid}",
groupId = "${spring.kafka.consumer.group-id}",
containerFactory = "kafkaListenerContainerFactory"
topics = "delivery-status",
groupId = "deliveryReceiver",
containerFactory = "DeliveryListenerContainerFactory"
)
@Transactional
public void consume(@Payload OrderPaidEvent event,
public void consume(@Payload DeliveryStatusMessage deliveryStatusMessage,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
try {
logger.info("Received deliveryStatusMessage: {}", deliveryStatusMessage.getOrderId());
logger.info("Received deliveryStatusMessage: {}", deliveryStatusMessage.getStatus());


}
/*try {
logger.info("Received OrderPaidEvent: partition={}, offset={}, event={}",
partition, offset, event);
Expand Down Expand Up @@ -98,5 +104,5 @@ private void handleInvalidEvent(OrderPaidEvent event, Exception e) {
private void handleProcessingError(OrderPaidEvent event, Exception e) {
logger.error("Error processing event: {}. Error: {}", event, e.getMessage());
// Implement retry logic or publish to a retry topic if needed
}
} */
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
public class DeliveryStatusMessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(DeliveryStatusMessageConsumer.class);

@KafkaListener(topics = "delivery-status", groupId = "deliveryReceiver", containerFactory = "kafkaListenerContainerFactory")
@KafkaListener(topics = "${spring.kafka.topic.delivery-status}", groupId = "deliveryReceiver", containerFactory = "DeliveryListenerContainerFactory")
public void listen(DeliveryStatusMessage deliveryStatusMessage) {
logger.info("WELYEEEEEY !!!");

logger.info("Received record: " + deliveryStatusMessage.getOrderId());
logger.info("Delivery status: " + deliveryStatusMessage.getStatus());
Expand Down
5 changes: 2 additions & 3 deletions order/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ spring.kafka.producer.value-serializer=com.microservices.order_service.serializa
mail.service.url = http://localhost:8091/emails/templateId
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQLDialect

spring.kafka.consumer.value-deserializer=com.microservices.order_service.deserialization.CartDTODeserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*


delivery.service.url=http://localhost:8090/addresses
shipping.service.url=http://localhost:8090/api/shipping

spring.kafka.topic.order-paid=order-paid
spring.kafka.topic.delivery-status=delivery-status
spring.kafka.consumer.group-id=groupId
15 changes: 15 additions & 0 deletions shipping/src/main/java/org/shipping/api/ShippingResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import org.jboss.logging.Logger;
import org.shipping.dto.DeliveryStatusMessage;
import org.shipping.dto.ShipmentDTO;
import org.shipping.dto.ShipmentUpdateDTO;
import org.shipping.messaging.DeliveryStatusPublisher;
import org.shipping.model.DeliveryStatus;
import org.shipping.model.Shipment;
import org.shipping.service.ShippingService;
Expand All @@ -26,6 +28,9 @@ public class ShippingResource {
@Inject
ShippingService shippingService;

@Inject
DeliveryStatusPublisher deliveryStatusPublisher;

private static final Logger logger = Logger.getLogger(ShippingResource.class);

// Créer une nouvelle livraison
Expand Down Expand Up @@ -190,4 +195,14 @@ public Response deleteShipment(@PathParam("shipmentId") UUID shipmentId) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Error deleting shipment").build();
}
}

@POST
@Path("/delivery-status")
public String updateDeliveryStatus(@Valid DeliveryStatusMessage deliveryStatusMessage) {
deliveryStatusPublisher.publishStatus(deliveryStatusMessage.getOrderId(), deliveryStatusMessage.getStatus());
return "Delivery Status Updated";


}

}

0 comments on commit 27334d2

Please sign in to comment.