Skip to content

Commit

Permalink
Merge pull request #14 from SAP/develop
Browse files Browse the repository at this point in the history
Fix issues with migration resuming
  • Loading branch information
lnowakowski authored Mar 21, 2024
2 parents 8fcdbe3 + 1c5eee0 commit d2cb6af
Show file tree
Hide file tree
Showing 39 changed files with 313 additions and 96 deletions.
1 change: 1 addition & 0 deletions commercedbsync/resources/commercedbsync-beans.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<property name="durationinseconds" type="double"/>
<property name="copyMethod" type="String"/>
<property name="keyColumns" type="java.util.List&lt;String&gt;"/>
<property name="batchsize" type="int"/>
</bean>

<bean class="com.sap.cx.boosters.commercedbsync.service.DatabaseCopyBatch">
Expand Down
13 changes: 12 additions & 1 deletion commercedbsync/resources/commercedbsync-items.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,18 @@
<persistence type="property"/>
<modifiers optional="false"/>
<defaultvalue>false</defaultvalue>
</attribute>
</attribute>
<attribute qualifier="resumeMigration" type="boolean">
<description>Resume a failed migration</description>
<persistence type="property" />
<modifiers optional="true"/>
<defaultvalue>false</defaultvalue>
</attribute>
<attribute qualifier="migrationId" type="java.lang.String" >
<description>Migration Id for the job</description>
<persistence type="property" />
<modifiers optional="true"/>
</attribute>
</attributes>
</itemtype>

Expand Down
1 change: 1 addition & 0 deletions commercedbsync/resources/commercedbsync-spring.xml
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@

<bean id="fullMigrationJob" class="com.sap.cx.boosters.commercedbsync.jobs.FullMigrationJob"
parent="abstractMigrationJobPerformable">
<property name="modelService" ref="modelService"/>
</bean>

<!-- <bean id="defaultCMTRemoveInterceptor"-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,9 @@ type.MigrationCronJob.maxWriterWorkers.description=Number of writer workers to b

type.MigrationCronJob.batchSize.name=Batch Size
type.MigrationCronJob.batchSize.description=Batch size used to query data

type.FullMigrationCronJob.resumeMigration.name=Resume Migration
type.FullMigrationCronJob.resumeMigration.description=Resume a failed migration instead of starting a new one

type.FullMigrationCronJob.migrationId.name=Migration ID
type.FullMigrationCronJob.migrationId.description=Migration ID for the job; used to resume a failed migration
1 change: 1 addition & 0 deletions commercedbsync/resources/sql/createSchedulerTablesHANA.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
copymethod NVARCHAR(255) NULL,
keycolumns NVARCHAR(255) NULL,
durationinseconds numeric(10,2) NULL DEFAULT 0,
batchsize int NOT NULL DEFAULT 1000,
PRIMARY KEY (migrationid, targetnodeid, pipelinename)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
copymethod NVARCHAR(255) NULL,
keycolumns NVARCHAR(255) NULL,
durationinseconds numeric(10,2) NULL DEFAULT 0,
batchsize int NOT NULL DEFAULT 1000,
PRIMARY KEY (migrationid, targetnodeid, pipelinename)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS
copymethod VARCHAR(255) NULL,
keycolumns VARCHAR(255) NULL,
durationinseconds numeric(10, 2) NULL DEFAULT 0,
batchsize int NOT NULL DEFAULT 1000,
PRIMARY KEY (migrationid, targetnodeid, pipelinename)
);
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
copymethod NVARCHAR2(255) NULL,
keycolumns NVARCHAR2(255) NULL,
durationinseconds number(10,2) DEFAULT 0 NULL,
batchsize number(10) DEFAULT 1000 NOT NULL,
PRIMARY KEY (migrationid, targetnodeid, pipelinename)
)
/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ CREATE TABLE MIGRATIONTOOLKIT_TABLECOPYTASKS (
copymethod VARCHAR(255) NULL,
keycolumns VARCHAR(255) NULL,
durationinseconds numeric(10,2) NULL DEFAULT 0,
batchsize int NOT NULL DEFAULT 1000,
PRIMARY KEY (migrationid, targetnodeid, pipelinename)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public DataPipe<DataSet> create(CopyContext context, CopyContext.DataCopyItem it
try {
pipe.put(MaybeFinished.poison());
} catch (Exception p) {
LOG.error("Cannot contaminate pipe ", p);
LOG.error("Could not close contaminated pipe ", p);
}
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
Expand All @@ -107,7 +107,7 @@ private void scheduleWorkers(CopyContext context, DataWorkerExecutor<Boolean> wo
context.getMigrationContext().getDataSourceRepository());
String table = copyItem.getSourceItem();
long totalRows = copyItem.getRowCount();
long pageSize = getReaderBatchSizeForTable(context, table);
int pageSize = copyItem.getBatchSize();
try {
PerformanceRecorder recorder = context.getPerformanceProfiler().createRecorder(PerformanceCategory.DB_READ,
table);
Expand Down Expand Up @@ -143,25 +143,29 @@ private void scheduleWorkers(CopyContext context, DataWorkerExecutor<Boolean> wo
taskRepository.updateTaskCopyMethod(context, copyItem, DataCopyMethod.OFFSET.toString());
taskRepository.updateTaskKeyColumns(context, copyItem, batchColumns);

List<Long> batches = null;
List<Pair<Long, Long>> batches;
if (context.getMigrationContext().isSchedulerResumeEnabled()) {
Set<DatabaseCopyBatch> pendingBatchesForPipeline = taskRepository
.findPendingBatchesForPipeline(context, copyItem);
batches = pendingBatchesForPipeline.stream()
.map(b -> Long.valueOf(b.getLowerBoundary().toString())).collect(Collectors.toList());
.map(b -> Pair.of(Long.valueOf(b.getLowerBoundary().toString()),
Long.valueOf(b.getUpperBoundary().toString())))
.collect(Collectors.toList());
taskRepository.resetPipelineBatches(context, copyItem);
} else {
batches = new ArrayList<>();
for (long offset = 0; offset < totalRows; offset += pageSize) {
batches.add(offset);
batches.add(Pair.of(offset, offset + pageSize));
}
}

Pair<Long, Long> boundaries;
for (int batchId = 0; batchId < batches.size(); batchId++) {
long offset = batches.get(batchId);
DataReaderTask dataReaderTask = new BatchOffsetDataReaderTask(pipeTaskContext, batchId, offset,
batchColumns);
taskRepository.scheduleBatch(context, copyItem, batchId, offset, offset + pageSize);
boundaries = batches.get(batchId);
DataReaderTask dataReaderTask = new BatchOffsetDataReaderTask(pipeTaskContext, batchId,
boundaries.getLeft(), batchColumns);
taskRepository.scheduleBatch(context, copyItem, batchId, boundaries.getLeft(),
boundaries.getRight());
workerExecutor.safelyExecute(dataReaderTask);
}
} else {
Expand All @@ -182,13 +186,12 @@ private void scheduleWorkers(CopyContext context, DataWorkerExecutor<Boolean> wo
taskRepository.updateTaskCopyMethod(context, copyItem, DataCopyMethod.SEEK.toString());
taskRepository.updateTaskKeyColumns(context, copyItem, Lists.newArrayList(batchColumn));

List<List<Object>> batchMarkersList = null;
List<List<Object>> batchMarkersList;
if (context.getMigrationContext().isSchedulerResumeEnabled()) {
batchMarkersList = new ArrayList<>();
Set<DatabaseCopyBatch> pendingBatchesForPipeline = taskRepository
.findPendingBatchesForPipeline(context, copyItem);
batchMarkersList.addAll(pendingBatchesForPipeline.stream()
.map(b -> Collections.list(b.getLowerBoundary())).collect(Collectors.toList()));
batchMarkersList = pendingBatchesForPipeline.stream()
.map(b -> Collections.list(b.getLowerBoundary())).collect(Collectors.toList());
taskRepository.resetPipelineBatches(context, copyItem);
} else {
MarkersQueryDefinition queryDefinition = new MarkersQueryDefinition();
Expand Down Expand Up @@ -237,9 +240,4 @@ private void scheduleWorkers(CopyContext context, DataWorkerExecutor<Boolean> wo
throw new RuntimeException("Exception while preparing reader tasks", ex);
}
}

/**
 * Resolves the reader batch size to use for the given source table.
 * <p>
 * Looks up a table-specific batch size from the migration context first and
 * falls back to the global reader batch size when no per-table value is
 * configured.
 *
 * @param context   copy context providing access to the migration context
 * @param tableName name of the source table being read
 * @return the per-table batch size if configured, otherwise the global default
 */
private static int getReaderBatchSizeForTable(final CopyContext context, final String tableName) {
// Per-table override; null means no table-specific value is configured.
Integer tableBatchSize = context.getMigrationContext().getReaderBatchSize(tableName);
return tableBatchSize == null ? context.getMigrationContext().getReaderBatchSize() : tableBatchSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private Future<T> internalSafelyExecute(Callable<T> callable, int rejections) th
for (int i = 0; i < rejections; i++) {
waitInterval = backOff.nextBackOff();
}
LOG.trace("worker rejected. Retrying in {}ms...", waitInterval);
LOG.trace("Could not fetch new worker, because all are busy. Retrying again in {}ms...", waitInterval);
Thread.sleep(waitInterval);
return internalSafelyExecute(callable, rejections + 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,20 @@ public static class DataCopyItem {
private final String targetItem;
private final Map<String, String> columnMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
private final Long rowCount;
private final Integer batchSize;

public DataCopyItem(String sourceItem, String targetItem) {
public DataCopyItem(String sourceItem, String targetItem, Integer batchSize) {
this.sourceItem = sourceItem;
this.targetItem = targetItem;
this.batchSize = batchSize;
this.rowCount = null;
}

public DataCopyItem(String sourceItem, String targetItem, Map<String, String> columnMap, Long rowCount) {
public DataCopyItem(String sourceItem, String targetItem, Map<String, String> columnMap, Long rowCount,
Integer batchSize) {
this.sourceItem = sourceItem;
this.targetItem = targetItem;
this.batchSize = batchSize;
this.columnMap.clear();
this.columnMap.putAll(columnMap);
this.rowCount = rowCount;
Expand Down Expand Up @@ -109,6 +113,10 @@ public Long getRowCount() {
return rowCount;
}

public Integer getBatchSize() {
return batchSize;
}

@Override
public String toString() {
return new StringJoiner(", ", DataCopyItem.class.getSimpleName() + "[", "]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public int getReaderBatchSize() {
public Integer getReaderBatchSize(final String tableName) {
String tblConfKey = CommercedbsyncConstants.MIGRATION_DATA_READER_BATCHSIZE_FOR_TABLE.replace("{table}",
tableName);
return configuration.getInteger(tblConfKey, null);
return configuration.getInteger(tblConfKey, getReaderBatchSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ protected void onEvent(final CopyDatabaseTableEvent event) {
CopyContext copyContext = new CopyContext(migrationId, migrationContext, new HashSet<>(),
performanceProfiler);
Set<DatabaseCopyTask> copyTableTasks = databaseCopyTaskRepository.findPendingTasks(copyContext);
Set<CopyContext.DataCopyItem> items = copyTableTasks
.stream().map(task -> new CopyContext.DataCopyItem(task.getSourcetablename(),
task.getTargettablename(), task.getColumnmap(), task.getSourcerowcount()))
Set<CopyContext.DataCopyItem> items = copyTableTasks.stream()
.map(task -> new CopyContext.DataCopyItem(task.getSourcetablename(), task.getTargettablename(),
task.getColumnmap(), task.getSourcerowcount(), task.getBatchsize()))
.collect(Collectors.toSet());
copyContext.getCopyItems().addAll(items);
databaseMigrationCopyService.copyAllAsync(copyContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,41 @@ public abstract class AbstractMigrationJobPerformable extends AbstractJobPerform
@Override
public boolean isPerformable() {
for (CronJobModel cronJob : getCronJobService().getRunningOrRestartedCronJobs()) {
currentMigrationId = databaseMigrationService.getMigrationID(migrationContext);

if ((cronJob instanceof IncrementalMigrationCronJobModel || cronJob instanceof FullMigrationCronJobModel)) {
LOG.info("Previous migrations job already running {} and Type {} ", cronJob.getCode(),
cronJob.getItemtype());
if (isJobStateAborted(cronJob)) {
try {
abortCurrentMigration();
databaseMigrationService.markRemainingTasksAborted(migrationContext, currentMigrationId);
clearAbortRequestedIfNeeded(cronJob);

LOG.info("Migration with ID: {} was marked as aborted", currentMigrationId);
} catch (Exception e) {
LOG.warn("Failed to abort current migration");
LOG.debug("Migration abort failed", e);
}
} else {
LOG.info("Previous migration job already running {} and type {}", cronJob.getCode(),
cronJob.getItemtype());
}
return false;
}
}

if (StringUtils.isNotEmpty(currentMigrationId)) {
try {
if (databaseMigrationService.getMigrationState(migrationContext, currentMigrationId)
.getStatus() == MigrationProgress.RUNNING) {
LOG.info("Previous migration already running, ID: {}", currentMigrationId);
return false;
}
} catch (Exception e) {
LOG.warn("Unable to fetch current migration status");
LOG.debug("Migration status fetch failed", e);
}
}

return true;
}

Expand Down Expand Up @@ -199,13 +228,12 @@ protected MigrationStatus waitForFinishCronjobs(IncrementalMigrationContext cont
} while (StringUtils.equalsAnyIgnoreCase(status.getStatus().toString(), RUNNING_MIGRATION));

if (aborted) {
LOG.info(" Aborted ...STOPPING migration ");
databaseMigrationService.stopMigration(migrationContext, currentMigrationId);
abortCurrentMigration();
clearAbortRequestedIfNeeded(cronJobModel);
LOG.error("Database migration has been ABORTED, Migration State= " + status + ", Total Tasks "
+ status.getTotalTasks() + ", migration id =" + status.getMigrationID() + ", Completed Tasks "
+ status.getCompletedTasks());
clearAbortRequestedIfNeeded(cronJobModel);
throw new AbortCronJobException("CronJOB ABORTED");
throw new AbortCronJobException("Cronjob ABORTED");
}

if (status.isFailed()) {
Expand All @@ -218,6 +246,11 @@ protected MigrationStatus waitForFinishCronjobs(IncrementalMigrationContext cont
return status;
}

/**
 * Stops the migration run identified by {@code currentMigrationId}.
 * <p>
 * Delegates to the migration service to stop the run; callers are expected to
 * handle cleanup such as clearing the cron job's abort-request flag.
 *
 * @throws Exception if stopping the migration fails
 */
private void abortCurrentMigration() throws Exception {
LOG.info("Aborted ...STOPPING migration");
databaseMigrationService.stopMigration(migrationContext, currentMigrationId);
}

protected LaunchOptions createLaunchOptions(MigrationCronJobModel migrationCronJob) {
final LaunchOptions launchOptions = new LaunchOptions();

Expand All @@ -240,8 +273,8 @@ private void putLaunchOptionProperty(final LaunchOptions launchOptions, String p

protected boolean isJobStateAborted(final CronJobModel cronJobModel) {
this.modelService.refresh(cronJobModel);
LOG.info("cron job status = " + cronJobModel.getStatus());
LOG.info("cron job request to abort =" + cronJobModel.getRequestAbort());
LOG.info("Cron job status: {}", cronJobModel.getStatus());
LOG.info("Cron job request to abort: {}", BooleanUtils.isTrue(cronJobModel.getRequestAbort()));
return ((cronJobModel.getStatus() == CronJobStatus.ABORTED)
|| (cronJobModel.getRequestAbort() != null && cronJobModel.getRequestAbort()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
package com.sap.cx.boosters.commercedbsync.jobs;

import com.google.common.base.Preconditions;
import com.sap.cx.boosters.commercedbsync.constants.CommercedbsyncConstants;
import com.sap.cx.boosters.commercedbsync.context.IncrementalMigrationContext;
import com.sap.cx.boosters.commercedbsync.context.LaunchOptions;
import de.hybris.platform.cronjob.enums.CronJobResult;
import de.hybris.platform.cronjob.enums.CronJobStatus;
import de.hybris.platform.cronjob.jalo.AbortCronJobException;
import de.hybris.platform.cronjob.model.CronJobModel;
import de.hybris.platform.servicelayer.cronjob.PerformResult;
import com.sap.cx.boosters.commercedbsync.model.cron.FullMigrationCronJobModel;
import de.hybris.platform.servicelayer.model.ModelService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -24,6 +27,12 @@ public class FullMigrationJob extends AbstractMigrationJobPerformable {

private static final Logger LOG = LoggerFactory.getLogger(FullMigrationJob.class);

private final ModelService modelService;

/**
 * Creates the job with the model service used to persist the migration ID
 * back onto the cron job model after a migration is started.
 *
 * @param modelService platform model service used to save the cron job model
 */
public FullMigrationJob(final ModelService modelService) {
this.modelService = modelService;
}

@Override
public PerformResult perform(final CronJobModel cronJobModel) {
FullMigrationCronJobModel fullMigrationCronJobModel;
Expand Down Expand Up @@ -54,9 +63,24 @@ public PerformResult perform(final CronJobModel cronJobModel) {
incrementalMigrationContext.setIncrementalModeEnabled(false);
incrementalMigrationContext
.setFullDatabaseMigrationEnabled(fullMigrationCronJobModel.isFullDatabaseMigration());
currentMigrationId = databaseMigrationService.startMigration(incrementalMigrationContext,
createLaunchOptions(fullMigrationCronJobModel));
final LaunchOptions launchOptions = createLaunchOptions(fullMigrationCronJobModel);

if (fullMigrationCronJobModel.isResumeMigration()) {
currentMigrationId = fullMigrationCronJobModel.getMigrationId();
Preconditions.checkNotNull(currentMigrationId,
"Migration ID must be present to resume failed migration job");
launchOptions.getPropertyOverrideMap().put(CommercedbsyncConstants.MIGRATION_SCHEDULER_RESUME_ENABLED,
true);
databaseMigrationService.resumeUnfinishedMigration(incrementalMigrationContext, launchOptions,
currentMigrationId);
LOG.info("Resumed Migration {}", currentMigrationId);
} else {
currentMigrationId = databaseMigrationService.startMigration(incrementalMigrationContext,
launchOptions);
LOG.info("Started Migration {}", currentMigrationId);
fullMigrationCronJobModel.setMigrationId(currentMigrationId);
modelService.save(fullMigrationCronJobModel);
}
waitForFinishCronjobs(incrementalMigrationContext, currentMigrationId, cronJobModel);
} catch (final AbortCronJobException e) {
return new PerformResult(CronJobResult.ERROR, CronJobStatus.ABORTED);
Expand Down
Loading

0 comments on commit d2cb6af

Please sign in to comment.