From 1271103c2d8cec7601de5a0ddc31ce3439c62302 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Mon, 18 Mar 2024 11:46:42 +0100 Subject: [PATCH 1/5] DBZ-7308 Check if configured snapshot mode permits streaming before starting --- .../connector/db2/Db2StreamingChangeEventSource.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java b/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java index 3a4610a..25465ac 100644 --- a/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java @@ -115,11 +115,6 @@ public void init(Db2OffsetContext offsetContext) { public void execute(ChangeEventSourceContext context, Db2Partition partition, Db2OffsetContext offsetContext) throws InterruptedException { - if (!snapshotterService.getSnapshotter().shouldStream()) { - LOGGER.info("Streaming is not enabled in current configuration"); - return; - } - final Metronome metronome = Metronome.sleeper(pollInterval, clock); final Queue schemaChangeCheckpoints = new PriorityQueue<>((x, y) -> x.getStopLsn().compareTo(y.getStopLsn())); try { From d9c9ec1bc25df2e5f4633ec14c06ba5dddc1138c Mon Sep 17 00:00:00 2001 From: mfvitale Date: Mon, 18 Mar 2024 16:45:55 +0100 Subject: [PATCH 2/5] DBZ-7308 Remove connector specific SnapshotLockProvider and SnapshotterServiceProvider --- .../connector/db2/Db2ConnectorTask.java | 10 ------- .../db2/snapshot/Db2SnapshotLockProvider.java | 30 ------------------- .../Db2SnapshotterServiceProvider.java | 22 -------------- 3 files changed, 62 deletions(-) delete mode 100644 src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotLockProvider.java delete mode 100644 src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotterServiceProvider.java diff --git a/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java b/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java index 1466803..0c2f165 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java +++ b/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java @@ -21,8 +21,6 @@ import io.debezium.config.Field; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.common.BaseSourceTask; -import io.debezium.connector.db2.snapshot.Db2SnapshotLockProvider; -import io.debezium.connector.db2.snapshot.Db2SnapshotterServiceProvider; import io.debezium.document.DocumentReader; import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory; import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; @@ -37,7 +35,6 @@ import io.debezium.relational.TableId; import io.debezium.schema.SchemaFactory; import io.debezium.schema.SchemaNameAdjuster; -import io.debezium.service.spi.ServiceRegistry; import io.debezium.snapshot.SnapshotterService; import io.debezium.spi.topic.TopicNamingStrategy; import io.debezium.util.Clock; @@ -230,11 +227,4 @@ private static Configuration applyFetchSizeToJdbcConfig(Configuration config) { return config; } - @Override - protected void registerServiceProviders(ServiceRegistry serviceRegistry) { - - super.registerServiceProviders(serviceRegistry); - serviceRegistry.registerServiceProvider(new Db2SnapshotLockProvider()); - serviceRegistry.registerServiceProvider(new Db2SnapshotterServiceProvider()); - } } diff --git a/src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotLockProvider.java b/src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotLockProvider.java deleted file mode 100644 index 5fac906..0000000 --- a/src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotLockProvider.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.db2.snapshot; - -import io.debezium.bean.StandardBeanNames; -import io.debezium.bean.spi.BeanRegistry; -import io.debezium.connector.db2.Db2ConnectorConfig; -import io.debezium.service.spi.ServiceProvider; -import io.debezium.snapshot.SnapshotLockProvider; -import io.debezium.snapshot.spi.SnapshotLock; - -/** - * An implementation of the {@link ServiceProvider} contract for the {@link SnapshotLock}. - * - * @author Mario Fiore Vitale - */ -public class Db2SnapshotLockProvider extends SnapshotLockProvider { - - @Override - public String snapshotLockingMode(BeanRegistry beanRegistry) { - - Db2ConnectorConfig sqlServerConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, Db2ConnectorConfig.class); - - return sqlServerConnectorConfig.getSnapshotLockingMode().getValue(); - } - -} diff --git a/src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotterServiceProvider.java b/src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotterServiceProvider.java deleted file mode 100644 index c597673..0000000 --- a/src/main/java/io/debezium/connector/db2/snapshot/Db2SnapshotterServiceProvider.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.db2.snapshot; - -import io.debezium.bean.StandardBeanNames; -import io.debezium.bean.spi.BeanRegistry; -import io.debezium.connector.db2.Db2ConnectorConfig; -import io.debezium.snapshot.SnapshotterServiceProvider; - -public class Db2SnapshotterServiceProvider extends SnapshotterServiceProvider { - - @Override - public String snapshotMode(BeanRegistry beanRegistry) { - - Db2ConnectorConfig mySqlConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, Db2ConnectorConfig.class); - - return mySqlConnectorConfig.getSnapshotMode().getValue(); - } -} From 942ab800a4d6709368a304cc312dbbedc07bf389 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Mon, 18 Mar 2024 16:47:30 +0100 Subject: [PATCH 3/5] DBZ-7308 Move getSnapshottingTask to RelationalSnapshotChangeEventSource --- .../db2/Db2SnapshotChangeEventSource.java | 35 ------------------- 1 file changed, 35 deletions(-) diff --git a/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java index 8b23a1e..b0edb3b 100644 --- a/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java @@ -51,41 +51,6 @@ public Db2SnapshotChangeEventSource(Db2ConnectorConfig connectorConfig, MainConn this.jdbcConnection = connectionFactory.mainConnection(); } - @Override - public SnapshottingTask getSnapshottingTask(Db2Partition partition, Db2OffsetContext previousOffset) { - - final Snapshotter snapshotter = snapshotterService.getSnapshotter(); - - List dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted(); - Map snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue)); - - boolean offsetExists = previousOffset != null; - boolean snapshotInProgress = false; - - if (offsetExists) { - snapshotInProgress = previousOffset.isSnapshotRunning(); - } - - if (offsetExists && !previousOffset.isSnapshotRunning()) { - LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted."); - } - - boolean shouldSnapshotSchema = snapshotter.shouldSnapshotSchema(offsetExists, snapshotInProgress); - boolean shouldSnapshotData = snapshotter.shouldSnapshotData(offsetExists, snapshotInProgress); - - if (shouldSnapshotData && shouldSnapshotSchema) { - LOGGER.info("According to the connector configuration both schema and data will be snapshot."); - } - else if (shouldSnapshotSchema) { - LOGGER.info("According to the connector configuration only schema will be snapshot."); - } - - return new SnapshottingTask(shouldSnapshotSchema, shouldSnapshotData, - dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable, - false); - } - @Override protected SnapshotContext prepare(Db2Partition partition, boolean onDemand) { return new Db2SnapshotContext(partition, jdbcConnection.getRealDatabaseName(), onDemand); From 6e3c29a496ada5ed58accaeecc57c06416757566 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Mon, 18 Mar 2024 16:47:53 +0100 Subject: [PATCH 4/5] DBZ-7308 tableLockingStatement from SnapshotLock interface now takes just one table in input --- .../debezium/connector/db2/Db2SnapshotChangeEventSource.java | 4 +--- .../connector/db2/snapshot/lock/ExclusiveSnapshotLock.java | 5 +---- .../debezium/connector/db2/snapshot/lock/NoSnapshotLock.java | 3 +-- 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java index b0edb3b..7ca4ff4 100644 --- a/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java @@ -11,7 +11,6 @@ import java.sql.Savepoint; import java.sql.Statement; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -32,7 +31,6 @@ import io.debezium.relational.Tables; import io.debezium.schema.SchemaChangeEvent; import io.debezium.snapshot.SnapshotterService; -import io.debezium.spi.snapshot.Snapshotter; import io.debezium.util.Clock; public class Db2SnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { @@ -90,7 +88,7 @@ else if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.EXC } Optional lockingStatement = snapshotterService.getSnapshotLock().tableLockingStatement(connectorConfig.snapshotLockTimeout(), - Set.of(quoteTableName(tableId))); + quoteTableName(tableId)); if (lockingStatement.isPresent()) { LOGGER.info("Locking table {}", tableId); diff --git a/src/main/java/io/debezium/connector/db2/snapshot/lock/ExclusiveSnapshotLock.java b/src/main/java/io/debezium/connector/db2/snapshot/lock/ExclusiveSnapshotLock.java index dbd08a0..2f58272 100644 --- a/src/main/java/io/debezium/connector/db2/snapshot/lock/ExclusiveSnapshotLock.java +++ b/src/main/java/io/debezium/connector/db2/snapshot/lock/ExclusiveSnapshotLock.java @@ -8,7 +8,6 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; -import java.util.Set; import io.debezium.connector.db2.Db2ConnectorConfig; import io.debezium.snapshot.spi.SnapshotLock; @@ -26,9 +25,7 @@ public void configure(Map properties) { } @Override - public Optional tableLockingStatement(Duration lockTimeout, Set tableIds) { - - String tableId = tableIds.iterator().next(); // For Db2 we expect just one table at time. + public Optional tableLockingStatement(Duration lockTimeout, String tableId) { return Optional.of("SELECT * FROM " + tableId + " WHERE 0=1 WITH CS"); } diff --git a/src/main/java/io/debezium/connector/db2/snapshot/lock/NoSnapshotLock.java b/src/main/java/io/debezium/connector/db2/snapshot/lock/NoSnapshotLock.java index 2a1d1a1..ab4294d 100644 --- a/src/main/java/io/debezium/connector/db2/snapshot/lock/NoSnapshotLock.java +++ b/src/main/java/io/debezium/connector/db2/snapshot/lock/NoSnapshotLock.java @@ -8,7 +8,6 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; -import java.util.Set; import io.debezium.connector.db2.Db2ConnectorConfig; import io.debezium.snapshot.spi.SnapshotLock; @@ -26,7 +25,7 @@ public void configure(Map properties) { } @Override - public Optional tableLockingStatement(Duration lockTimeout, Set tableIds) { + public Optional tableLockingStatement(Duration lockTimeout, String tableId) { return Optional.empty(); } From aaec6aaec289a3e322a9072bd81b0577c78366c2 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Wed, 20 Mar 2024 11:32:36 +0100 Subject: [PATCH 5/5] DBZ-7308 Adapt to core changes --- .../java/io/debezium/connector/db2/Db2ConnectorConfig.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java b/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java index 8e59cd6..d31604a 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java +++ b/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java @@ -8,6 +8,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -404,8 +405,8 @@ public SnapshotIsolationMode getSnapshotIsolationMode() { return this.snapshotIsolationMode; } - public SnapshotLockingMode getSnapshotLockingMode() { - return this.snapshotLockingMode; + public Optional getSnapshotLockingMode() { + return Optional.of(this.snapshotLockingMode); } public SnapshotMode getSnapshotMode() {