Skip to content

Commit

Permalink
DBZ-7301 Inject the SnapshotterService
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale committed Feb 5, 2024
1 parent ec2ce55 commit 5972b85
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
Expand All @@ -32,23 +33,26 @@ public class Db2ChangeEventSourceFactory implements ChangeEventSourceFactory<Db2
private final EventDispatcher<Db2Partition, TableId> dispatcher;
private final Clock clock;
private final Db2DatabaseSchema schema;
private final SnapshotterService snapshotterService;

public Db2ChangeEventSourceFactory(Db2ConnectorConfig configuration, Db2Connection metadataConnection,
MainConnectionProvidingConnectionFactory<Db2Connection> connectionFactory, ErrorHandler errorHandler,
EventDispatcher<Db2Partition, TableId> dispatcher, Clock clock, Db2DatabaseSchema schema) {
EventDispatcher<Db2Partition, TableId> dispatcher, Clock clock, Db2DatabaseSchema schema, SnapshotterService snapshotterService) {
this.configuration = configuration;
this.metadataConnection = metadataConnection;
this.connectionFactory = connectionFactory;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
this.schema = schema;
this.snapshotterService = snapshotterService;
}

@Override
public SnapshotChangeEventSource<Db2Partition, Db2OffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener<Db2Partition> snapshotProgressListener,
NotificationService<Db2Partition, Db2OffsetContext> notificationService) {
return new Db2SnapshotChangeEventSource(configuration, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService);
return new Db2SnapshotChangeEventSource(configuration, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService,
snapshotterService);
}

@Override
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;

Expand Down Expand Up @@ -98,6 +99,8 @@ public ChangeEventSourceCoordinator<Db2Partition, Db2OffsetContext> start(Config
// Service providers
registerServiceProviders(connectorConfig.getServiceRegistry());

final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);

if (previousOffset != null) {
schema.recover(partition, previousOffset);
}
Expand Down Expand Up @@ -143,12 +146,13 @@ public ChangeEventSourceCoordinator<Db2Partition, Db2OffsetContext> start(Config
errorHandler,
Db2Connector.class,
connectorConfig,
new Db2ChangeEventSourceFactory(connectorConfig, metadataConnection, connectionFactory, errorHandler, dispatcher, clock, schema),
new Db2ChangeEventSourceFactory(connectorConfig, metadataConnection, connectionFactory, errorHandler, dispatcher, clock, schema, snapshotterService),
new DefaultChangeEventSourceMetricsFactory<>(),
dispatcher,
schema,
signalProcessor,
notificationService);
notificationService,
snapshotterService);

coordinator.start(taskContext, this.queue, metadataProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.util.Clock;

public class Db2SnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<Db2Partition, Db2OffsetContext> {
Expand All @@ -43,8 +44,8 @@ public class Db2SnapshotChangeEventSource extends RelationalSnapshotChangeEventS
public Db2SnapshotChangeEventSource(Db2ConnectorConfig connectorConfig, MainConnectionProvidingConnectionFactory<Db2Connection> connectionFactory,
Db2DatabaseSchema schema, EventDispatcher<Db2Partition, TableId> dispatcher, Clock clock,
SnapshotProgressListener<Db2Partition> snapshotProgressListener,
NotificationService<Db2Partition, Db2OffsetContext> notificationService) {
super(connectorConfig, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService);
NotificationService<Db2Partition, Db2OffsetContext> notificationService, SnapshotterService snapshotterService) {
super(connectorConfig, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService, snapshotterService);
this.connectorConfig = connectorConfig;
this.jdbcConnection = connectionFactory.mainConnection();
}
Expand Down

0 comments on commit 5972b85

Please sign in to comment.