Skip to content

Commit

Permalink
DBZ-8561 Correctly handle enum/set types in Vitess v20+
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Jan 7, 2025
1 parent 219ee85 commit 51c2aad
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 8 deletions.
8 changes: 3 additions & 5 deletions src/main/java/io/debezium/connector/vitess/VitessType.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static VitessType resolve(Query.Field field) {
}

// Resolve JDBC type from vstream FIELD event
public static VitessType resolve(Query.Field field, boolean isInVStreamCopy) {
public static VitessType resolve(Query.Field field, boolean isEnumSetStringValue) {
String type = field.getType().name();
switch (type) {
case "INT8":
Expand All @@ -105,16 +105,14 @@ public static VitessType resolve(Query.Field field, boolean isInVStreamCopy) {
case "YEAR":
return new VitessType(type, Types.INTEGER);
case "ENUM":
// Use field.getEnumSetStrings once available in new Vitess versions
if (isInVStreamCopy) {
if (isEnumSetStringValue) {
return new VitessType(type, Types.VARCHAR, resolveEnumAndSetValues(field.getColumnType()));
}
else {
return new VitessType(type, Types.INTEGER, resolveEnumAndSetValues(field.getColumnType()));
}
case "SET":
// Use field.getEnumSetStrings once available in new Vitess versions
if (isInVStreamCopy) {
if (isEnumSetStringValue) {
return new VitessType(type, Types.VARCHAR, resolveEnumAndSetValues(field.getColumnType()));
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ private List<Column> resolveColumns(Row row, Table table) {

private void handleFieldMessage(Binlogdata.VEvent vEvent, boolean isInVStreamCopy) {
Binlogdata.FieldEvent fieldEvent = vEvent.getFieldEvent();
boolean isEnumSetStringValue = false;
if (isInVStreamCopy || fieldEvent.getEnumSetStringValues()) {
isEnumSetStringValue = true;
}
if (fieldEvent == null) {
LOGGER.error("fieldEvent is expected from {}", vEvent);
}
Expand All @@ -368,7 +372,7 @@ private void handleFieldMessage(Binlogdata.VEvent vEvent, boolean isInVStreamCop
for (short i = 0; i < columnCount; ++i) {
Field field = fieldEvent.getFields(i);
String columnName = validateColumnName(field.getName(), schemaName, tableName);
VitessType vitessType = VitessType.resolve(field, isInVStreamCopy);
VitessType vitessType = VitessType.resolve(field, isEnumSetStringValue);
if (vitessType.getJdbcId() == Types.OTHER) {
LOGGER.error("Cannot resolve JDBC type from VStream field {}", field);
}
Expand Down
9 changes: 7 additions & 2 deletions src/test/java/io/debezium/connector/vitess/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,15 +288,20 @@ public static Binlogdata.VEvent defaultFieldEvent() {
}

public static Binlogdata.VEvent defaultFieldEvent(String shard, String keyspace) {
return newFieldEvent(defaultColumnValues(), shard, keyspace);
return newFieldEvent(defaultColumnValues(), shard, keyspace, false);
}

public static Binlogdata.VEvent newFieldEvent(List<ColumnValue> columnValues) {
return newFieldEvent(columnValues, TEST_SHARD, TEST_UNSHARDED_KEYSPACE);
return newFieldEvent(columnValues, TEST_SHARD, TEST_UNSHARDED_KEYSPACE, false);
}

public static Binlogdata.VEvent newFieldEvent(List<ColumnValue> columnValues, String shard, String keyspace) {
return newFieldEvent(columnValues, shard, keyspace, false);
}

public static Binlogdata.VEvent newFieldEvent(List<ColumnValue> columnValues, String shard, String keyspace, boolean enumSetStringsValues) {
Binlogdata.FieldEvent.Builder fieldEventBuilder = Binlogdata.FieldEvent.newBuilder()
.setEnumSetStringValues(enumSetStringsValues)
.setTableName(getFullTableName(keyspace, TEST_TABLE));
for (Field field : newFields(columnValues)) {
fieldEventBuilder.addFields(field);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.sql.Types;
import java.time.Instant;
import java.util.List;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -258,6 +260,50 @@ public void shouldProcessFieldEvent() throws Exception {
}
}

@Test
public void shouldProcessFieldEventWithEnumSetStringsFlagDisabledAndNoCopy() throws Exception {
// exercise SUT
List<TestHelper.ColumnValue> columnValues = List.of(new TestHelper.ColumnValue("enum", Query.Type.ENUM, Types.VARCHAR, "foo".getBytes(), "foo"));
Binlogdata.VEvent event = TestHelper.newFieldEvent(columnValues, TestHelper.TEST_SHARD, TestHelper.TEST_UNSHARDED_KEYSPACE, false);

decoder.processMessage(event, null, null, false, false);
Table table = schema.tableFor(TestHelper.defaultTableId());

// verify outcome
assertThat(table).isNotNull();
assertThat(table.id().schema()).isEqualTo(TestHelper.TEST_UNSHARDED_KEYSPACE);
assertThat(table.id().table()).isEqualTo(TestHelper.TEST_TABLE);
assertThat(table.columns().size()).isEqualTo(columnValues.size());
List<Integer> intTypes = List.of(Types.BIGINT, Types.INTEGER);
for (Query.Field field : event.getFieldEvent().getFieldsList()) {
assertThat(table.columnWithName(field.getName())).isNotNull();
String name = field.getName();
assertThat(intTypes.contains(table.columnWithName(field.getName()).jdbcType())).isTrue();
}
}

@Test
public void shouldProcessFieldEventWithEnumSetStringsFlagEnabledAndNoCopy() throws Exception {
// exercise SUT
List<TestHelper.ColumnValue> columnValues = List.of(
new TestHelper.ColumnValue("enum", Query.Type.ENUM, Types.VARCHAR, "foo".getBytes(), "foo", List.of("foo", "bar"), "enum"),
new TestHelper.ColumnValue("set", Query.Type.SET, Types.VARCHAR, "foo".getBytes(), "foo", List.of("foo", "bar"), "set"));
Binlogdata.VEvent event = TestHelper.newFieldEvent(columnValues, TestHelper.TEST_SHARD, TestHelper.TEST_UNSHARDED_KEYSPACE, true);

decoder.processMessage(event, null, null, false, false);
Table table = schema.tableFor(TestHelper.defaultTableId());

// verify outcome
assertThat(table).isNotNull();
assertThat(table.id().schema()).isEqualTo(TestHelper.TEST_UNSHARDED_KEYSPACE);
assertThat(table.id().table()).isEqualTo(TestHelper.TEST_TABLE);
assertThat(table.columns().size()).isEqualTo(columnValues.size());
for (Query.Field field : event.getFieldEvent().getFieldsList()) {
assertThat(table.columnWithName(field.getName())).isNotNull();
assertThat((table.columnWithName(field.getName()).jdbcType())).isEqualTo(Types.VARCHAR);
}
}

@Test
public void shouldHandleAddColumnPerShard() throws Exception {
String shard1 = "-80";
Expand Down

0 comments on commit 51c2aad

Please sign in to comment.