Skip to content

Commit

Permalink
DBZ-7362 DECFLOAT is precise decimal type
Browse files Browse the repository at this point in the history
  • Loading branch information
jpechane committed Jan 24, 2024
1 parent d233f0c commit fcd2344
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ public Db2DefaultValueConverter(Db2ValueConverters valueConverters, Db2Connectio
public Optional<Object> parseDefaultValue(Column column, String defaultValue) {
LOGGER.info("Parsing default value for column '{}' with expression '{}'", column.name(), defaultValue);
final int dataType = column.jdbcType();
final DefaultValueMapper mapper = defaultValueMappers.get(dataType);
DefaultValueMapper mapper = defaultValueMappers.get(dataType);
if (mapper == null) {
LOGGER.warn("Mapper for type '{}' not found.", dataType);
return Optional.empty();
if (dataType == Types.OTHER) {
if (Db2ValueConverters.matches(column.typeName().toUpperCase(), "DECFLOAT")) {
mapper = nullableDefaultValueMapper();
}
}
if (mapper == null) {
LOGGER.warn("Mapper for type '{}' not found.", dataType);
return Optional.empty();
}
}

try {
Expand Down Expand Up @@ -104,8 +111,6 @@ private static Map<Integer, DefaultValueMapper> createDefaultValueMappers(Db2Con
result.put(Types.DECIMAL, nullableDefaultValueMapper());
result.put(Types.DOUBLE, nullableDefaultValueMapper((c, v) -> Double.parseDouble(v)));
result.put(Types.REAL, nullableDefaultValueMapper((c, v) -> Float.parseFloat(v)));
// decfloat type
result.put(Types.OTHER, nullableDefaultValueMapper(decfloatDefaultValueMapper()));

// Date and time
result.put(Types.DATE, nullableDefaultValueMapper(castTemporalFunctionCall(connection, Types.DATE)));
Expand Down Expand Up @@ -212,13 +217,4 @@ private static String unquote(String value) {
}
return value;
}

public static DefaultValueMapper decfloatDefaultValueMapper() {
return (column, value) -> {
if (Db2ValueConverters.matches(column.typeName().toUpperCase(), "DECFLOAT")) {
return Double.parseDouble(value);
}
return nullableDefaultValueMapper(null);
};
}
}
52 changes: 49 additions & 3 deletions src/main/java/io/debezium/connector/db2/Db2ValueConverters.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
*/
package io.debezium.connector.db2;

import java.math.BigDecimal;
import java.sql.Types;
import java.time.ZoneOffset;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
Expand Down Expand Up @@ -54,7 +58,7 @@ public SchemaBuilder schemaBuilder(Column column) {
return SchemaBuilder.int16();
case Types.OTHER:
if (matches(column.typeName().toUpperCase(), "DECFLOAT")) {
return SchemaBuilder.float64();
return decfloatSchema(column);
}
default:
return super.schemaBuilder(column);
Expand All @@ -70,13 +74,47 @@ public ValueConverter converter(Column column, Field fieldDefn) {
return (data) -> convertSmallInt(column, fieldDefn, data);
case Types.OTHER:
if (matches(column.typeName().toUpperCase(), "DECFLOAT")) {
return (data) -> super.convertDouble(column, fieldDefn, data);
return (data) -> convertDecfloat(column, fieldDefn, data, decimalMode);
}
default:
return super.converter(column, fieldDefn);
}
}

protected Object convertDecfloat(Column column, Field fieldDefn, Object data, DecimalMode mode) {
SpecialValueDecimal value;
BigDecimal newDecimal;

if (data instanceof SpecialValueDecimal) {
value = (SpecialValueDecimal) data;

if (value.getDecimalValue().isEmpty()) {
return SpecialValueDecimal.fromLogical(value, mode, column.name());
}
}
else {
final Object o = toBigDecimal(column, fieldDefn, data);

if (!(o instanceof BigDecimal)) {
return o;
}
value = new SpecialValueDecimal((BigDecimal) o);
}

newDecimal = withScaleAdjustedIfNeeded(column, value.getDecimalValue().get());

if (mode == DecimalMode.PRECISE) {
newDecimal = newDecimal.stripTrailingZeros();
if (newDecimal.scale() < 0) {
newDecimal = newDecimal.setScale(0);
}

return VariableScaleDecimal.fromLogical(fieldDefn.schema(), new SpecialValueDecimal(newDecimal));
}

return SpecialValueDecimal.fromLogical(new SpecialValueDecimal(newDecimal), mode, column.name());
}

/**
* Time precision in DB2 is defined in scale, the default one is 7
*/
Expand Down Expand Up @@ -104,4 +142,12 @@ protected static boolean matches(String upperCaseTypeName, String upperCaseMatch
}
return upperCaseMatch.equals(upperCaseTypeName) || upperCaseTypeName.startsWith(upperCaseMatch + "(");
}
}

private SchemaBuilder decfloatSchema(Column column) {
if (decimalMode == DecimalMode.PRECISE) {
return VariableScaleDecimal.builder();
}
return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().orElse(0));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,11 @@ public void shouldHandleFloatPointDefaultTypes() throws Exception {
}

List<ColumnDefinition> columnDefinitions = Arrays.asList(
// DECFLOAT is variable scale precise type and Kafka does not support STRUCT
// as default value so it is set to null
new ColumnDefinition("val_decfloat", "decfloat",
"3.14", "6.28", 3.14d,
6.28d, AssertionType.FIELD_DEFAULT_EQUAL),
"3.14", "6.28", null,
null, AssertionType.FIELD_DEFAULT_EQUAL),
new ColumnDefinition("val_decimal", "decimal(5,2)",
"3.14", "6.28",
BigDecimal.valueOf(3.14), BigDecimal.valueOf(6.28),
Expand Down
147 changes: 147 additions & 0 deletions src/test/java/io/debezium/connector/db2/DatatypesFromSnapshotIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.db2;

import java.math.BigDecimal;
import java.sql.SQLException;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import io.debezium.config.Configuration;
import io.debezium.connector.db2.Db2ConnectorConfig.SnapshotMode;
import io.debezium.connector.db2.util.TestHelper;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
import io.debezium.util.Testing;

/**
* Test fror Db2 datatypes.
*
* @author Jiri Pechanec
*/
public class DatatypesFromSnapshotIT extends AbstractConnectorTest {

private Db2Connection connection;

private static final String[] CREATE_TABLES = {
"CREATE TABLE dt_numeric (id int not null, df decfloat, df16 decfloat(16), df34 decfloat(34), primary key (id))"
};
private static final String[] INSERT_DATA = {
"INSERT INTO dt_numeric VALUES(1, 1, 3.123456789012345678, 3.012345678901234567890123456789)"
};

@Before
public void before() throws SQLException {
connection = TestHelper.testConnection();
connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER");
connection.execute("DROP TABLE IF EXISTS dt_numeric");
connection.execute(CREATE_TABLES);
connection.execute(INSERT_DATA);

TestHelper.enableTableCdc(connection, "DT_NUMERIC");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
Testing.Print.enable();
}

@After
public void after() throws SQLException {
if (connection != null) {
TestHelper.disableDbCdc(connection);
TestHelper.disableTableCdc(connection, "DT_NUMERIC");
connection.execute("DROP TABLE dt_numeric");
connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER");
connection.execute("DELETE FROM ASNCDC.IBMQREP_COLVERSION");
connection.execute("DELETE FROM ASNCDC.IBMQREP_TABVERSION");
connection.close();
}
}

@Test
public void numericTypesPrecise() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.dt_numeric")
.build();

start(Db2Connector.class, config);
assertConnectorIsRunning();

// Wait for snapshot completion
final SourceRecords records = consumeRecordsByTopic(1);
final SourceRecord rec = records.allRecordsInOrder().get(0);

assertVariableScaleDecimal(((Struct) rec.value()).getStruct("after").get("DF"), new BigDecimal("1"));
// Loss of precision
assertVariableScaleDecimal(((Struct) rec.value()).getStruct("after").get("DF16"), new BigDecimal("3.123456789012346"));
assertVariableScaleDecimal(((Struct) rec.value()).getStruct("after").get("DF34"), new BigDecimal("3.012345678901234567890123456789"));
stopConnector();
}

@Test
public void numericTypesDouble() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.dt_numeric")
.with(Db2ConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
.build();

start(Db2Connector.class, config);
assertConnectorIsRunning();

// Wait for snapshot completion
final SourceRecords records = consumeRecordsByTopic(1);
final SourceRecord rec = records.allRecordsInOrder().get(0);

Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF"))
.isEqualTo(Double.valueOf(1));
// Loss of precision
Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF16"))
.isEqualTo(Double.valueOf(3.123456789012346));
Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF34"))
.isEqualTo(Double.valueOf(3.012345678901234567890123456789));

stopConnector();
}

@Test
public void numericTypesString() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.dt_numeric")
.with(Db2ConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING)
.build();

start(Db2Connector.class, config);
assertConnectorIsRunning();

// Wait for snapshot completion
final SourceRecords records = consumeRecordsByTopic(1);
final SourceRecord rec = records.allRecordsInOrder().get(0);

Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF"))
.isEqualTo("1");
// Loss of precision
Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF16"))
.isEqualTo("3.123456789012346");
Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF34"))
.isEqualTo("3.012345678901234567890123456789");

stopConnector();
}

private void assertVariableScaleDecimal(Object actual, BigDecimal expected) {
final Struct v = (Struct) actual;
Assertions.assertThat(v.get(VariableScaleDecimal.SCALE_FIELD)).isEqualTo(expected.scale());
Assertions.assertThat(v.get(VariableScaleDecimal.VALUE_FIELD)).isEqualTo(expected.unscaledValue().toByteArray());
}
}

0 comments on commit fcd2344

Please sign in to comment.