diff --git a/src/main/java/io/debezium/connector/db2/Db2DefaultValueConverter.java b/src/main/java/io/debezium/connector/db2/Db2DefaultValueConverter.java index c9ba63a..cfd87c1 100644 --- a/src/main/java/io/debezium/connector/db2/Db2DefaultValueConverter.java +++ b/src/main/java/io/debezium/connector/db2/Db2DefaultValueConverter.java @@ -48,10 +48,17 @@ public Db2DefaultValueConverter(Db2ValueConverters valueConverters, Db2Connectio public Optional parseDefaultValue(Column column, String defaultValue) { LOGGER.info("Parsing default value for column '{}' with expression '{}'", column.name(), defaultValue); final int dataType = column.jdbcType(); - final DefaultValueMapper mapper = defaultValueMappers.get(dataType); + DefaultValueMapper mapper = defaultValueMappers.get(dataType); if (mapper == null) { - LOGGER.warn("Mapper for type '{}' not found.", dataType); - return Optional.empty(); + if (dataType == Types.OTHER) { + if (Db2ValueConverters.matches(column.typeName().toUpperCase(), "DECFLOAT")) { + mapper = nullableDefaultValueMapper(); + } + } + if (mapper == null) { + LOGGER.warn("Mapper for type '{}' not found.", dataType); + return Optional.empty(); + } } try { diff --git a/src/main/java/io/debezium/connector/db2/Db2ValueConverters.java b/src/main/java/io/debezium/connector/db2/Db2ValueConverters.java index ed61f64..f82b63a 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ValueConverters.java +++ b/src/main/java/io/debezium/connector/db2/Db2ValueConverters.java @@ -5,13 +5,17 @@ */ package io.debezium.connector.db2; +import java.math.BigDecimal; import java.sql.Types; import java.time.ZoneOffset; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.SchemaBuilder; +import io.debezium.data.SpecialValueDecimal; +import io.debezium.data.VariableScaleDecimal; import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.Column; import io.debezium.relational.ValueConverter; @@ -52,6 +56,10 @@ public SchemaBuilder schemaBuilder(Column column) { case Types.TINYINT: // values are an 8-bit unsigned integer value between 0 and 255, we thus need to store it in short int return SchemaBuilder.int16(); + case Types.OTHER: + if (matches(column.typeName().toUpperCase(), "DECFLOAT")) { + return decfloatSchema(column); + } default: return super.schemaBuilder(column); } @@ -64,11 +72,49 @@ public ValueConverter converter(Column column, Field fieldDefn) { case Types.TINYINT: // values are an 8-bit unsigned integer value between 0 and 255, we thus need to store it in short int return (data) -> convertSmallInt(column, fieldDefn, data); + case Types.OTHER: + if (matches(column.typeName().toUpperCase(), "DECFLOAT")) { + return (data) -> convertDecfloat(column, fieldDefn, data, decimalMode); + } default: return super.converter(column, fieldDefn); } } + protected Object convertDecfloat(Column column, Field fieldDefn, Object data, DecimalMode mode) { + SpecialValueDecimal value; + BigDecimal newDecimal; + + if (data instanceof SpecialValueDecimal) { + value = (SpecialValueDecimal) data; + + if (value.getDecimalValue().isEmpty()) { + return SpecialValueDecimal.fromLogical(value, mode, column.name()); + } + } + else { + final Object o = toBigDecimal(column, fieldDefn, data); + + if (!(o instanceof BigDecimal)) { + return o; + } + value = new SpecialValueDecimal((BigDecimal) o); + } + + newDecimal = withScaleAdjustedIfNeeded(column, value.getDecimalValue().get()); + + if (mode == DecimalMode.PRECISE) { + newDecimal = newDecimal.stripTrailingZeros(); + if (newDecimal.scale() < 0) { + newDecimal = newDecimal.setScale(0); + } + + return VariableScaleDecimal.fromLogical(fieldDefn.schema(), new SpecialValueDecimal(newDecimal)); + } + + return SpecialValueDecimal.fromLogical(new SpecialValueDecimal(newDecimal), mode, column.name()); + } + /** * Time precision in DB2 is defined in scale, the default one is 7 */ @@ -82,4 +128,26 @@ protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object return super.convertTimestampWithZone(column, fieldDefn, data); } -} \ No newline at end of file + /** + * Determine if the uppercase form of a column's type exactly matches or begins with the specified prefix. + * Note that this logic works when the column's {@link Column#typeName() type} contains the type name followed by parentheses. + * + * @param upperCaseTypeName the upper case form of the column's {@link Column#typeName() type name} + * @param upperCaseMatch the upper case form of the expected type or prefix of the type; may not be null + * @return {@code true} if the type matches the specified type, or {@code false} otherwise + */ + protected static boolean matches(String upperCaseTypeName, String upperCaseMatch) { + if (upperCaseTypeName == null) { + return false; + } + return upperCaseMatch.equals(upperCaseTypeName) || upperCaseTypeName.startsWith(upperCaseMatch + "("); + } + + private SchemaBuilder decfloatSchema(Column column) { + if (decimalMode == DecimalMode.PRECISE) { + return VariableScaleDecimal.builder(); + } + return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().orElse(0)); + } + +} diff --git a/src/test/java/io/debezium/connector/db2/AbstractDb2DefaultValueIT.java b/src/test/java/io/debezium/connector/db2/AbstractDb2DefaultValueIT.java index 892e1ec..5feaba8 100644 --- a/src/test/java/io/debezium/connector/db2/AbstractDb2DefaultValueIT.java +++ b/src/test/java/io/debezium/connector/db2/AbstractDb2DefaultValueIT.java @@ -125,8 +125,11 @@ public void shouldHandleFloatPointDefaultTypes() throws Exception { } List columnDefinitions = Arrays.asList( - // todo: DECFLOAT is currently not supported. - // new ColumnDefinition("val_decfloat", "decfloat", "3.14", "6.28", 3.14, 6.28, AssertionType.FIELD_DEFAULT_EQUAL), + // DECFLOAT is variable scale precise type and Kafka does not support STRUCT + // as default value so it is set to null + new ColumnDefinition("val_decfloat", "decfloat", + "3.14", "6.28", null, + null, AssertionType.FIELD_DEFAULT_EQUAL), new ColumnDefinition("val_decimal", "decimal(5,2)", "3.14", "6.28", BigDecimal.valueOf(3.14), BigDecimal.valueOf(6.28), diff --git a/src/test/java/io/debezium/connector/db2/DatatypesFromSnapshotIT.java b/src/test/java/io/debezium/connector/db2/DatatypesFromSnapshotIT.java new file mode 100644 index 0000000..622d694 --- /dev/null +++ b/src/test/java/io/debezium/connector/db2/DatatypesFromSnapshotIT.java @@ -0,0 +1,147 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.db2; + +import java.math.BigDecimal; +import java.sql.SQLException; + +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.config.Configuration; +import io.debezium.connector.db2.Db2ConnectorConfig.SnapshotMode; +import io.debezium.connector.db2.util.TestHelper; +import io.debezium.data.VariableScaleDecimal; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode; +import io.debezium.util.Testing; + +/** + * Test fror Db2 datatypes. + * + * @author Jiri Pechanec + */ +public class DatatypesFromSnapshotIT extends AbstractConnectorTest { + + private Db2Connection connection; + + private static final String[] CREATE_TABLES = { + "CREATE TABLE dt_numeric (id int not null, df decfloat, df16 decfloat(16), df34 decfloat(34), primary key (id))" + }; + private static final String[] INSERT_DATA = { + "INSERT INTO dt_numeric VALUES(1, 1, 3.123456789012345678, 3.012345678901234567890123456789)" + }; + + @Before + public void before() throws SQLException { + connection = TestHelper.testConnection(); + connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER"); + connection.execute("DROP TABLE IF EXISTS dt_numeric"); + connection.execute(CREATE_TABLES); + connection.execute(INSERT_DATA); + + TestHelper.enableTableCdc(connection, "DT_NUMERIC"); + initializeConnectorTestFramework(); + Testing.Files.delete(TestHelper.DB_HISTORY_PATH); + Testing.Print.enable(); + } + + @After + public void after() throws SQLException { + if (connection != null) { + TestHelper.disableDbCdc(connection); + TestHelper.disableTableCdc(connection, "DT_NUMERIC"); + connection.execute("DROP TABLE dt_numeric"); + connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER"); + connection.execute("DELETE FROM ASNCDC.IBMQREP_COLVERSION"); + connection.execute("DELETE FROM ASNCDC.IBMQREP_TABVERSION"); + connection.close(); + } + } + + @Test + public void numericTypesPrecise() throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.dt_numeric") + .build(); + + start(Db2Connector.class, config); + assertConnectorIsRunning(); + + // Wait for snapshot completion + final SourceRecords records = consumeRecordsByTopic(1); + final SourceRecord rec = records.allRecordsInOrder().get(0); + + assertVariableScaleDecimal(((Struct) rec.value()).getStruct("after").get("DF"), new BigDecimal("1")); + // Loss of precision + assertVariableScaleDecimal(((Struct) rec.value()).getStruct("after").get("DF16"), new BigDecimal("3.123456789012346")); + assertVariableScaleDecimal(((Struct) rec.value()).getStruct("after").get("DF34"), new BigDecimal("3.012345678901234567890123456789")); + stopConnector(); + } + + @Test + public void numericTypesDouble() throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.dt_numeric") + .with(Db2ConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) + .build(); + + start(Db2Connector.class, config); + assertConnectorIsRunning(); + + // Wait for snapshot completion + final SourceRecords records = consumeRecordsByTopic(1); + final SourceRecord rec = records.allRecordsInOrder().get(0); + + Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF")) + .isEqualTo(Double.valueOf(1)); + // Loss of precision + Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF16")) + .isEqualTo(Double.valueOf(3.123456789012346)); + Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF34")) + .isEqualTo(Double.valueOf(3.012345678901234567890123456789)); + + stopConnector(); + } + + @Test + public void numericTypesString() throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.dt_numeric") + .with(Db2ConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING) + .build(); + + start(Db2Connector.class, config); + assertConnectorIsRunning(); + + // Wait for snapshot completion + final SourceRecords records = consumeRecordsByTopic(1); + final SourceRecord rec = records.allRecordsInOrder().get(0); + + Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF")) + .isEqualTo("1"); + // Loss of precision + Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF16")) + .isEqualTo("3.123456789012346"); + Assertions.assertThat(((Struct) rec.value()).getStruct("after").get("DF34")) + .isEqualTo("3.012345678901234567890123456789"); + + stopConnector(); + } + + private void assertVariableScaleDecimal(Object actual, BigDecimal expected) { + final Struct v = (Struct) actual; + Assertions.assertThat(v.get(VariableScaleDecimal.SCALE_FIELD)).isEqualTo(expected.scale()); + Assertions.assertThat(v.get(VariableScaleDecimal.VALUE_FIELD)).isEqualTo(expected.unscaledValue().toByteArray()); + } +}