diff --git a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java index 8870fa76..8f6b2a57 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java @@ -118,28 +118,31 @@ else if (message.getOperation() == ReplicationMessage.Operation.DDL) { offsetContext.rotateVgtid(newVgtid, message.getCommitTime()); offsetContext.setShard(message.getShard()); - DdlMetadataExtractor metadataExtractor = new DdlMetadataExtractor(message); - TableId tableId = VitessDatabaseSchema.parse(metadataExtractor.getTable()); - offsetContext.event(tableId, message.getCommitTime()); - String ddlStatement = message.getStatement(); - SchemaChangeEvent.SchemaChangeEventType eventType = metadataExtractor.getSchemaChangeEventType(); - SchemaChangeEvent schemaChangeEvent = SchemaChangeEvent.of( - eventType, - partition, - offsetContext, - connectorConfig.getKeyspace(), - null, - ddlStatement, - null, - false); - dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, null, (receiver) -> { - try { - receiver.schemaChangeEvent(schemaChangeEvent); - } - catch (Exception e) { - throw new DebeziumException(e); - } - }); + // DDLs events are only published if the schema change history is enabled, so we should skip parsing DDL events if it's disabled + if (connectorConfig.isSchemaChangesHistoryEnabled()) { + DdlMetadataExtractor metadataExtractor = new DdlMetadataExtractor(message); + TableId tableId = VitessDatabaseSchema.parse(metadataExtractor.getTable()); + offsetContext.event(tableId, message.getCommitTime()); + String ddlStatement = message.getStatement(); + SchemaChangeEvent.SchemaChangeEventType eventType = metadataExtractor.getSchemaChangeEventType(); + SchemaChangeEvent schemaChangeEvent = SchemaChangeEvent.of( + eventType, + partition, + offsetContext, + connectorConfig.getKeyspace(), + null, + ddlStatement, + null, + false); + dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, null, (receiver) -> { + try { + receiver.schemaChangeEvent(schemaChangeEvent); + } + catch (Exception e) { + throw new DebeziumException(e); + } + }); + } } else if (message.getOperation().equals(ReplicationMessage.Operation.HEARTBEAT)) { dispatcher.dispatchHeartbeatEvent(partition, offsetContext); diff --git a/src/main/java/io/debezium/connector/vitess/connection/DdlMetadataExtractor.java b/src/main/java/io/debezium/connector/vitess/connection/DdlMetadataExtractor.java index d5b7d90b..4ffd7848 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/DdlMetadataExtractor.java +++ b/src/main/java/io/debezium/connector/vitess/connection/DdlMetadataExtractor.java @@ -9,6 +9,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.debezium.connector.vitess.VitessDatabaseSchema; import io.debezium.schema.SchemaChangeEvent; @@ -17,11 +20,18 @@ */ public class DdlMetadataExtractor { + private static final Logger LOGGER = LoggerFactory.getLogger(DdlMetadataExtractor.class); + // VStream DDL statements do not contain any database/keyspace, only contains the table name private static final Pattern TABLE_NAME_PATTERN = Pattern.compile( "(?i)(CREATE|ALTER|TRUNCATE|DROP|RENAME)\\s+TABLE\\s+['\\\"`]?([\\w]+)['\\\"`]?", Pattern.CASE_INSENSITIVE); + // Regex to match in-line or multi-line comments (e.g., /* comment */) + private static final Pattern COMMENT_PATTERN = Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL); + + private static final String UNKNOWN_TABLE_NAME = ""; + private final DdlMessage ddlMessage; private String operation; private String table; @@ -32,7 +42,8 @@ public DdlMetadataExtractor(ReplicationMessage ddlMessage) { } public void extractMetadata() { - Matcher matcher = TABLE_NAME_PATTERN.matcher(this.ddlMessage.getStatement()); + String cleanedStatement = removeComments(this.ddlMessage.getStatement()); + Matcher matcher = TABLE_NAME_PATTERN.matcher(cleanedStatement); if (matcher.find()) { operation = matcher.group(1).split("\s+")[0].toUpperCase(); if (operation.equals("RENAME")) { @@ -42,11 +53,33 @@ public void extractMetadata() { } } + private String removeComments(String statement) { + return COMMENT_PATTERN.matcher(statement).replaceAll(""); + } + public SchemaChangeEvent.SchemaChangeEventType getSchemaChangeEventType() { + if (operation == null) { + logUnknownMessage("schema change event type"); + // An event type is required to build a schema change event, so if we got an empty event type, default to ALTER + return SchemaChangeEvent.SchemaChangeEventType.ALTER; + } return SchemaChangeEvent.SchemaChangeEventType.valueOf(operation); } public String getTable() { + if (table == null) { + logUnknownMessage("table"); + table = UNKNOWN_TABLE_NAME; + } return VitessDatabaseSchema.buildTableId(ddlMessage.getShard(), ddlMessage.getKeyspace(), table).toDoubleQuotedString(); } + + private void logUnknownMessage(String message) { + LOGGER.warn("Unknown {}, keyspace: {}, shard: {}, commit time {}, transaction ID: {}", + message, + ddlMessage.getKeyspace(), + ddlMessage.getShard(), + ddlMessage.getCommitTime(), + ddlMessage.getTransactionId()); + } } diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index 65172381..79515539 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -47,7 +47,6 @@ import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1367,7 +1366,6 @@ public void shouldMultiShardConfigSubscriptionHaveMultiShardGtidsInVgtid() throw } @Test - @Ignore // TODO: enable the test once DBZ-8432 is fixed public void shouldMultiShardMultiTaskConfigSubscriptionHaveMultiShardGtidsInVgtid() throws Exception { final boolean hasMultipleShards = true; @@ -1378,7 +1376,8 @@ public void shouldMultiShardMultiTaskConfigSubscriptionHaveMultiShardGtidsInVgti int expectedRecordsCount = 1; consumer = testConsumer(expectedRecordsCount); - assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TEST_SHARDED_KEYSPACE, TestHelper.PK_FIELD, hasMultipleShards); + // Since there are two tasks and each gets one shard this is expected to only have one shard + assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TEST_SHARDED_KEYSPACE, TestHelper.PK_FIELD, false); } @Test diff --git a/src/test/java/io/debezium/connector/vitess/connection/DdlMetadataExtractorTest.java b/src/test/java/io/debezium/connector/vitess/connection/DdlMetadataExtractorTest.java index eb1a0083..e33b107e 100644 --- a/src/test/java/io/debezium/connector/vitess/connection/DdlMetadataExtractorTest.java +++ b/src/test/java/io/debezium/connector/vitess/connection/DdlMetadataExtractorTest.java @@ -11,6 +11,7 @@ import org.junit.Test; import io.debezium.connector.vitess.TestHelper; +import io.debezium.junit.logging.LogInterceptor; import io.debezium.schema.SchemaChangeEvent; /** @@ -66,4 +67,26 @@ public void shouldGetRenameType() { assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.ALTER); } + @Test + public void shouldParseStatementWithComments() { + DdlMessage ddlMessage = new DdlMessage(null, null, "rename /* gh-ost */ table `keyspace`.`table1` to " + + "`keyspace`.`_table1_del`, `keyspace`.`_table_gho` to `keyspace`.`table`", + TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD); + DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage); + assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.ALTER); + } + + @Test + public void shouldGracefullyHandleUnparseableStatement() { + DdlMessage ddlMessage = new DdlMessage(null, null, "UNPARSEABLE command to a table", + TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD); + DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage); + final LogInterceptor logInterceptor = new LogInterceptor(DdlMetadataExtractor.class); + String unknownTable = "Unknown table"; + String unknownType = "Unknown schema change event type"; + assertThat(extractor.getTable()).isEqualTo("\"0\".\"test_unsharded_keyspace\".\"\""); + assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.ALTER); + assertThat(logInterceptor.containsWarnMessage(unknownTable)).isTrue(); + assertThat(logInterceptor.containsWarnMessage(unknownType)).isTrue(); + } }