Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7362 Support DECFLOAT in Db2 connector #129

Merged
merged 3 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ public Db2DefaultValueConverter(Db2ValueConverters valueConverters, Db2Connectio
public Optional<Object> parseDefaultValue(Column column, String defaultValue) {
LOGGER.info("Parsing default value for column '{}' with expression '{}'", column.name(), defaultValue);
final int dataType = column.jdbcType();
final DefaultValueMapper mapper = defaultValueMappers.get(dataType);
DefaultValueMapper mapper = defaultValueMappers.get(dataType);
if (mapper == null) {
LOGGER.warn("Mapper for type '{}' not found.", dataType);
return Optional.empty();
if (dataType == Types.OTHER) {
if (Db2ValueConverters.matches(column.typeName().toUpperCase(), "DECFLOAT")) {
mapper = nullableDefaultValueMapper();
}
}
if (mapper == null) {
LOGGER.warn("Mapper for type '{}' not found.", dataType);
return Optional.empty();
}
}

try {
Expand Down
70 changes: 69 additions & 1 deletion src/main/java/io/debezium/connector/db2/Db2ValueConverters.java
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not reuse the convertDecimal function? I think the contents of convertDecimal and convertDecfloat are the same.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, it is not. This is taken form PostgreSQL to hadnle potential issue with trailing zeroes. We don't know if it is problem here but it is kind of defensive measure.

Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
*/
package io.debezium.connector.db2;

import java.math.BigDecimal;
import java.sql.Types;
import java.time.ZoneOffset;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
Expand Down Expand Up @@ -52,6 +56,10 @@ public SchemaBuilder schemaBuilder(Column column) {
case Types.TINYINT:
// values are an 8-bit unsigned integer value between 0 and 255, we thus need to store it in short int
return SchemaBuilder.int16();
case Types.OTHER:
if (matches(column.typeName().toUpperCase(), "DECFLOAT")) {
return decfloatSchema(column);
}
default:
return super.schemaBuilder(column);
}
Expand All @@ -64,11 +72,49 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case Types.TINYINT:
// values are an 8-bit unsigned integer value between 0 and 255, we thus need to store it in short int
return (data) -> convertSmallInt(column, fieldDefn, data);
case Types.OTHER:
if (matches(column.typeName().toUpperCase(), "DECFLOAT")) {
return (data) -> convertDecfloat(column, fieldDefn, data, decimalMode);
}
default:
return super.converter(column, fieldDefn);
}
}

protected Object convertDecfloat(Column column, Field fieldDefn, Object data, DecimalMode mode) {
SpecialValueDecimal value;
BigDecimal newDecimal;

if (data instanceof SpecialValueDecimal) {
value = (SpecialValueDecimal) data;

if (value.getDecimalValue().isEmpty()) {
return SpecialValueDecimal.fromLogical(value, mode, column.name());
}
}
else {
final Object o = toBigDecimal(column, fieldDefn, data);

if (!(o instanceof BigDecimal)) {
return o;
}
value = new SpecialValueDecimal((BigDecimal) o);
}

newDecimal = withScaleAdjustedIfNeeded(column, value.getDecimalValue().get());

if (mode == DecimalMode.PRECISE) {
newDecimal = newDecimal.stripTrailingZeros();
if (newDecimal.scale() < 0) {
newDecimal = newDecimal.setScale(0);
}

return VariableScaleDecimal.fromLogical(fieldDefn.schema(), new SpecialValueDecimal(newDecimal));
}

return SpecialValueDecimal.fromLogical(new SpecialValueDecimal(newDecimal), mode, column.name());
}

/**
* Time precision in DB2 is defined in scale, the default one is 7
*/
Expand All @@ -82,4 +128,26 @@ protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object
return super.convertTimestampWithZone(column, fieldDefn, data);
}

}
/**
* Determine if the uppercase form of a column's type exactly matches or begins with the specified prefix.
* Note that this logic works when the column's {@link Column#typeName() type} contains the type name followed by parentheses.
*
* @param upperCaseTypeName the upper case form of the column's {@link Column#typeName() type name}
* @param upperCaseMatch the upper case form of the expected type or prefix of the type; may not be null
* @return {@code true} if the type matches the specified type, or {@code false} otherwise
*/
protected static boolean matches(String upperCaseTypeName, String upperCaseMatch) {
if (upperCaseTypeName == null) {
return false;
}
return upperCaseMatch.equals(upperCaseTypeName) || upperCaseTypeName.startsWith(upperCaseMatch + "(");
}

private SchemaBuilder decfloatSchema(Column column) {
if (decimalMode == DecimalMode.PRECISE) {
return VariableScaleDecimal.builder();
}
return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().orElse(0));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,11 @@ public void shouldHandleFloatPointDefaultTypes() throws Exception {
}

List<ColumnDefinition> columnDefinitions = Arrays.asList(
// todo: DECFLOAT is currently not supported.
// new ColumnDefinition("val_decfloat", "decfloat", "3.14", "6.28", 3.14, 6.28, AssertionType.FIELD_DEFAULT_EQUAL),
// DECFLOAT is variable scale precise type and Kafka does not support STRUCT
// as default value so it is set to null
new ColumnDefinition("val_decfloat", "decfloat",
"3.14", "6.28", null,
null, AssertionType.FIELD_DEFAULT_EQUAL),
new ColumnDefinition("val_decimal", "decimal(5,2)",
"3.14", "6.28",
BigDecimal.valueOf(3.14), BigDecimal.valueOf(6.28),
Expand Down
147 changes: 147 additions & 0 deletions src/test/java/io/debezium/connector/db2/DatatypesFromSnapshotIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.db2;

import java.math.BigDecimal;
import java.sql.SQLException;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import io.debezium.config.Configuration;
import io.debezium.connector.db2.Db2ConnectorConfig.SnapshotMode;
import io.debezium.connector.db2.util.TestHelper;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
import io.debezium.util.Testing;

/**
* Test fror Db2 datatypes.
*
* @author Jiri Pechanec
*/
public class DatatypesFromSnapshotIT extends AbstractConnectorTest {

private Db2Connection connection;

private static final String[] CREATE_TABLES = {
"CREATE TABLE dt_numeric (id int not null, df decfloat, df16 decfloat(16), df34 decfloat(34), primary key (id))"
};
private static final String[] INSERT_DATA = {
"INSERT INTO dt_numeric VALUES(1, 1, 3.123456789012345678, 3.012345678901234567890123456789)"
};

@Before
public void before() throws SQLException {
connection = TestHelper.testConnection();
connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER");
connection.execute("DROP TABLE IF EXISTS dt_numeric");
connection.execute(CREATE_TABLES);
connection.execute(INSERT_DATA);

TestHelper.enableTableCdc(connection, "DT_NUMERIC");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
Testing.Print.enable();
}

@After
public void after() throws SQLException {
if (connection != null) {
TestHelper.disableDbCdc(connection);
TestHelper.disableTableCdc(connection, "DT_NUMERIC");
connection.execute("DROP TABLE dt_numeric");
connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER");
connection.execute("DELETE FROM ASNCDC.IBMQREP_COLVERSION");
connection.execute("DELETE FROM ASNCDC.IBMQREP_TABVERSION");
connection.close();
}
}

@Test
public void numericTypesPrecise() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.dt_numeric")
.build();

start(Db2Connector.class, config);
assertConnectorIsRunning();

// Wait for snapshot completion
final SourceRecords records = consumeRecordsByTopic(1);
final SourceRecord rec = records.allRecordsInOrder().get(0);

assertVariableScaleDecimal(((Struct) rec.value()).getStruct("after").get("DF"), new BigDecimal("1"));
// Loss of precision
assertVariableScaleDecimal(((Struct) rec.value()).getStruct("after").get("DF16"), new BigDecimal("3.123456789012346"));
assertVariableScaleDecimal(((Struct) rec.value()).getStruct("after").get("DF34"), new BigDecimal("3.012345678901234567890123456789"));
stopConnector();
}

@Test
public void numericTypesDouble() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.dt_numeric")
.with(Db2ConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
.build();

start(Db2Connector.class, config);
assertConnectorIsRunning();

// Wait for snapshot completion
final SourceRecords records = consumeRecordsByTopic(1);
final SourceRecord rec = records.allRecordsInOrder().get(0);

Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF"))
.isEqualTo(Double.valueOf(1));
// Loss of precision
Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF16"))
.isEqualTo(Double.valueOf(3.123456789012346));
Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF34"))
.isEqualTo(Double.valueOf(3.012345678901234567890123456789));

stopConnector();
}

@Test
public void numericTypesString() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.dt_numeric")
.with(Db2ConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING)
.build();

start(Db2Connector.class, config);
assertConnectorIsRunning();

// Wait for snapshot completion
final SourceRecords records = consumeRecordsByTopic(1);
final SourceRecord rec = records.allRecordsInOrder().get(0);

Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF"))
.isEqualTo("1");
// Loss of precision
Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF16"))
.isEqualTo("3.123456789012346");
Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF34"))
.isEqualTo("3.012345678901234567890123456789");

stopConnector();
}

private void assertVariableScaleDecimal(Object actual, BigDecimal expected) {
final Struct v = (Struct) actual;
Assertions.assertThat(v.get(VariableScaleDecimal.SCALE_FIELD)).isEqualTo(expected.scale());
Assertions.assertThat(v.get(VariableScaleDecimal.VALUE_FIELD)).isEqualTo(expected.unscaledValue().toByteArray());
}
}
Loading