Skip to content

Commit

Permalink
DBZ-7308 Move getSnapshottingTask to RelationalSnapshotChangeEventSource
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale authored and jpechane committed Mar 25, 2024
1 parent 70afb5d commit 391eb03
Showing 1 changed file with 0 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,41 +51,6 @@ public Db2SnapshotChangeEventSource(Db2ConnectorConfig connectorConfig, MainConn
this.jdbcConnection = connectionFactory.mainConnection();
}

@Override
public SnapshottingTask getSnapshottingTask(Db2Partition partition, Db2OffsetContext previousOffset) {

final Snapshotter snapshotter = snapshotterService.getSnapshotter();

List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));

boolean offsetExists = previousOffset != null;
boolean snapshotInProgress = false;

if (offsetExists) {
snapshotInProgress = previousOffset.isSnapshotRunning();
}

if (offsetExists && !previousOffset.isSnapshotRunning()) {
LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
}

boolean shouldSnapshotSchema = snapshotter.shouldSnapshotSchema(offsetExists, snapshotInProgress);
boolean shouldSnapshotData = snapshotter.shouldSnapshotData(offsetExists, snapshotInProgress);

if (shouldSnapshotData && shouldSnapshotSchema) {
LOGGER.info("According to the connector configuration both schema and data will be snapshot.");
}
else if (shouldSnapshotSchema) {
LOGGER.info("According to the connector configuration only schema will be snapshot.");
}

return new SnapshottingTask(shouldSnapshotSchema, shouldSnapshotData,
dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable,
false);
}

@Override
protected SnapshotContext<Db2Partition, Db2OffsetContext> prepare(Db2Partition partition, boolean onDemand) {
return new Db2SnapshotContext(partition, jdbcConnection.getRealDatabaseName(), onDemand);
Expand Down

0 comments on commit 391eb03

Please sign in to comment.