Skip to content

Commit

Permalink
Add batch processing to treatInbox
Browse files Browse the repository at this point in the history
  • Loading branch information
ZribiMaram committed Dec 14, 2024
1 parent 950e216 commit ed7c599
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 81 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package enit.ecomerce.search_product.repository;

import enit.ecomerce.search_product.product.ProductEntity;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;

import java.util.List;

public interface ProducteEntityRepository extends CrudRepository<ProductEntity, String> {
@Repository
public interface ProducteEntityRepository extends JpaRepository<ProductEntity, String> {

@Query("SELECT p FROM ProductEntity p WHERE p.isIndex = false")
List<ProductEntity> findUnindexedProducts();
}
@Query(value = "SELECT * FROM product_entity WHERE is_indexed = false LIMIT :batchSize", nativeQuery = true)
List<ProductEntity> findUnindexedProducts(@Param("batchSize") int batchSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,39 @@

@Service
public class InboxProductCreationService {
private static final int BATCH_SIZE = 100;

@Autowired
private ProducteEntityRepository productEntityRepository;

@Autowired
private ProductRepository productRepository;

@Scheduled(fixedRate = 300000)
@Transactional
public void treatInbox() {
List<ProductEntity> unindexedProducts = productEntityRepository.findUnindexedProducts();
List<ProductEntity> unindexedProducts;

for (ProductEntity product : unindexedProducts) {
try {
productRepository.save(new Product(product));
product.setIndex(true);
productEntityRepository.save(product);
} catch (Exception e) {
System.err.println("Error processing product: " + product.getId() + " - " + e.getMessage());
throw e; // Rethrow the exception to trigger transaction rollback
do {
// Retrieve unindexed products in batches
unindexedProducts = productEntityRepository.findUnindexedProducts(BATCH_SIZE);

if (!unindexedProducts.isEmpty()) {
try {
batchIndexProducts(unindexedProducts);
} catch (Exception e) {
System.err.println("Error processing batch: " + e.getMessage());
throw e; // Rethrow to trigger rollback
}
}
} while (!unindexedProducts.isEmpty());
}

private void batchIndexProducts(List<ProductEntity> unindexedProducts) {
for (ProductEntity product : unindexedProducts) {
productRepository.save(new Product(product));
product.setIndex(true);
productEntityRepository.save(product);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package enit.ecomerce.search_product.service;

import enit.ecomerce.search_product.product.Product;
import enit.ecomerce.search_product.product.ProductEntity;
import enit.ecomerce.search_product.repository.ProductRepository;
import enit.ecomerce.search_product.repository.ProducteEntityRepository;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@Service
public class InboxProductCreationService {
@Autowired
private ProducteEntityRepository productEntityRepository;
@Autowired
private ProductRepository productRepository;

private static final int BATCH_SIZE = 50; // Define your batch size

@Scheduled(fixedRate = 300000)
@Transactional
public void treatInbox() {
List<ProductEntity> unindexedProducts;

while (!(unindexedProducts = productEntityRepository.findUnindexedProducts(BATCH_SIZE)).isEmpty()) {
for (ProductEntity product : unindexedProducts) {
try {
productRepository.save(new Product(product));
product.setIndex(true);
productEntityRepository.save(product);
} catch (Exception e) {
System.err.println("Error processing product: " + product.getId() + " - " + e.getMessage());
throw e; // Rethrow the exception to trigger transaction rollback
}
}
}
}
}

This file was deleted.

0 comments on commit ed7c599

Please sign in to comment.