Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-5071 Correctly handle NULL values in incremental snapshots #131

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/main/java/io/debezium/connector/db2/Db2Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,13 @@ public String connectionString() {
return connectionString(URL_PATTERN);
}

@Override
public Optional<Boolean> nullsSortLast() {
// "The null value is higher than all other values"
// https://www.ibm.com/docs/en/db2/11.5?topic=subselect-order-by-clause
return Optional.of(true);
}

@Override
public String quotedTableIdString(TableId tableId) {
StringBuilder quoted = new StringBuilder();
Expand Down
33 changes: 31 additions & 2 deletions src/test/java/io/debezium/connector/db2/IncrementalSnapshotIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.debezium.junit.Flaky;
import io.debezium.junit.SkipTestRule;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.util.Testing;

Expand All @@ -39,15 +40,18 @@ public void before() throws SQLException {
TestHelper.disableDbCdc(connection);
TestHelper.disableTableCdc(connection, "A");
TestHelper.disableTableCdc(connection, "B");
TestHelper.disableTableCdc(connection, "A42");
TestHelper.disableTableCdc(connection, "DEBEZIUM_SIGNAL");
connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER");
connection.execute(
"DROP TABLE IF EXISTS a",
"DROP TABLE IF EXISTS b",
"DROP TABLE IF EXISTS a42",
"DROP TABLE IF EXISTS debezium_signal");
connection.execute(
"CREATE TABLE a (pk int not null, aa int, primary key (pk))",
"CREATE TABLE b (pk int not null, aa int, primary key (pk))",
"CREATE TABLE a42 (pk1 int, pk2 int, pk3 int, pk4 int, aa int)",
"CREATE TABLE debezium_signal (id varchar(64), type varchar(32), data varchar(2048))");

TestHelper.enableDbCdc(connection);
Expand All @@ -65,11 +69,13 @@ public void after() throws SQLException {
TestHelper.disableDbCdc(connection);
TestHelper.disableTableCdc(connection, "A");
TestHelper.disableTableCdc(connection, "B");
TestHelper.disableTableCdc(connection, "A42");
TestHelper.disableTableCdc(connection, "DEBEZIUM_SIGNAL");
connection.rollback();
connection.execute(
"DROP TABLE IF EXISTS a",
"DROP TABLE IF EXISTS b",
"DROP TABLE IF EXISTS a42",
"DROP TABLE IF EXISTS debezium_signal");
connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER");
connection.execute("DELETE FROM ASNCDC.IBMQREP_COLVERSION");
Expand All @@ -91,6 +97,12 @@ protected void populateTables() throws SQLException {
TestHelper.enableTableCdc(connection, "B");
}

@Override
protected void populate4PkTable(JdbcConnection connection, String tableName) throws SQLException {
super.populate4PkTable(connection, tableName);
TestHelper.enableTableCdc((Db2Connection) connection, tableName.replaceAll(".*\\.", ""));
}

@Override
protected Class<Db2Connector> connectorClass() {
return Db2Connector.class;
Expand All @@ -111,6 +123,11 @@ protected List<String> topicNames() {
return List.of(topicName(), "testdb.DB2INST1.B");
}

@Override
protected String noPKTopicName() {
return "testdb.DB2INST1.A42";
}

@Override
protected String tableName() {
return "DB2INST1.A";
Expand All @@ -121,6 +138,11 @@ protected List<String> tableNames() {
return List.of(tableName(), "DB2INST1.B");
}

@Override
protected String noPKTableName() {
return "DB2INST1.A42";
}

@Override
protected String signalTableName() {
return "DEBEZIUM_SIGNAL";
Expand All @@ -130,6 +152,11 @@ protected String getSignalTypeFieldName() {
return "TYPE";
}

@Override
protected String returnedIdentifierName(String queriedID) {
return queriedID.toUpperCase();
}

protected void sendAdHocSnapshotSignal() throws SQLException {
connection.execute(
String.format(
Expand All @@ -143,7 +170,8 @@ protected Builder config() {
return TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(Db2ConnectorConfig.SIGNAL_DATA_COLLECTION, "DB2INST1.DEBEZIUM_SIGNAL")
.with(Db2ConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250);
.with(Db2ConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250)
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "DB2INST1.A42:pk1,pk2,pk3,pk4");
}

@Override
Expand All @@ -159,7 +187,8 @@ protected Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCaptur
.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(Db2ConnectorConfig.SIGNAL_DATA_COLLECTION, "DB2INST1.DEBEZIUM_SIGNAL")
.with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl);
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl)
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "DB2INST1.A42:pk1,pk2,pk3,pk4");
}

@Override
Expand Down