Skip to content

Commit

Permalink
Merge pull request #9 from ministryofjustice/add-better-logging-batching
Browse files Browse the repository at this point in the history
Improved logging and deferred batch_id generation until later
  • Loading branch information
dconneely authored Jan 15, 2025
2 parents b634dfa + e353944 commit c86c109
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import uk.gov.justice.laadces.premigconcor.dao.migration.MigrationScopeRepository;
import uk.gov.justice.laadces.premigconcor.service.CsvOutputService;

import java.time.LocalDate;
import java.util.HashSet;
import java.util.TreeSet;

Expand All @@ -26,11 +27,13 @@
@Slf4j
class Runner implements ApplicationRunner {
// File paths to write CSV output to (for validation).
private static final String PATH_CSV_OUTPUT_FOUND_CONCOR = "./premigconcor-concorCases.csv";
private static final String PATH_CSV_OUTPUT_MISSING_CONCOR = "./premigconcor-concorMissing.csv";
private static final String PATH_CSV_OUTPUT_FOUND_FDC = "./premigconcor-fdcCases.csv";
private static final String PATH_CSV_OUTPUT_MISSING_FDC = "./premigconcor-fdcMissing.csv";
private static final int MAX_COUNT_MAAT_IDS = 5000; // truncate data set if non-negative.
private static final String PREFIX = "./" + LocalDate.now() + "_premigconcor-";
private static final String PATH_CSV_OUTPUT_FOUND_CONCOR = PREFIX + "concorCases.csv";
private static final String PATH_CSV_OUTPUT_MISSING_CONCOR = PREFIX + "concorMissing.csv";
private static final String PATH_CSV_OUTPUT_FOUND_FDC = PREFIX + "fdcCases.csv";
private static final String PATH_CSV_OUTPUT_MISSING_FDC = PREFIX + "fdcMissing.csv";
private static final String PATH_CSV_OUTPUT_SAVED_CM = PREFIX + "saved.csv";
private static final int MAX_COUNT_MAAT_IDS = -1; // Use full data if negative, truncated data if non-negative.

private final MigrationScopeRepository migrationScopeRepository;
private final ConcorContributionRepository concorContributionRepository;
Expand Down Expand Up @@ -61,16 +64,21 @@ public void run(final ApplicationArguments args) throws Exception {
csvOutputService.writeMaatIds(PATH_CSV_OUTPUT_MISSING_FDC, MaatId.of(missingFdcs));

log.info("Checking {} maatIds for unprocessed case-migrations", maatIds.size());
caseMigrationRepository.deleteUnprocessed(maatIds);
final int deleted = caseMigrationRepository.deleteUnprocessed(maatIds);
log.info("Deleted {} unprocessed case_migration rows from integration database", deleted);

final var found = new HashSet<CaseMigration>();
found.addAll(foundConcors);
found.addAll(foundFdcs);
log.info("Checking {} case-migrations for existing duplicates", found.size());
caseMigrationRepository.removeExisting(found);
final int[] removed = caseMigrationRepository.removeExisting(found);
log.info("Left {} case-migrations (removed {} dups of {} case_migration rows)", found.size(), removed[1], removed[0]);

log.info("Persisting {} non-duplicate case-migrations", found.size());
caseMigrationRepository.saveAll(found);
final var renumbered = caseMigrationRepository.renumberBatchIds(found);
log.info("Renumbered {} case-migrations", renumbered.size());
csvOutputService.writeCaseMigrations(PATH_CSV_OUTPUT_SAVED_CM, renumbered);
final int saved = caseMigrationRepository.saveAll(renumbered);
log.info("Saved {} case_migration rows to the integration database", saved);

log.info("Exiting run...");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public static CaseMigration ofPrimaryKey(long maatId, long concorContributionId,
return new CaseMigration(maatId, concorContributionId != 0 ? "Contribution" : "Fdc", concorContributionId, fdcId, null, null, null, null, null);
}

/**
 * Returns a copy of this CaseMigration with its batch_id replaced by the given value.
 * Every other record component is carried over unchanged.
 *
 * @param batchId the batch_id to set on the copy.
 * @return a new CaseMigration identical to this one apart from the batch_id.
 */
public CaseMigration withBatchId(long batchId) {
    return new CaseMigration(
            maatId(), recordType(), concorContributionId(), fdcId(),
            batchId, isProcessed(), processedDate(), httpStatus(), payload());
}

/**
* Implementation of equals() that ignores every record component except (maatId, concorContributionId, fdcId),
* which is the primary key in the table. Annoying as one reason to use records was to avoid writing this.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package uk.gov.justice.laadces.premigconcor.dao.integration;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
Expand All @@ -15,7 +14,6 @@
import java.util.List;

@Repository
@Slf4j
public class CaseMigrationRepository {
private static final int BATCH_SIZE = 990;
private final NamedParameterJdbcTemplate integration;
Expand All @@ -24,7 +22,7 @@ public CaseMigrationRepository(@Qualifier("integrationNamedParameterJdbcTemplate
this.integration = integration;
}

public void deleteUnprocessed(final List<Long> maatIds) {
public int deleteUnprocessed(final List<Long> maatIds) {
final var partitions = partition(maatIds, BATCH_SIZE);
final var batchArgs = partitions.stream()
.map(partition -> new MapSqlParameterSource("maatIds", partition))
Expand All @@ -35,7 +33,7 @@ public void deleteUnprocessed(final List<Long> maatIds) {
WHERE is_processed <> TRUE
AND maat_id IN (:maatIds)
""", batchArgs);
log.info("deleteUnprocessed: Deleted {} rows", Arrays.stream(rowsDeleted).sum());
return Arrays.stream(rowsDeleted).sum(); // total number of deleted rows.
}

/**
Expand All @@ -56,30 +54,51 @@ <T> List<List<T>> partition(final List<T> input, final int partitionSize) {
for (int index = 0; index < inputSize; index += partitionSize) {
output.add(input.subList(index, Math.min(index + partitionSize, inputSize)));
}
return output;
return output; // the list of lists (each of size partitionSize).
}

public void removeExisting(final Collection<CaseMigration> caseMigrations) {
final int originalSize = caseMigrations.size();
final var counts = new Object() { // need to be 'effectively final' to access from lambda.
int total = 0;
int removed = 0;
};
public int[] removeExisting(final Collection<CaseMigration> caseMigrations) {
final int[] counts = {0, 0}; // {total, removed}
integration.query("""
SELECT maat_id, concor_contribution_id, fdc_id
FROM case_migration
""", rs -> {
final var caseMigration = CaseMigration.ofPrimaryKey(rs.getLong(1), rs.getLong(2), rs.getLong(3));
++counts.total;
++counts[0];
if (caseMigrations.remove(caseMigration)) {
++counts.removed;
++counts[1];
}
});
log.info("Removed {} of {} original case-migrations", counts.removed, originalSize);
log.info("Remaining {} case-migrations are distinct from the {} already in the database", caseMigrations.size(), counts.total);
return counts; // size of case_migration table, duplicates between caseMigrations parameter and database table.
}

public void saveAll(final Collection<CaseMigration> caseMigrations) {
private static final long CONCOR_BATCH_ID_DIVISOR = 250L; // case_migration batch_id size for concor_contributions
private static final long FDC_BATCH_ID_DIVISOR = 500L; // case_migration batch_id size for fdc_contributions

/**
 * Returns a copy of the input where any CaseMigration without a batch_id is given a batch_id.
 * <p>
 * Concor and FDC migrations are numbered independently: each sequence starts at zero, and a
 * new batch_id is allocated after every CONCOR_BATCH_ID_DIVISOR (respectively
 * FDC_BATCH_ID_DIVISOR) newly-numbered entries. Entries that already carry a batch_id are
 * copied through untouched and do not advance either counter.
 *
 * @param caseMigrations input collection of CaseMigration instances which may have null batch_ids.
 * @return List&lt;CaseMigration&gt; where each instance has a non-null batch_id.
 */
public List<CaseMigration> renumberBatchIds(Collection<CaseMigration> caseMigrations) {
    final var result = new ArrayList<CaseMigration>(caseMigrations.size());
    long concorSeen = 0L; // concor_contributions numbered so far
    long fdcSeen = 0L;    // fdc_contributions numbered so far
    for (final var migration : caseMigrations) {
        if (migration.batchId() != null) {
            result.add(migration); // has a batch_id already.
        } else if (migration.concorContributionId() > 0) {
            result.add(migration.withBatchId(concorSeen++ / CONCOR_BATCH_ID_DIVISOR));
        } else {
            result.add(migration.withBatchId(fdcSeen++ / FDC_BATCH_ID_DIVISOR));
        }
    }
    return result; // the copied collection.
}

public int saveAll(final Collection<CaseMigration> caseMigrations) {
final int[][] rowsInserted = integration.getJdbcOperations().batchUpdate("""
INSERT INTO case_migration (maat_id, record_type, concor_contribution_id, fdc_id,
batch_id, is_processed, processed_date, http_status, payload)
Expand Down Expand Up @@ -120,6 +139,6 @@ INSERT INTO case_migration (maat_id, record_type, concor_contribution_id, fdc_id
stmt.setNull(9, Types.VARCHAR);
}
});
log.info("saveAll: Inserted {} rows", Arrays.stream(rowsInserted).flatMapToInt(Arrays::stream).sum());
return Arrays.stream(rowsInserted).flatMapToInt(Arrays::stream).sum(); // total number of inserted rows.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
@Repository
public class ConcorContributionRepository {
private static final int MAATDB_BATCH_QUERY_SIZE = 990; // concor_contributions query batch size
private static final long INTDB_BATCH_ID_DIVISOR = 250L; // case_migration batch_id size
private final JdbcClient maat;

public ConcorContributionRepository(@Qualifier("maatJdbcClient") final JdbcClient maat) {
Expand All @@ -22,7 +21,6 @@ public ConcorContributionRepository(@Qualifier("maatJdbcClient") final JdbcClien

public void addLatestIdsByMaatIds(final List<Long> maatIds, final Collection<CaseMigration> foundConcors, final Collection<Long> missingConcors) {
final int count = maatIds.size();
final long[] batchIndex = {0L}; // Needs to be "effectively final".
for (int i = 0; i < count; i += MAATDB_BATCH_QUERY_SIZE) {
final var subList = maatIds.subList(i, Math.min(count, i + MAATDB_BATCH_QUERY_SIZE));
final var paramSource = new MapSqlParameterSource("maatIds", subList);
Expand All @@ -38,7 +36,7 @@ WHERE rep_id IN (:maatIds) AND status = 'SENT'
final long concorContributionId = rs.getLong(1);
final long maatId = rs.getLong(2);
set.remove(maatId);
return CaseMigration.ofConcorContribution(maatId, concorContributionId, (batchIndex[0]++) / INTDB_BATCH_ID_DIVISOR);
return CaseMigration.ofConcorContribution(maatId, concorContributionId, null);
})
.list());
if (!set.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
@Repository
public class FdcContributionRepository {
private static final int MAATDB_BATCH_QUERY_SIZE = 990; // fdc_contributions query batch size
private static final long INTDB_BATCH_ID_DIVISOR = 500L; // case_migration batch_id size
private final JdbcClient maat;

public FdcContributionRepository(@Qualifier("maatJdbcClient") final JdbcClient maat) {
Expand All @@ -22,7 +21,6 @@ public FdcContributionRepository(@Qualifier("maatJdbcClient") final JdbcClient m

public void addLatestIdsByMaatIds(final List<Long> maatIds, final Collection<CaseMigration> foundFdcs, final Collection<Long> missingFdcs) {
final int count = maatIds.size();
final long[] batchIndex = {0L}; // Needs to be "effectively final".
for (int i = 0; i < count; i += MAATDB_BATCH_QUERY_SIZE) {
final var subList = maatIds.subList(i, Math.min(count, i + MAATDB_BATCH_QUERY_SIZE));
final var paramSource = new MapSqlParameterSource("maatIds", subList);
Expand All @@ -38,7 +36,7 @@ WHERE rep_id IN (:maatIds)
final long fdcId = rs.getLong(1);
final long maatId = rs.getLong(2);
set.remove(maatId);
return CaseMigration.ofFdcContribution(maatId, fdcId, (batchIndex[0]++) / INTDB_BATCH_ID_DIVISOR);
return CaseMigration.ofFdcContribution(maatId, fdcId, null);
})
.list());
if (!set.isEmpty()) {
Expand Down

0 comments on commit c86c109

Please sign in to comment.