Skip to content

Commit

Permalink
DBZ-7301 Inject the SnapshotterService
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale committed Feb 5, 2024
1 parent ec2ce55 commit b11d304
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
Expand All @@ -32,23 +33,25 @@ public class Db2ChangeEventSourceFactory implements ChangeEventSourceFactory<Db2
private final EventDispatcher<Db2Partition, TableId> dispatcher;
private final Clock clock;
private final Db2DatabaseSchema schema;
private final SnapshotterService snapshotterService;

public Db2ChangeEventSourceFactory(Db2ConnectorConfig configuration, Db2Connection metadataConnection,
MainConnectionProvidingConnectionFactory<Db2Connection> connectionFactory, ErrorHandler errorHandler,
EventDispatcher<Db2Partition, TableId> dispatcher, Clock clock, Db2DatabaseSchema schema) {
EventDispatcher<Db2Partition, TableId> dispatcher, Clock clock, Db2DatabaseSchema schema, SnapshotterService snapshotterService) {
this.configuration = configuration;
this.metadataConnection = metadataConnection;
this.connectionFactory = connectionFactory;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
this.schema = schema;
this.snapshotterService = snapshotterService;
}

@Override
public SnapshotChangeEventSource<Db2Partition, Db2OffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener<Db2Partition> snapshotProgressListener,
NotificationService<Db2Partition, Db2OffsetContext> notificationService) {
return new Db2SnapshotChangeEventSource(configuration, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService);
return new Db2SnapshotChangeEventSource(configuration, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService, snapshotterService);
}

@Override
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.Map;
import java.util.stream.Collectors;

import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.snapshot.Snapshotter;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
Expand Down Expand Up @@ -98,6 +100,9 @@ public ChangeEventSourceCoordinator<Db2Partition, Db2OffsetContext> start(Config
// Service providers
registerServiceProviders(connectorConfig.getServiceRegistry());

final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);
final Snapshotter snapshotter = snapshotterService.getSnapshotter();

if (previousOffset != null) {
schema.recover(partition, previousOffset);
}
Expand Down Expand Up @@ -143,12 +148,13 @@ public ChangeEventSourceCoordinator<Db2Partition, Db2OffsetContext> start(Config
errorHandler,
Db2Connector.class,
connectorConfig,
new Db2ChangeEventSourceFactory(connectorConfig, metadataConnection, connectionFactory, errorHandler, dispatcher, clock, schema),
new Db2ChangeEventSourceFactory(connectorConfig, metadataConnection, connectionFactory, errorHandler, dispatcher, clock, schema, snapshotterService),
new DefaultChangeEventSourceMetricsFactory<>(),
dispatcher,
schema,
signalProcessor,
notificationService);
notificationService,
snapshotterService);

coordinator.start(taskContext, this.queue, metadataProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import io.debezium.snapshot.SnapshotterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -43,8 +44,8 @@ public class Db2SnapshotChangeEventSource extends RelationalSnapshotChangeEventS
public Db2SnapshotChangeEventSource(Db2ConnectorConfig connectorConfig, MainConnectionProvidingConnectionFactory<Db2Connection> connectionFactory,
Db2DatabaseSchema schema, EventDispatcher<Db2Partition, TableId> dispatcher, Clock clock,
SnapshotProgressListener<Db2Partition> snapshotProgressListener,
NotificationService<Db2Partition, Db2OffsetContext> notificationService) {
super(connectorConfig, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService);
NotificationService<Db2Partition, Db2OffsetContext> notificationService, SnapshotterService snapshotterService) {
super(connectorConfig, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService, snapshotterService);
this.connectorConfig = connectorConfig;
this.jdbcConnection = connectionFactory.mainConnection();
}
Expand Down

0 comments on commit b11d304

Please sign in to comment.