From e3539449e00fb561fd75d17b1ed056b51969d205 Mon Sep 17 00:00:00 2001
From: David Conneely <899881+dconneely@users.noreply.github.com>
Date: Wed, 15 Jan 2025 14:58:35 +0000
Subject: [PATCH] Improve logging and defer batch_id generation until later

---
 .../justice/laadces/premigconcor/Runner.java  | 26 +++++----
 .../dao/integration/CaseMigration.java        |  4 ++
 .../integration/CaseMigrationRepository.java  | 53 +++++++++++++------
 .../maat/ConcorContributionRepository.java    |  4 +-
 .../dao/maat/FdcContributionRepository.java   |  4 +-
 5 files changed, 59 insertions(+), 32 deletions(-)

diff --git a/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/Runner.java b/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/Runner.java
index d1eba6f..407a9a5 100644
--- a/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/Runner.java
+++ b/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/Runner.java
@@ -14,6 +14,7 @@
 import uk.gov.justice.laadces.premigconcor.dao.migration.MigrationScopeRepository;
 import uk.gov.justice.laadces.premigconcor.service.CsvOutputService;
 
+import java.time.LocalDate;
 import java.util.HashSet;
 import java.util.TreeSet;
 
@@ -26,11 +27,13 @@
 @Slf4j
 class Runner implements ApplicationRunner {
     // File paths to write CSV output to (for validation).
-    private static final String PATH_CSV_OUTPUT_FOUND_CONCOR = "./premigconcor-concorCases.csv";
-    private static final String PATH_CSV_OUTPUT_MISSING_CONCOR = "./premigconcor-concorMissing.csv";
-    private static final String PATH_CSV_OUTPUT_FOUND_FDC = "./premigconcor-fdcCases.csv";
-    private static final String PATH_CSV_OUTPUT_MISSING_FDC = "./premigconcor-fdcMissing.csv";
-    private static final int MAX_COUNT_MAAT_IDS = 5000; // truncate data set if non-negative.
+    private static final String PREFIX = "./" + LocalDate.now() + "_premigconcor-";
+    private static final String PATH_CSV_OUTPUT_FOUND_CONCOR = PREFIX + "concorCases.csv";
+    private static final String PATH_CSV_OUTPUT_MISSING_CONCOR = PREFIX + "concorMissing.csv";
+    private static final String PATH_CSV_OUTPUT_FOUND_FDC = PREFIX + "fdcCases.csv";
+    private static final String PATH_CSV_OUTPUT_MISSING_FDC = PREFIX + "fdcMissing.csv";
+    private static final String PATH_CSV_OUTPUT_SAVED_CM = PREFIX + "saved.csv";
+    private static final int MAX_COUNT_MAAT_IDS = -1; // Use full data if negative, truncated data if non-negative.
 
     private final MigrationScopeRepository migrationScopeRepository;
     private final ConcorContributionRepository concorContributionRepository;
@@ -61,16 +64,21 @@ public void run(final ApplicationArguments args) throws Exception {
         csvOutputService.writeMaatIds(PATH_CSV_OUTPUT_MISSING_FDC, MaatId.of(missingFdcs));
 
         log.info("Checking {} maatIds for unprocessed case-migrations", maatIds.size());
-        caseMigrationRepository.deleteUnprocessed(maatIds);
+        final int deleted = caseMigrationRepository.deleteUnprocessed(maatIds);
+        log.info("Deleted {} unprocessed case_migration rows from integration database", deleted);
 
         final var found = new HashSet<CaseMigration>();
         found.addAll(foundConcors);
         found.addAll(foundFdcs);
         log.info("Checking {} case-migrations for existing duplicates", found.size());
-        caseMigrationRepository.removeExisting(found);
+        final int[] removed = caseMigrationRepository.removeExisting(found);
+        log.info("Left {} case-migrations (removed {} dups of {} case_migration rows)", found.size(), removed[1], removed[0]);
 
-        log.info("Persisting {} non-duplicate case-migrations", found.size());
-        caseMigrationRepository.saveAll(found);
+        final var renumbered = caseMigrationRepository.renumberBatchIds(found);
+        log.info("Renumbered {} case-migrations", renumbered.size());
+        csvOutputService.writeCaseMigrations(PATH_CSV_OUTPUT_SAVED_CM, renumbered);
+        final int saved = caseMigrationRepository.saveAll(renumbered);
+        log.info("Saved {} case_migration rows to the integration database", saved);
 
         log.info("Exiting run...");
     }
diff --git a/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/integration/CaseMigration.java b/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/integration/CaseMigration.java
index 6495d63..6affd63 100644
--- a/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/integration/CaseMigration.java
+++ b/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/integration/CaseMigration.java
@@ -25,6 +25,10 @@ public static CaseMigration ofPrimaryKey(long maatId, long concorContributionId,
         return new CaseMigration(maatId, concorContributionId != 0 ? "Contribution" : "Fdc", concorContributionId, fdcId, null, null, null, null, null);
     }
 
+    public CaseMigration withBatchId(long batchId) {
+        return new CaseMigration(maatId(), recordType(), concorContributionId(), fdcId(), batchId, isProcessed(), processedDate(), httpStatus(), payload());
+    }
+
     /**
      * Implementation of equals() that ignores every record component except (maatId, concorContributionId, fdcId),
      * which is the primary key in the table. Annoying as one reason to use records was to avoid writing this.
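
Note on the hunk above: the new withBatchId is a "wither" for the CaseMigration record, copying every component and replacing only batchId, so the caller gets a fresh instance while the original stays untouched (records are immutable, so a setter is not an option). A minimal sketch of the behaviour, using only the factory, accessors and equals() contract visible in this patch; the literal ids are made up for illustration:

    // Sketch only (not part of the patch): batch_id starts out null and is filled in later.
    CaseMigration original = CaseMigration.ofConcorContribution(12345L, 67890L, null);
    CaseMigration copy = original.withBatchId(3L);
    // original.batchId() is still null; copy.batchId() is 3.
    // equals() compares only (maatId, concorContributionId, fdcId), so original.equals(copy) is true.
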
diff --git a/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/integration/CaseMigrationRepository.java b/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/integration/CaseMigrationRepository.java
index ac17b28..e664e41 100644
--- a/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/integration/CaseMigrationRepository.java
+++ b/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/integration/CaseMigrationRepository.java
@@ -1,6 +1,5 @@
 package uk.gov.justice.laadces.premigconcor.dao.integration;
 
-import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
@@ -15,7 +14,6 @@
 import java.util.List;
 
 @Repository
-@Slf4j
 public class CaseMigrationRepository {
     private static final int BATCH_SIZE = 990;
     private final NamedParameterJdbcTemplate integration;
@@ -24,7 +22,7 @@ public CaseMigrationRepository(@Qualifier("integrationNamedParameterJdbcTemplate
         this.integration = integration;
     }
 
-    public void deleteUnprocessed(final List<Long> maatIds) {
+    public int deleteUnprocessed(final List<Long> maatIds) {
         final var partitions = partition(maatIds, BATCH_SIZE);
         final var batchArgs = partitions.stream()
                 .map(partition -> new MapSqlParameterSource("maatIds", partition))
@@ -35,7 +33,7 @@ public void deleteUnprocessed(final List<Long> maatIds) {
                 WHERE is_processed <> TRUE
                   AND maat_id IN (:maatIds)
                 """, batchArgs);
-        log.info("deleteUnprocessed: Deleted {} rows", Arrays.stream(rowsDeleted).sum());
+        return Arrays.stream(rowsDeleted).sum(); // total number of deleted rows.
     }
 
     /**
@@ -56,30 +54,51 @@ <T> List<List<T>> partition(final List<T> input, final int partitionSize) {
         for (int index = 0; index < inputSize; index += partitionSize) {
             output.add(input.subList(index, Math.min(index + partitionSize, inputSize)));
         }
-        return output;
+        return output; // the list of lists (each of size partitionSize).
     }
 
-    public void removeExisting(final Collection<CaseMigration> caseMigrations) {
-        final int originalSize = caseMigrations.size();
-        final var counts = new Object() { // need to be 'effectively final' to access from lambda.
-            int total = 0;
-            int removed = 0;
-        };
+    public int[] removeExisting(final Collection<CaseMigration> caseMigrations) {
+        final int[] counts = {0, 0}; // {total, removed}
         integration.query("""
                 SELECT maat_id, concor_contribution_id, fdc_id
                 FROM case_migration
                 """, rs -> {
             final var caseMigration = CaseMigration.ofPrimaryKey(rs.getLong(1), rs.getLong(2), rs.getLong(3));
-            ++counts.total;
+            ++counts[0];
             if (caseMigrations.remove(caseMigration)) {
-                ++counts.removed;
+                ++counts[1];
             }
         });
-        log.info("Removed {} of {} original case-migrations", counts.removed, originalSize);
-        log.info("Remaining {} case-migrations are distinct from the {} already in the database", caseMigrations.size(), counts.total);
+        return counts; // size of case_migration table, duplicates between caseMigrations parameter and database table.
     }
 
-    public void saveAll(final Collection<CaseMigration> caseMigrations) {
+    private static final long CONCOR_BATCH_ID_DIVISOR = 250L; // case_migration batch_id size for concor_contributions
+    private static final long FDC_BATCH_ID_DIVISOR = 500L; // case_migration batch_id size for fdc_contributions
+
+    /**
+     * Returns a copy of the input where any CaseMigration without a batch_id is given a batch_id.
+     *
+     * @param caseMigrations input list of CaseMigration instances which may have null batch_ids.
+     * @return List<CaseMigration> where each instance has a non-null batch_id.
+     */
+    public List<CaseMigration> renumberBatchIds(Collection<CaseMigration> caseMigrations) {
+        final int[] counts = {0, 0}; // {concors, fdcs}
+        final var renumbered = new ArrayList<CaseMigration>(caseMigrations.size());
+        caseMigrations.forEach(caseMigration -> {
+            if (caseMigration.batchId() != null) {
+                renumbered.add(caseMigration); // has a batch_id already.
+            } else {
+                if (caseMigration.concorContributionId() > 0) {
+                    renumbered.add(caseMigration.withBatchId((counts[0]++) / CONCOR_BATCH_ID_DIVISOR));
+                } else {
+                    renumbered.add(caseMigration.withBatchId((counts[1]++) / FDC_BATCH_ID_DIVISOR));
+                }
+            }
+        });
+        return renumbered; // the copied collection.
+    }
+
+    public int saveAll(final Collection<CaseMigration> caseMigrations) {
         final int[][] rowsInserted = integration.getJdbcOperations().batchUpdate("""
                 INSERT INTO case_migration (maat_id, record_type, concor_contribution_id, fdc_id,
                                             batch_id, is_processed, processed_date, http_status, payload)
@@ -120,6 +139,6 @@ INSERT INTO case_migration (maat_id, record_type, concor_contribution_id, fdc_id
                 stmt.setNull(9, Types.VARCHAR);
             }
         });
-        log.info("saveAll: Inserted {} rows", Arrays.stream(rowsInserted).flatMapToInt(Arrays::stream).sum());
+        return Arrays.stream(rowsInserted).flatMapToInt(Arrays::stream).sum(); // total number of inserted rows.
     }
 }
diff --git a/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/maat/ConcorContributionRepository.java b/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/maat/ConcorContributionRepository.java
index e3d8b2c..50fcc73 100644
--- a/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/maat/ConcorContributionRepository.java
+++ b/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/maat/ConcorContributionRepository.java
@@ -13,7 +13,6 @@
 @Repository
 public class ConcorContributionRepository {
     private static final int MAATDB_BATCH_QUERY_SIZE = 990; // concor_contributions query batch size
-    private static final long INTDB_BATCH_ID_DIVISOR = 250L; // case_migration batch_id size
     private final JdbcClient maat;
 
     public ConcorContributionRepository(@Qualifier("maatJdbcClient") final JdbcClient maat) {
@@ -22,7 +21,6 @@ public ConcorContributionRepository(@Qualifier("maatJdbcClient") final JdbcClien
 
     public void addLatestIdsByMaatIds(final List<Long> maatIds, final Collection<CaseMigration> foundConcors, final Collection<Long> missingConcors) {
         final int count = maatIds.size();
-        final long[] batchIndex = {0L}; // Needs to be "effectively final".
         for (int i = 0; i < count; i += MAATDB_BATCH_QUERY_SIZE) {
             final var subList = maatIds.subList(i, Math.min(count, i + MAATDB_BATCH_QUERY_SIZE));
             final var paramSource = new MapSqlParameterSource("maatIds", subList);
@@ -38,7 +36,7 @@ WHERE rep_id IN (:maatIds) AND status = 'SENT'
                         final long concorContributionId = rs.getLong(1);
                         final long maatId = rs.getLong(2);
                         set.remove(maatId);
-                        return CaseMigration.ofConcorContribution(maatId, concorContributionId, (batchIndex[0]++) / INTDB_BATCH_ID_DIVISOR);
+                        return CaseMigration.ofConcorContribution(maatId, concorContributionId, null);
                     })
                     .list());
         if (!set.isEmpty()) {
diff --git a/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/maat/FdcContributionRepository.java b/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/maat/FdcContributionRepository.java
index 930aa86..7f38da1 100644
--- a/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/maat/FdcContributionRepository.java
+++ b/premigconcor/src/main/java/uk/gov/justice/laadces/premigconcor/dao/maat/FdcContributionRepository.java
@@ -13,7 +13,6 @@
 @Repository
 public class FdcContributionRepository {
     private static final int MAATDB_BATCH_QUERY_SIZE = 990; // fdc_contributions query batch size
-    private static final long INTDB_BATCH_ID_DIVISOR = 500L; // case_migration batch_id size
     private final JdbcClient maat;
 
    public FdcContributionRepository(@Qualifier("maatJdbcClient") final JdbcClient maat) {
@@ -22,7 +21,6 @@ public FdcContributionRepository(@Qualifier("maatJdbcClient") final JdbcClient m
 
     public void addLatestIdsByMaatIds(final List<Long> maatIds, final Collection<CaseMigration> foundFdcs, final Collection<Long> missingFdcs) {
         final int count = maatIds.size();
-        final long[] batchIndex = {0L}; // Needs to be "effectively final".
         for (int i = 0; i < count; i += MAATDB_BATCH_QUERY_SIZE) {
             final var subList = maatIds.subList(i, Math.min(count, i + MAATDB_BATCH_QUERY_SIZE));
             final var paramSource = new MapSqlParameterSource("maatIds", subList);
@@ -38,7 +36,7 @@ WHERE rep_id IN (:maatIds)
                         final long fdcId = rs.getLong(1);
                         final long maatId = rs.getLong(2);
                         set.remove(maatId);
-                        return CaseMigration.ofFdcContribution(maatId, fdcId, (batchIndex[0]++) / INTDB_BATCH_ID_DIVISOR);
+                        return CaseMigration.ofFdcContribution(maatId, fdcId, null);
                     })
                     .list());
         if (!set.isEmpty()) {
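
Note on the last three files: batch_id assignment moves out of the MAAT-side repositories (both now pass null and lose their per-call batchIndex counters) and into CaseMigrationRepository.renumberBatchIds, which runs only after unprocessed rows are deleted and duplicates removed, so just the rows that will actually be saved get numbered. Concor and fdc rows are counted separately and integer-divided by the new 250/500 divisors. A standalone sketch of the resulting numbering; the class name and row counts are made up for illustration:

    // Sketch only (not part of the patch): the batch_id arithmetic used by renumberBatchIds.
    public class BatchIdNumberingSketch {
        public static void main(String[] args) {
            final long concorDivisor = 250L; // CONCOR_BATCH_ID_DIVISOR in the patch
            final long fdcDivisor = 500L;    // FDC_BATCH_ID_DIVISOR in the patch
            final long concorRows = 600L;    // hypothetical number of concor case-migrations
            final long fdcRows = 600L;       // hypothetical number of fdc case-migrations
            // Rows are counted 0..n-1 as they are renumbered, then integer-divided, so
            // 600 concor rows span batch_ids 0..2 and 600 fdc rows span batch_ids 0..1.
            System.out.println("concor batch_ids 0.." + ((concorRows - 1) / concorDivisor)); // "concor batch_ids 0..2"
            System.out.println("fdc batch_ids 0.." + ((fdcRows - 1) / fdcDivisor));          // "fdc batch_ids 0..1"
        }
    }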