Skip to content

Commit

Permalink
DBZ-8479 Improve DDL parsing logic
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Dec 2, 2024
1 parent 40251d0 commit 1c8ee77
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,28 +118,31 @@ else if (message.getOperation() == ReplicationMessage.Operation.DDL) {
offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
offsetContext.setShard(message.getShard());

DdlMetadataExtractor metadataExtractor = new DdlMetadataExtractor(message);
TableId tableId = VitessDatabaseSchema.parse(metadataExtractor.getTable());
offsetContext.event(tableId, message.getCommitTime());
String ddlStatement = message.getStatement();
SchemaChangeEvent.SchemaChangeEventType eventType = metadataExtractor.getSchemaChangeEventType();
SchemaChangeEvent schemaChangeEvent = SchemaChangeEvent.of(
eventType,
partition,
offsetContext,
connectorConfig.getKeyspace(),
null,
ddlStatement,
null,
false);
dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, null, (receiver) -> {
try {
receiver.schemaChangeEvent(schemaChangeEvent);
}
catch (Exception e) {
throw new DebeziumException(e);
}
});
// DDL events are only published if the schema change history is enabled, so we skip parsing them when it is disabled
if (connectorConfig.isSchemaChangesHistoryEnabled()) {
DdlMetadataExtractor metadataExtractor = new DdlMetadataExtractor(message);
TableId tableId = VitessDatabaseSchema.parse(metadataExtractor.getTable());
offsetContext.event(tableId, message.getCommitTime());
String ddlStatement = message.getStatement();
SchemaChangeEvent.SchemaChangeEventType eventType = metadataExtractor.getSchemaChangeEventType();
SchemaChangeEvent schemaChangeEvent = SchemaChangeEvent.of(
eventType,
partition,
offsetContext,
connectorConfig.getKeyspace(),
null,
ddlStatement,
null,
false);
dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, null, (receiver) -> {
try {
receiver.schemaChangeEvent(schemaChangeEvent);
}
catch (Exception e) {
throw new DebeziumException(e);
}
});
}
}
else if (message.getOperation().equals(ReplicationMessage.Operation.HEARTBEAT)) {
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;

Expand All @@ -17,11 +20,18 @@
*/
public class DdlMetadataExtractor {

private static final Logger LOGGER = LoggerFactory.getLogger(DdlMetadataExtractor.class);

// VStream DDL statements do not contain a database/keyspace qualifier; they contain only the table name
private static final Pattern TABLE_NAME_PATTERN = Pattern.compile(
"(?i)(CREATE|ALTER|TRUNCATE|DROP|RENAME)\\s+TABLE\\s+['\\\"`]?([\\w]+)['\\\"`]?",
Pattern.CASE_INSENSITIVE);

// Regex to match in-line or multi-line comments (e.g., /* comment */)
private static final Pattern COMMENT_PATTERN = Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL);

private static final String UNKNOWN_TABLE_NAME = "<UNKNOWN>";

private final DdlMessage ddlMessage;
private String operation;
private String table;
Expand All @@ -32,7 +42,8 @@ public DdlMetadataExtractor(ReplicationMessage ddlMessage) {
}

public void extractMetadata() {
Matcher matcher = TABLE_NAME_PATTERN.matcher(this.ddlMessage.getStatement());
String cleanedStatement = removeComments(this.ddlMessage.getStatement());
Matcher matcher = TABLE_NAME_PATTERN.matcher(cleanedStatement);
if (matcher.find()) {
operation = matcher.group(1).split("\s+")[0].toUpperCase();
if (operation.equals("RENAME")) {
Expand All @@ -42,11 +53,33 @@ public void extractMetadata() {
}
}

/**
 * Strips block comments from a DDL statement so the table-name pattern can be applied to it.
 *
 * Each comment is replaced by a single space instead of being deleted. A comment may be the
 * only separator between two tokens (for example the operation keyword immediately followed
 * by a comment and then TABLE); deleting it outright would fuse those tokens into one word
 * and the table-name pattern, which requires whitespace around TABLE, would no longer match.
 * Extra whitespace is harmless because the pattern accepts one or more whitespace characters.
 *
 * @param statement the raw DDL statement
 * @return the statement with every block comment replaced by one space
 */
private String removeComments(String statement) {
    return COMMENT_PATTERN.matcher(statement).replaceAll(" ");
}

/**
 * Maps the extracted DDL operation to a schema change event type.
 *
 * @return the event type named by the extracted operation, or ALTER (after logging a warning)
 *         when no operation was extracted
 */
public SchemaChangeEvent.SchemaChangeEventType getSchemaChangeEventType() {
    if (operation != null) {
        return SchemaChangeEvent.SchemaChangeEventType.valueOf(operation);
    }
    // A schema change event cannot be built without an event type, so fall back to ALTER when none was extracted
    logUnknownMessage("schema change event type");
    return SchemaChangeEvent.SchemaChangeEventType.ALTER;
}

/**
 * Builds the double-quoted table identifier from the message's shard and keyspace and the extracted table name.
 *
 * When no table name was extracted, a warning is logged and the placeholder name is stored in its
 * place, so later calls reuse the placeholder without logging again.
 *
 * @return the double-quoted string form of the table id
 */
public String getTable() {
    final boolean tableMissing = (table == null);
    if (tableMissing) {
        logUnknownMessage("table");
        table = UNKNOWN_TABLE_NAME;
    }
    return VitessDatabaseSchema
            .buildTableId(ddlMessage.getShard(), ddlMessage.getKeyspace(), table)
            .toDoubleQuotedString();
}

/**
 * Logs a warning that a piece of DDL metadata could not be determined, together with the
 * keyspace, shard, commit time and transaction id of the DDL message.
 *
 * @param message short description of what is unknown; substituted into the first placeholder
 */
private void logUnknownMessage(String message) {
    final String format = "Unknown {}, keyspace: {}, shard: {}, commit time {}, transaction ID: {}";
    // Arguments are listed in placeholder order
    final Object[] details = {
            message,
            ddlMessage.getKeyspace(),
            ddlMessage.getShard(),
            ddlMessage.getCommitTime(),
            ddlMessage.getTransactionId()
    };
    LOGGER.warn(format, details);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1367,7 +1366,6 @@ public void shouldMultiShardConfigSubscriptionHaveMultiShardGtidsInVgtid() throw
}

@Test
@Ignore // TODO: enable the test once DBZ-8432 is fixed
public void shouldMultiShardMultiTaskConfigSubscriptionHaveMultiShardGtidsInVgtid() throws Exception {
final boolean hasMultipleShards = true;

Expand All @@ -1378,7 +1376,8 @@ public void shouldMultiShardMultiTaskConfigSubscriptionHaveMultiShardGtidsInVgti

int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount);
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TEST_SHARDED_KEYSPACE, TestHelper.PK_FIELD, hasMultipleShards);
// Since there are two tasks and each gets one shard this is expected to only have one shard
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TEST_SHARDED_KEYSPACE, TestHelper.PK_FIELD, false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.Test;

import io.debezium.connector.vitess.TestHelper;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.schema.SchemaChangeEvent;

/**
Expand Down Expand Up @@ -66,4 +67,26 @@ public void shouldGetRenameType() {
assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.ALTER);
}

// A block comment between the operation keyword and TABLE must not prevent the statement from being
// recognised; a recognised RENAME is reported as an ALTER event type.
@Test
public void shouldParseStatementWithComments() {
    final String renameWithComment = "rename /* gh-ost */ table `keyspace`.`table1` to " +
            "`keyspace`.`_table1_del`, `keyspace`.`_table_gho` to `keyspace`.`table`";
    DdlMessage message = new DdlMessage(null, null, renameWithComment,
            TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
    DdlMetadataExtractor metadataExtractor = new DdlMetadataExtractor(message);

    SchemaChangeEvent.SchemaChangeEventType actualType = metadataExtractor.getSchemaChangeEventType();

    assertThat(actualType).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.ALTER);
}

// A statement the extractor cannot parse must still yield a placeholder table id and the ALTER
// event type, and each fallback must emit a warning.
@Test
public void shouldGracefullyHandleUnparseableStatement() {
    DdlMessage message = new DdlMessage(null, null, "UNPARSEABLE command to a table",
            TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
    DdlMetadataExtractor metadataExtractor = new DdlMetadataExtractor(message);
    final LogInterceptor interceptor = new LogInterceptor(DdlMetadataExtractor.class);

    String actualTable = metadataExtractor.getTable();
    assertThat(actualTable).isEqualTo("\"0\".\"test_unsharded_keyspace\".\"<UNKNOWN>\"");

    SchemaChangeEvent.SchemaChangeEventType actualType = metadataExtractor.getSchemaChangeEventType();
    assertThat(actualType).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.ALTER);

    // Both fallbacks are expected to have logged a warning
    assertThat(interceptor.containsWarnMessage("Unknown table")).isTrue();
    assertThat(interceptor.containsWarnMessage("Unknown schema change event type")).isTrue();
}
}

0 comments on commit 1c8ee77

Please sign in to comment.