Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7293 Fallback to old schema in case of race condition between ROW and FIELD #175

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
* Logical in-memory representation of Vitess schema (a.k.a Vitess keyspace). It is used to create
* kafka connect {@link Schema} for all tables.
*/
public class VitessDatabaseSchema extends RelationalDatabaseSchema {
public class VitessDatabaseSchema extends RelationalDatabaseSchema implements Cloneable {
private static final Logger LOGGER = LoggerFactory.getLogger(VitessDatabaseSchema.class);

private final VitessConnectorConfig config;
private final SchemaNameAdjuster schemaNameAdjuster;
private final TopicNamingStrategy<TableId> topicNamingStrategy;

public VitessDatabaseSchema(
VitessConnectorConfig config,
SchemaNameAdjuster schemaNameAdjuster,
Expand All @@ -49,6 +53,9 @@ public VitessDatabaseSchema(
false),
false,
config.getKeyMapper());
this.config = config;
this.schemaNameAdjuster = schemaNameAdjuster;
this.topicNamingStrategy = topicNamingStrategy;
}

/** Applies schema changes for the specified table. */
Expand Down Expand Up @@ -98,4 +105,9 @@ public static TableId parse(String table) {
public static TableId buildTableId(String shard, String keyspace, String table) {
return new TableId(shard, keyspace, table);
}

@Override
public VitessDatabaseSchema clone() {
return new VitessDatabaseSchema(this.config, this.schemaNameAdjuster, this.topicNamingStrategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
*/
package io.debezium.connector.vitess.connection;

import static io.debezium.connector.vitess.connection.ReplicationMessage.Column;

import java.sql.Types;
import java.time.Instant;
import java.util.ArrayList;
Expand Down Expand Up @@ -47,8 +45,13 @@ public class VStreamOutputMessageDecoder implements MessageDecoder {

private final VitessDatabaseSchema schema;

private final VitessDatabaseSchema schemaBackup;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final VitessDatabaseSchema schemaBackup;
private final VitessDatabaseSchema delayedSchema;


public VStreamOutputMessageDecoder(VitessDatabaseSchema schema) {
this.schema = schema;
// Schema can be null. See: VitessConnector.validateConnection
if (schema != null) this.schemaBackup = schema.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be no cloning, both schema and delayedSchema should be passed from the constructor so it is apparent outside that two instances are needed.

else this.schemaBackup = null;
}

@Override
Expand Down Expand Up @@ -293,16 +296,32 @@ private Optional<Table> resolveRelation(String shard, String schemaName, String
return Optional.ofNullable(schema.tableFor(VitessDatabaseSchema.buildTableId(shard, schemaName, tableName)));
}

private Optional<Table> resolveRelationFromBackup(TableId tableId) {
return Optional.ofNullable(schemaBackup.tableFor(tableId));
}

/** Resolve the vEvent data to a list of replication message columns (with values). */
private List<Column> resolveColumns(Row row, Table table) {
return resolveColumns(row, table, false);
}

/**
* Resolve the vEvent data to a list of replication message columns (with values).
* NOTE: Sometimes due to race condition in Vitess ROW and TYPE messages can be mixed up, so there's a fallback
* the old schema version in case row length mismatches between Row and Table in schema cache.
* <a href="https://vitess.io/docs/18.0/reference/vreplication/internal/tracker/#caveat">...</a>
*/
private List<Column> resolveColumns(Row row, Table table, Boolean fromBackup) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private List<Column> resolveColumns(Row row, Table table, Boolean fromBackup) {
private List<Column> resolveColumns(Row row, Table table, boolean fromBackup) {

int numberOfColumns = row.getLengthsCount();
List<io.debezium.relational.Column> tableColumns = table.columns();
if (tableColumns.size() != numberOfColumns) {
throw new IllegalStateException(
TableId tableId = table.id();
Optional<Table> backup = fromBackup ? Optional.empty() : resolveRelationFromBackup(tableId);
return backup.map(backupTable -> resolveColumns(row, backupTable, true)).orElseThrow(() -> new IllegalStateException(
String.format(
"The number of columns in the ROW event %s is different from the in-memory table schema %s.",
row,
table));
table)));
}

ByteString rawValues = row.getValues();
Expand Down Expand Up @@ -371,8 +390,12 @@ else if ((field.getFlags() & UNIQUE_KEY_FLAG) != 0) {
Table table = resolveTable(shard, schemaName, tableName, columns);
LOGGER.debug("Number of columns in the resolved table: {}", table.columns().size());

if (schema.tableIds().contains(table.id())) {
Table tableToBackup = schema.tableFor(table.id());
LOGGER.info("Old schema for table: {} found in cache, backing up", table.id());
schemaBackup.applySchemaChangesForTable(tableToBackup);
}
schema.applySchemaChangesForTable(table);
return;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.connector.vitess.AnonymousValue;
import io.debezium.connector.vitess.TestHelper;
Expand All @@ -30,7 +28,6 @@
import binlogdata.Binlogdata;

public class VStreamOutputMessageDecoderTest {
private static final Logger LOGGER = LoggerFactory.getLogger(VStreamOutputMessageDecoderTest.class);

private VitessConnectorConfig connectorConfig;
private VitessDatabaseSchema schema;
Expand Down Expand Up @@ -402,6 +399,65 @@ public void shouldThrowExceptionWithDetailedMessageOnRowSchemaMismatch() throws
.hasMessageContaining("long_col");
}

@Test
public void shouldProcessOutOfOrderTypeAndRowMessages() throws Exception {

// Create schema for default fields
decoder.processMessage(TestHelper.defaultFieldEvent(), null, null, false);
// verify outcome
Table table = schema.tableFor(TestHelper.defaultTableId());
assertThat(table).isNotNull();
assertThat(table.id().schema()).isEqualTo(TestHelper.TEST_UNSHARDED_KEYSPACE);
assertThat(table.id().table()).isEqualTo(TestHelper.TEST_TABLE);
assertThat(table.columns().size()).isEqualTo(TestHelper.defaultNumOfColumns());
for (Query.Field field : TestHelper.defaultFields()) {
assertThat(table.columnWithName(field.getName())).isNotNull();
}

// Fields update with a subset of fields
decoder.processMessage(TestHelper.newFieldEvent(TestHelper.columnValuesSubset()), null, null, false);
// verify outcome
Table tableUpdated = schema.tableFor(TestHelper.defaultTableId());
assertThat(tableUpdated).isNotNull();
assertThat(tableUpdated.id().schema()).isEqualTo(TestHelper.TEST_UNSHARDED_KEYSPACE);
assertThat(tableUpdated.id().table()).isEqualTo(TestHelper.TEST_TABLE);
assertThat(tableUpdated.columns().size()).isEqualTo(TestHelper.columnSubsetNumOfColumns());
for (Query.Field field : TestHelper.fieldsSubset()) {
assertThat(tableUpdated.columnWithName(field.getName())).isNotNull();
}

// Row event with old default fields
final boolean[] processed = { false };
decoder.processMessage(
TestHelper.defaultInsertEvent(),
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message).isNotNull();
assertThat(message).isInstanceOf(VStreamOutputReplicationMessage.class);
assertThat(message.getOperation()).isEqualTo(ReplicationMessage.Operation.INSERT);
assertThat(message.getOldTupleList()).isNull();
assertThat(message.getShard()).isEqualTo(TestHelper.TEST_SHARD);
assertThat(message.getNewTupleList().size()).isEqualTo(TestHelper.defaultNumOfColumns());
processed[0] = true;
},
null, false);

// Row event with new fields
decoder.processMessage(
TestHelper.insertEvent(TestHelper.columnValuesSubset()),
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message).isNotNull();
assertThat(message).isInstanceOf(VStreamOutputReplicationMessage.class);
assertThat(message.getOperation()).isEqualTo(ReplicationMessage.Operation.INSERT);
assertThat(message.getOldTupleList()).isNull();
assertThat(message.getShard()).isEqualTo(TestHelper.TEST_SHARD);
assertThat(message.getNewTupleList().size()).isEqualTo(TestHelper.columnSubsetNumOfColumns());
processed[0] = true;
},
null, false);
}

@Test
public void shouldProcessInsertEvent() throws Exception {
// setup fixture
Expand Down
Loading