Skip to content

Commit

Permalink
DBZ-8325 Emit DDL events
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Nov 7, 2024
1 parent 17c8ff5 commit 8f8c30e
Show file tree
Hide file tree
Showing 18 changed files with 435 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.connector.vitess.connection.DdlMetadataExtractor;
import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.ReplicationMessage;
import io.debezium.connector.vitess.connection.ReplicationMessageProcessor;
Expand All @@ -19,6 +21,7 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;

Expand Down Expand Up @@ -108,10 +111,36 @@ else if (message.getOperation() == ReplicationMessage.Operation.COMMIT) {
}
return;
}
else if (message.getOperation() == ReplicationMessage.Operation.DDL || message.getOperation() == ReplicationMessage.Operation.OTHER) {
// DDL event or OTHER event
else if (message.getOperation() == ReplicationMessage.Operation.OTHER) {
offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
}
else if (message.getOperation() == ReplicationMessage.Operation.DDL) {
offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
offsetContext.setShard(message.getShard());

DdlMetadataExtractor metadataExtractor = new DdlMetadataExtractor(message);
TableId tableId = VitessDatabaseSchema.parse(metadataExtractor.getTable());
offsetContext.event(tableId, message.getCommitTime());
String ddlStatement = message.getStatement();
SchemaChangeEvent.SchemaChangeEventType eventType = metadataExtractor.getSchemaChangeEventType();
SchemaChangeEvent schemaChangeEvent = SchemaChangeEvent.of(
eventType,
partition,
offsetContext,
connectorConfig.getKeyspace(),
null,
ddlStatement,
null,
false);
dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, null, (receiver) -> {
try {
receiver.schemaChangeEvent(schemaChangeEvent);
}
catch (Exception e) {
throw new DebeziumException(e);
}
});
}
else if (message.getOperation().equals(ReplicationMessage.Operation.HEARTBEAT)) {
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@ public class DdlMessage implements ReplicationMessage {
private final String transactionId;
private final Instant commitTime;
private final Operation operation;
private final String statement;
private final String shard;
private final String keyspace;

public DdlMessage(String transactionId, Instant commitTime) {
public DdlMessage(String transactionId, Instant commitTime, String statement, String keyspace, String shard) {
this.transactionId = transactionId;
this.commitTime = commitTime;
this.operation = Operation.DDL;
this.statement = statement;
this.keyspace = keyspace;
this.shard = shard;
}

@Override
Expand All @@ -41,9 +47,19 @@ public String getTable() {
throw new UnsupportedOperationException();
}

@Override
public String getStatement() {
return statement;
}

@Override
public String getKeyspace() {
return keyspace;
}

@Override
public String getShard() {
throw new UnsupportedOperationException();
return shard;
}

@Override
Expand All @@ -67,8 +83,14 @@ public String toString() {
+ "transactionId='"
+ transactionId
+ '\''
+ ", keyspace="
+ keyspace
+ ", shard="
+ shard
+ ", commitTime="
+ commitTime
+ ", statement="
+ statement
+ ", operation="
+ operation
+ '}';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess.connection;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;

/**
* @author Thomas Thornton
*/
public class DdlMetadataExtractor {

// VStream DDL statements do not contain any database/keyspace, only contains the table name
private static final Pattern TABLE_NAME_PATTERN = Pattern.compile(
"(?i)(CREATE|ALTER|TRUNCATE|DROP|RENAME)\\s+TABLE\\s+['\\\"`]?([\\w]+)['\\\"`]?",
Pattern.CASE_INSENSITIVE);

private final DdlMessage ddlMessage;
private String operation;
private String table;

public DdlMetadataExtractor(ReplicationMessage ddlMessage) {
this.ddlMessage = (DdlMessage) ddlMessage;
extractMetadata();
}

public void extractMetadata() {
Matcher matcher = TABLE_NAME_PATTERN.matcher(this.ddlMessage.getStatement());
if (matcher.find()) {
operation = matcher.group(1).split("\s+")[0].toUpperCase();
if (operation.equals("RENAME")) {
operation = "ALTER";
}
table = matcher.group(2);
}
}

public SchemaChangeEvent.SchemaChangeEventType getSchemaChangeEventType() {
return SchemaChangeEvent.SchemaChangeEventType.valueOf(operation);
}

public String getTable() {
return VitessDatabaseSchema.buildTableId(ddlMessage.getShard(), ddlMessage.getKeyspace(), table).toDoubleQuotedString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public String getTransactionId() {
throw new UnsupportedOperationException();
}

@Override
public String getKeyspace() {
throw new UnsupportedOperationException();
}

@Override
public String getTable() {
throw new UnsupportedOperationException();
Expand All @@ -44,6 +49,11 @@ public String getShard() {
throw new UnsupportedOperationException();
}

@Override
public String getStatement() {
throw new UnsupportedOperationException();
}

@Override
public List<Column> getOldTupleList() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,21 @@ public String getTransactionId() {
return transactionId;
}

@Override
public String getKeyspace() {
throw new UnsupportedOperationException();
}

@Override
public String getTable() {
throw new UnsupportedOperationException();
}

@Override
public String getStatement() {
throw new UnsupportedOperationException();
}

@Override
public String getShard() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ interface ColumnValue<T> {

String getTransactionId();

String getKeyspace();

String getTable();

String getShard();
Expand All @@ -80,6 +82,8 @@ interface ColumnValue<T> {

List<Column> getNewTupleList();

String getStatement();

default boolean isTransactionalMessage() {
return getOperation() == Operation.BEGIN || getOperation() == Operation.COMMIT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ public class TransactionalMessage implements ReplicationMessage {
private final String transactionId;
private final Instant commitTime;
private final Operation operation;
private final String keyspace;
private final String shard;

public TransactionalMessage(Operation operation, String transactionId, Instant commitTime, String shard) {
public TransactionalMessage(Operation operation, String transactionId, Instant commitTime, String keyspace, String shard) {
this.transactionId = transactionId;
this.commitTime = commitTime;
this.operation = operation;
this.keyspace = keyspace;
this.shard = shard;
}

Expand All @@ -38,6 +40,11 @@ public String getTransactionId() {
return transactionId;
}

@Override
public String getKeyspace() {
throw new UnsupportedOperationException();
}

@Override
public String getTable() {
throw new UnsupportedOperationException();
Expand All @@ -48,6 +55,11 @@ public String getShard() {
return shard;
}

@Override
public String getStatement() {
throw new UnsupportedOperationException();
}

@Override
public List<Column> getOldTupleList() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private void handleDdl(Binlogdata.VEvent vEvent, ReplicationMessageProcessor pro
this.transactionId = newVgtid.toString();
}
processor.process(
new DdlMessage(transactionId, eventTimestamp), newVgtid, false);
new DdlMessage(transactionId, eventTimestamp, vEvent.getStatement(), vEvent.getKeyspace(), vEvent.getShard()), newVgtid, false);
}

private void handleOther(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid)
Expand Down Expand Up @@ -133,7 +133,7 @@ private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProc
}
LOGGER.trace("Timestamp of begin transaction: {}", eventTimestamp);
processor.process(
new TransactionalMessage(Operation.BEGIN, transactionId, eventTimestamp, vEvent.getShard()), newVgtid, false);
new TransactionalMessage(Operation.BEGIN, transactionId, eventTimestamp, vEvent.getKeyspace(), vEvent.getShard()), newVgtid, false);
}

private void handleCommitMessage(
Expand All @@ -147,7 +147,7 @@ private void handleCommitMessage(
}
LOGGER.trace("Timestamp of commit transaction: {}", commitTimestamp);
processor.process(
new TransactionalMessage(Operation.COMMIT, transactionId, eventTimestamp, vEvent.getShard()), newVgtid, false);
new TransactionalMessage(Operation.COMMIT, transactionId, eventTimestamp, vEvent.getKeyspace(), vEvent.getShard()), newVgtid, false);
}

private void decodeRows(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction)
Expand Down Expand Up @@ -216,6 +216,7 @@ private void decodeInsert(
Operation.INSERT,
commitTimestamp,
transactionId,
schemaName,
tableId.toDoubleQuotedString(),
shard,
null,
Expand Down Expand Up @@ -256,6 +257,7 @@ private void decodeUpdate(
Operation.UPDATE,
commitTimestamp,
transactionId,
schemaName,
tableId.toDoubleQuotedString(),
shard,
oldColumns,
Expand Down Expand Up @@ -294,6 +296,7 @@ private void decodeDelete(
Operation.DELETE,
commitTimestamp,
transactionId,
schemaName,
tableId.toDoubleQuotedString(),
shard,
columns,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class VStreamOutputReplicationMessage implements ReplicationMessage {
private final Operation op;
private final Instant commitTimestamp;
private final String transactionId;
private final String keyspace;
private final String table;
private final String shard;
private final List<Column> oldColumns;
Expand All @@ -26,13 +27,15 @@ public VStreamOutputReplicationMessage(
Operation op,
Instant commitTimestamp,
String transactionId,
String keyspace,
String table,
String shard,
List<Column> oldColumns,
List<Column> newColumns) {
this.op = op;
this.commitTimestamp = commitTimestamp;
this.transactionId = transactionId;
this.keyspace = keyspace;
this.table = table;
this.shard = shard;
this.oldColumns = oldColumns;
Expand All @@ -54,6 +57,11 @@ public String getTransactionId() {
return transactionId;
}

@Override
public String getKeyspace() {
return keyspace;
}

@Override
public String getTable() {
return table;
Expand All @@ -64,6 +72,11 @@ public String getShard() {
return shard;
}

@Override
public String getStatement() {
throw new UnsupportedOperationException();
}

@Override
public List<Column> getOldTupleList() {
return oldColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,8 @@ public static Vgtid defaultVgtid(VitessConnectorConfig config) {
if (config.offsetStoragePerTask()) {
List<String> shards = config.getVitessTaskKeyShards();
vgtid = config.getVitessTaskVgtid();
LOGGER.info("VGTID '{}' is set for the keyspace: {} shards: {}",
vgtid, config.getKeyspace(), shards);
LOGGER.info("VGTID is set for the keyspace: {}, shards: {}, vgtid: {}",
config.getKeyspace(), shards, vgtid);
}
else {
// If offset storage per task is disabled, then find the vgtid elsewhere
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/io/debezium/connector/vitess/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.debezium.connector.vitess.connection.ReplicationMessageColumn;
import io.debezium.connector.vitess.connection.VitessTabletType;
import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.vitess.proto.Query;
Expand Down Expand Up @@ -149,6 +150,8 @@ public static Configuration.Builder defaultConfig(boolean hasMultipleShards,
.with(VitessConnectorConfig.VTGATE_PORT, VTGATE_PORT)
.with(VitessConnectorConfig.VTGATE_USER, USERNAME)
.with(VitessConnectorConfig.VTGATE_PASSWORD, PASSWORD)
// Only wait 5 seconds to stop, not default of 5 minutes
.with(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS, 5000)
.with(VitessConnectorConfig.POLL_INTERVAL_MS, 100);
if (!Strings.isNullOrEmpty(tableInclude)) {
builder.with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, tableInclude);
Expand Down
Loading

0 comments on commit 8f8c30e

Please sign in to comment.