diff --git a/src/main/java/io/debezium/connector/db2/Db2ChangeEventSourceFactory.java b/src/main/java/io/debezium/connector/db2/Db2ChangeEventSourceFactory.java index f88b430..f11cbec 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ChangeEventSourceFactory.java +++ b/src/main/java/io/debezium/connector/db2/Db2ChangeEventSourceFactory.java @@ -19,6 +19,7 @@ import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.TableId; +import io.debezium.snapshot.SnapshotterService; import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Clock; import io.debezium.util.Strings; @@ -32,10 +33,11 @@ public class Db2ChangeEventSourceFactory implements ChangeEventSourceFactory dispatcher; private final Clock clock; private final Db2DatabaseSchema schema; + private final SnapshotterService snapshotterService; public Db2ChangeEventSourceFactory(Db2ConnectorConfig configuration, Db2Connection metadataConnection, MainConnectionProvidingConnectionFactory connectionFactory, ErrorHandler errorHandler, - EventDispatcher dispatcher, Clock clock, Db2DatabaseSchema schema) { + EventDispatcher dispatcher, Clock clock, Db2DatabaseSchema schema, SnapshotterService snapshotterService) { this.configuration = configuration; this.metadataConnection = metadataConnection; this.connectionFactory = connectionFactory; @@ -43,12 +45,14 @@ public Db2ChangeEventSourceFactory(Db2ConnectorConfig configuration, Db2Connecti this.dispatcher = dispatcher; this.clock = clock; this.schema = schema; + this.snapshotterService = snapshotterService; } @Override public SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener, NotificationService notificationService) { - return new Db2SnapshotChangeEventSource(configuration, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService); + return new Db2SnapshotChangeEventSource(configuration, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService, + snapshotterService); } @Override diff --git a/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java b/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java index bc280e3..539502d 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java +++ b/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java @@ -35,6 +35,7 @@ import io.debezium.relational.TableId; import io.debezium.schema.SchemaFactory; import io.debezium.schema.SchemaNameAdjuster; +import io.debezium.snapshot.SnapshotterService; import io.debezium.spi.topic.TopicNamingStrategy; import io.debezium.util.Clock; @@ -98,6 +99,8 @@ public ChangeEventSourceCoordinator start(Config // Service providers registerServiceProviders(connectorConfig.getServiceRegistry()); + final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class); + if (previousOffset != null) { schema.recover(partition, previousOffset); } @@ -143,12 +146,13 @@ public ChangeEventSourceCoordinator start(Config errorHandler, Db2Connector.class, connectorConfig, - new Db2ChangeEventSourceFactory(connectorConfig, metadataConnection, connectionFactory, errorHandler, dispatcher, clock, schema), + new Db2ChangeEventSourceFactory(connectorConfig, metadataConnection, connectionFactory, errorHandler, dispatcher, clock, schema, snapshotterService), new DefaultChangeEventSourceMetricsFactory<>(), dispatcher, schema, signalProcessor, - notificationService); + notificationService, + snapshotterService); coordinator.start(taskContext, this.queue, metadataProvider); diff --git a/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java index 8bae993..ea39378 100644 --- a/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java @@ -31,6 +31,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.schema.SchemaChangeEvent; +import io.debezium.snapshot.SnapshotterService; import io.debezium.util.Clock; public class Db2SnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { @@ -43,8 +44,8 @@ public class Db2SnapshotChangeEventSource extends RelationalSnapshotChangeEventS public Db2SnapshotChangeEventSource(Db2ConnectorConfig connectorConfig, MainConnectionProvidingConnectionFactory connectionFactory, Db2DatabaseSchema schema, EventDispatcher dispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener, - NotificationService notificationService) { - super(connectorConfig, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService); + NotificationService notificationService, SnapshotterService snapshotterService) { + super(connectorConfig, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService, snapshotterService); this.connectorConfig = connectorConfig; this.jdbcConnection = connectionFactory.mainConnection(); }