Skip to content

Commit

Permalink
DBZ-5071 Correctly handle NULL values in incremental snapshots
Browse files Browse the repository at this point in the history
This makes changes in the Db2 connector for the corresponding commit in
the main Debezium project.  The main thing we have to do is override the
nullsSortLast function in Db2Connection to define how NULLs sort in Db2.
We also have to update the test suite, since the base test suite now
includes additional tests for working with keys that are nullable.
  • Loading branch information
james-johnston-thumbtack committed Jan 27, 2024
1 parent a1edeff commit a480237
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
7 changes: 7 additions & 0 deletions src/main/java/io/debezium/connector/db2/Db2Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,13 @@ public String connectionString() {
return connectionString(URL_PATTERN);
}

@Override
public Optional<Boolean> nullsSortLast() {
// "The null value is higher than all other values"
// https://www.ibm.com/docs/en/db2/11.5?topic=subselect-order-by-clause
return Optional.of(true);
}

@Override
public String quotedTableIdString(TableId tableId) {
StringBuilder quoted = new StringBuilder();
Expand Down
33 changes: 31 additions & 2 deletions src/test/java/io/debezium/connector/db2/IncrementalSnapshotIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.debezium.junit.Flaky;
import io.debezium.junit.SkipTestRule;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.util.Testing;

Expand All @@ -39,15 +40,18 @@ public void before() throws SQLException {
TestHelper.disableDbCdc(connection);
TestHelper.disableTableCdc(connection, "A");
TestHelper.disableTableCdc(connection, "B");
TestHelper.disableTableCdc(connection, "A42");
TestHelper.disableTableCdc(connection, "DEBEZIUM_SIGNAL");
connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER");
connection.execute(
"DROP TABLE IF EXISTS a",
"DROP TABLE IF EXISTS b",
"DROP TABLE IF EXISTS a42",
"DROP TABLE IF EXISTS debezium_signal");
connection.execute(
"CREATE TABLE a (pk int not null, aa int, primary key (pk))",
"CREATE TABLE b (pk int not null, aa int, primary key (pk))",
"CREATE TABLE a42 (pk1 int, pk2 int, pk3 int, pk4 int, aa int)",
"CREATE TABLE debezium_signal (id varchar(64), type varchar(32), data varchar(2048))");

TestHelper.enableDbCdc(connection);
Expand All @@ -65,11 +69,13 @@ public void after() throws SQLException {
TestHelper.disableDbCdc(connection);
TestHelper.disableTableCdc(connection, "A");
TestHelper.disableTableCdc(connection, "B");
TestHelper.disableTableCdc(connection, "A42");
TestHelper.disableTableCdc(connection, "DEBEZIUM_SIGNAL");
connection.rollback();
connection.execute(
"DROP TABLE IF EXISTS a",
"DROP TABLE IF EXISTS b",
"DROP TABLE IF EXISTS a42",
"DROP TABLE IF EXISTS debezium_signal");
connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER");
connection.execute("DELETE FROM ASNCDC.IBMQREP_COLVERSION");
Expand All @@ -91,6 +97,12 @@ protected void populateTables() throws SQLException {
TestHelper.enableTableCdc(connection, "B");
}

@Override
protected void populate4PkTable(JdbcConnection connection, String tableName) throws SQLException {
super.populate4PkTable(connection, tableName);
TestHelper.enableTableCdc((Db2Connection) connection, tableName.replaceAll(".*\\.", ""));
}

@Override
protected Class<Db2Connector> connectorClass() {
return Db2Connector.class;
Expand All @@ -111,6 +123,11 @@ protected List<String> topicNames() {
return List.of(topicName(), "testdb.DB2INST1.B");
}

@Override
protected String noPKTopicName() {
return "testdb.DB2INST1.A42";
}

@Override
protected String tableName() {
return "DB2INST1.A";
Expand All @@ -121,6 +138,11 @@ protected List<String> tableNames() {
return List.of(tableName(), "DB2INST1.B");
}

@Override
protected String noPKTableName() {
return "DB2INST1.A42";
}

@Override
protected String signalTableName() {
return "DEBEZIUM_SIGNAL";
Expand All @@ -130,6 +152,11 @@ protected String getSignalTypeFieldName() {
return "TYPE";
}

@Override
protected String returnedIdentifierName(String queriedID) {
return queriedID.toUpperCase();
}

protected void sendAdHocSnapshotSignal() throws SQLException {
connection.execute(
String.format(
Expand All @@ -143,7 +170,8 @@ protected Builder config() {
return TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(Db2ConnectorConfig.SIGNAL_DATA_COLLECTION, "DB2INST1.DEBEZIUM_SIGNAL")
.with(Db2ConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250);
.with(Db2ConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250)
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "DB2INST1.A42:pk1,pk2,pk3,pk4");
}

@Override
Expand All @@ -159,7 +187,8 @@ protected Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCaptur
.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(Db2ConnectorConfig.SIGNAL_DATA_COLLECTION, "DB2INST1.DEBEZIUM_SIGNAL")
.with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl);
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl)
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "DB2INST1.A42:pk1,pk2,pk3,pk4");
}

@Override
Expand Down

0 comments on commit a480237

Please sign in to comment.