From 51c2aad0f04425753d9a5a4567952fee8aabe152 Mon Sep 17 00:00:00 2001 From: twthorn Date: Tue, 7 Jan 2025 15:03:57 -0800 Subject: [PATCH] DBZ-8561 Correctly handle enum/set types in Vitess v20+ --- .../debezium/connector/vitess/VitessType.java | 8 ++-- .../VStreamOutputMessageDecoder.java | 6 ++- .../debezium/connector/vitess/TestHelper.java | 9 +++- .../VStreamOutputMessageDecoderTest.java | 46 +++++++++++++++++++ 4 files changed, 61 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/debezium/connector/vitess/VitessType.java b/src/main/java/io/debezium/connector/vitess/VitessType.java index 50b32092..68a2be12 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessType.java +++ b/src/main/java/io/debezium/connector/vitess/VitessType.java @@ -91,7 +91,7 @@ public static VitessType resolve(Query.Field field) { } // Resolve JDBC type from vstream FIELD event - public static VitessType resolve(Query.Field field, boolean isInVStreamCopy) { + public static VitessType resolve(Query.Field field, boolean isEnumSetStringValue) { String type = field.getType().name(); switch (type) { case "INT8": @@ -105,16 +105,14 @@ public static VitessType resolve(Query.Field field, boolean isInVStreamCopy) { case "YEAR": return new VitessType(type, Types.INTEGER); case "ENUM": - // Use field.getEnumSetStrings once available in new Vitess versions - if (isInVStreamCopy) { + if (isEnumSetStringValue) { return new VitessType(type, Types.VARCHAR, resolveEnumAndSetValues(field.getColumnType())); } else { return new VitessType(type, Types.INTEGER, resolveEnumAndSetValues(field.getColumnType())); } case "SET": - // Use field.getEnumSetStrings once available in new Vitess versions - if (isInVStreamCopy) { + if (isEnumSetStringValue) { return new VitessType(type, Types.VARCHAR, resolveEnumAndSetValues(field.getColumnType())); } else { diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java index 135e965c..8e3fbc12 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java @@ -346,6 +346,10 @@ private List resolveColumns(Row row, Table table) { private void handleFieldMessage(Binlogdata.VEvent vEvent, boolean isInVStreamCopy) { Binlogdata.FieldEvent fieldEvent = vEvent.getFieldEvent(); + boolean isEnumSetStringValue = false; + if (isInVStreamCopy || fieldEvent.getEnumSetStringValues()) { + isEnumSetStringValue = true; + } if (fieldEvent == null) { LOGGER.error("fieldEvent is expected from {}", vEvent); } @@ -368,7 +372,7 @@ private void handleFieldMessage(Binlogdata.VEvent vEvent, boolean isInVStreamCop for (short i = 0; i < columnCount; ++i) { Field field = fieldEvent.getFields(i); String columnName = validateColumnName(field.getName(), schemaName, tableName); - VitessType vitessType = VitessType.resolve(field, isInVStreamCopy); + VitessType vitessType = VitessType.resolve(field, isEnumSetStringValue); if (vitessType.getJdbcId() == Types.OTHER) { LOGGER.error("Cannot resolve JDBC type from VStream field {}", field); } diff --git a/src/test/java/io/debezium/connector/vitess/TestHelper.java b/src/test/java/io/debezium/connector/vitess/TestHelper.java index c7aa5cef..ed3007bf 100644 --- a/src/test/java/io/debezium/connector/vitess/TestHelper.java +++ b/src/test/java/io/debezium/connector/vitess/TestHelper.java @@ -288,15 +288,20 @@ public static Binlogdata.VEvent defaultFieldEvent() { } public static Binlogdata.VEvent defaultFieldEvent(String shard, String keyspace) { - return newFieldEvent(defaultColumnValues(), shard, keyspace); + return newFieldEvent(defaultColumnValues(), shard, keyspace, false); } public static Binlogdata.VEvent newFieldEvent(List columnValues) { - return newFieldEvent(columnValues, TEST_SHARD, TEST_UNSHARDED_KEYSPACE); + return newFieldEvent(columnValues, TEST_SHARD, TEST_UNSHARDED_KEYSPACE, false); } public static Binlogdata.VEvent newFieldEvent(List columnValues, String shard, String keyspace) { + return newFieldEvent(columnValues, shard, keyspace, false); + } + + public static Binlogdata.VEvent newFieldEvent(List columnValues, String shard, String keyspace, boolean enumSetStringsValues) { Binlogdata.FieldEvent.Builder fieldEventBuilder = Binlogdata.FieldEvent.newBuilder() + .setEnumSetStringValues(enumSetStringsValues) .setTableName(getFullTableName(keyspace, TEST_TABLE)); for (Field field : newFields(columnValues)) { fieldEventBuilder.addFields(field); diff --git a/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java b/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java index 98cf3e7b..e0484912 100644 --- a/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java +++ b/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java @@ -8,7 +8,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.sql.Types; import java.time.Instant; +import java.util.List; import org.junit.Before; import org.junit.Test; @@ -258,6 +260,50 @@ public void shouldProcessFieldEvent() throws Exception { } } + @Test + public void shouldProcessFieldEventWithEnumSetStringsFlagDisabledAndNoCopy() throws Exception { + // exercise SUT + List columnValues = List.of(new TestHelper.ColumnValue("enum", Query.Type.ENUM, Types.VARCHAR, "foo".getBytes(), "foo")); + Binlogdata.VEvent event = TestHelper.newFieldEvent(columnValues, TestHelper.TEST_SHARD, TestHelper.TEST_UNSHARDED_KEYSPACE, false); + + decoder.processMessage(event, null, null, false, false); + Table table = schema.tableFor(TestHelper.defaultTableId()); + + // verify outcome + assertThat(table).isNotNull(); + assertThat(table.id().schema()).isEqualTo(TestHelper.TEST_UNSHARDED_KEYSPACE); + assertThat(table.id().table()).isEqualTo(TestHelper.TEST_TABLE); + assertThat(table.columns().size()).isEqualTo(columnValues.size()); + List intTypes = List.of(Types.BIGINT, Types.INTEGER); + for (Query.Field field : event.getFieldEvent().getFieldsList()) { + assertThat(table.columnWithName(field.getName())).isNotNull(); + String name = field.getName(); + assertThat(intTypes.contains(table.columnWithName(field.getName()).jdbcType())).isTrue(); + } + } + + @Test + public void shouldProcessFieldEventWithEnumSetStringsFlagEnabledAndNoCopy() throws Exception { + // exercise SUT + List columnValues = List.of( + new TestHelper.ColumnValue("enum", Query.Type.ENUM, Types.VARCHAR, "foo".getBytes(), "foo", List.of("foo", "bar"), "enum"), + new TestHelper.ColumnValue("set", Query.Type.SET, Types.VARCHAR, "foo".getBytes(), "foo", List.of("foo", "bar"), "set")); + Binlogdata.VEvent event = TestHelper.newFieldEvent(columnValues, TestHelper.TEST_SHARD, TestHelper.TEST_UNSHARDED_KEYSPACE, true); + + decoder.processMessage(event, null, null, false, false); + Table table = schema.tableFor(TestHelper.defaultTableId()); + + // verify outcome + assertThat(table).isNotNull(); + assertThat(table.id().schema()).isEqualTo(TestHelper.TEST_UNSHARDED_KEYSPACE); + assertThat(table.id().table()).isEqualTo(TestHelper.TEST_TABLE); + assertThat(table.columns().size()).isEqualTo(columnValues.size()); + for (Query.Field field : event.getFieldEvent().getFieldsList()) { + assertThat(table.columnWithName(field.getName())).isNotNull(); + assertThat((table.columnWithName(field.getName()).jdbcType())).isEqualTo(Types.VARCHAR); + } + } + @Test public void shouldHandleAddColumnPerShard() throws Exception { String shard1 = "-80";