diff --git a/pom.xml b/pom.xml
index 64f32a59..afc87e8f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -217,6 +217,19 @@
debezium-revapi
provided
+
+
+ io.debezium
+ debezium-connector-mysql
+ ${version.debezium}
+
+
+ io.debezium
+ debezium-connector-binlog
+ ${version.debezium}
+
+
+
ch.qos.logback
diff --git a/src/main/java/io/debezium/connector/vitess/SourceInfo.java b/src/main/java/io/debezium/connector/vitess/SourceInfo.java
index 53e87513..75a793c1 100644
--- a/src/main/java/io/debezium/connector/vitess/SourceInfo.java
+++ b/src/main/java/io/debezium/connector/vitess/SourceInfo.java
@@ -114,4 +114,8 @@ public String toString() {
+ restartVgtid
+ '}';
}
+
+ public String table() {
+ return tableId == null ? null : tableId.table();
+ }
}
diff --git a/src/main/java/io/debezium/connector/vitess/VitessChangeEventSourceFactory.java b/src/main/java/io/debezium/connector/vitess/VitessChangeEventSourceFactory.java
index 3804997f..406abd79 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessChangeEventSourceFactory.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessChangeEventSourceFactory.java
@@ -6,7 +6,8 @@
package io.debezium.connector.vitess;
import io.debezium.connector.vitess.connection.ReplicationConnection;
-import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory;
+import io.debezium.connector.vitess.jdbc.VitessConnection;
+import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
@@ -31,15 +32,18 @@ public class VitessChangeEventSourceFactory implements ChangeEventSourceFactory<
private final VitessDatabaseSchema schema;
private final ReplicationConnection replicationConnection;
private final SnapshotterService snapshotterService;
+ private final MainConnectionProvidingConnectionFactory connectionFactory;
public VitessChangeEventSourceFactory(
VitessConnectorConfig connectorConfig,
+ MainConnectionProvidingConnectionFactory connectionFactory,
ErrorHandler errorHandler,
EventDispatcher dispatcher,
Clock clock,
VitessDatabaseSchema schema,
ReplicationConnection replicationConnection, SnapshotterService snapshotterService) {
this.connectorConfig = connectorConfig;
+ this.connectionFactory = connectionFactory;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
@@ -54,11 +58,11 @@ public SnapshotChangeEventSource getSnapsh
// A dummy SnapshotChangeEventSource, snapshot is skipped.
return new VitessSnapshotChangeEventSource(
connectorConfig,
- new DefaultMainConnectionProvidingConnectionFactory<>(() -> null),
+ this.connectionFactory,
dispatcher,
schema,
clock,
- null,
+ snapshotProgressListener,
notificationService,
snapshotterService);
}
diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java
index 6437cee9..a4163fe1 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java
@@ -30,6 +30,7 @@
import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.connector.SourceInfoStructMaker;
+import io.debezium.connector.mysql.charset.MySqlCharsetRegistryServiceProvider;
import io.debezium.connector.vitess.connection.VitessTabletType;
import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory;
@@ -39,15 +40,20 @@
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.ColumnFilterMode;
+import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.topic.TopicNamingStrategy;
+import io.debezium.util.Collect;
/**
* Vitess connector configuration, including its specific configurations and the common
* configurations from Debezium.
*/
-public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
+public class VitessConnectorConfig extends HistorizedRelationalDatabaseConnectorConfig {
public static final String CSV_DELIMITER = ",";
@@ -55,6 +61,30 @@ public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
private static final String VITESS_CONFIG_GROUP_PREFIX = "vitess.";
private static final int DEFAULT_VTGATE_PORT = 15_991;
+ private static final int DEFAULT_VTGATE_JDBC_PORT = 15_306;
+
+ /**
+ * Set of all built-in database names that will generally be ignored by the connector.
+ */
+ protected static final Set BUILT_IN_DB_NAMES = Collect.unmodifiableSet(
+ "mysql", "performance_schema", "sys", "information_schema");
+
+ @Override
+ public JdbcConfiguration getJdbcConfig() {
+ JdbcConfiguration jdbcConfiguration = super.getJdbcConfig();
+ JdbcConfiguration updatedConfig = JdbcConfiguration.adapt(jdbcConfiguration.edit()
+ .with(JdbcConfiguration.DATABASE, getKeyspace())
+ .with(JdbcConfiguration.PORT, getVtgateJdbcPort())
+ .build());
+ return updatedConfig;
+ }
+
+ @Override
+ protected HistoryRecordComparator getHistoryRecordComparator() {
+ return new HistoryRecordComparator() {
+
+ };
+ }
/**
* The set of predefined SnapshotMode options or aliases.
@@ -66,6 +96,8 @@ public enum SnapshotMode implements EnumeratedValue {
*/
INITIAL("initial"),
+ NO_DATA("no_data"),
+
/**
* Never perform an initial snapshot and only receive new data changes.
*/
@@ -203,6 +235,15 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withValidation(Field::isInteger)
.withDescription("Port of the Vitess VTGate gRPC server.");
+ public static final Field VTGATE_JDBC_PORT = Field.create(DATABASE_CONFIG_PREFIX + "jdbc." + JdbcConfiguration.PORT)
+ .withDisplayName("Vitess JDBC database port")
+ .withType(Type.INT)
+ .withWidth(Width.SHORT)
+ .withDefault(DEFAULT_VTGATE_JDBC_PORT)
+ .withImportance(ConfigDef.Importance.HIGH)
+ .withValidation(Field::isInteger)
+ .withDescription("Port of the Vitess VTGate JDBC server.");
+
public static final Field VTGATE_USER = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.USER)
.withDisplayName("User")
.withType(Type.STRING)
@@ -470,6 +511,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
GTID,
VTGATE_HOST,
VTGATE_PORT,
+ VTGATE_JDBC_PORT,
VTGATE_USER,
VTGATE_PASSWORD,
TABLET_TYPE,
@@ -536,11 +578,16 @@ public static ConfigDef configDef() {
public VitessConnectorConfig(Configuration config) {
super(
+ VitessConnector.class,
config,
- null, x -> x.schema() + "." + x.table(),
+ Tables.TableFilter.fromPredicate(VitessConnectorConfig::isNotBuiltInTable),
+ x -> x.schema() + "." + x.table(),
+ true,
-1,
ColumnFilterMode.SCHEMA,
- true);
+ false);
+
+ getServiceRegistry().registerServiceProvider(new MySqlCharsetRegistryServiceProvider());
}
@Override
@@ -649,6 +696,10 @@ public int getVtgatePort() {
return getConfig().getInteger(VTGATE_PORT);
}
+ public int getVtgateJdbcPort() {
+ return getConfig().getInteger(VTGATE_JDBC_PORT);
+ }
+
public String getVtgateUsername() {
return getConfig().getString(VTGATE_USER);
}
@@ -754,6 +805,29 @@ public Heartbeat createHeartbeat(TopicNamingStrategy topicNamingStrategy, Schema
return new VitessHeartbeatImpl(getHeartbeatInterval(), topicNamingStrategy.heartbeatTopic(), getLogicalName(), schemaNameAdjuster);
}
+ /**
+ * Checks whether the {@link TableId} refers to a built-in table.
+ *
+ * @param tableId the relational table identifier, should not be null
+ * @return true if the reference refers to a built-in table
+ */
+ public static boolean isNotBuiltInTable(TableId tableId) {
+ return !isBuiltInDatabase(tableId.catalog());
+ }
+
+ /**
+ * Check whether the specified database name is a built-in database.
+ *
+ * @param databaseName the database name to check
+ * @return true if the database is a built-in database; false otherwise
+ */
+ public static boolean isBuiltInDatabase(String databaseName) {
+ if (databaseName == null) {
+ return false;
+ }
+ return BUILT_IN_DB_NAMES.contains(databaseName.toLowerCase());
+ }
+
public BigIntUnsignedHandlingMode getBigIntUnsgnedHandlingMode() {
return BigIntUnsignedHandlingMode.parse(getConfig().getString(BIGINT_UNSIGNED_HANDLING_MODE),
BIGINT_UNSIGNED_HANDLING_MODE.defaultValueAsString());
diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java
index 64609f91..d26db8ab 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java
@@ -23,7 +23,11 @@
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
+import io.debezium.connector.vitess.jdbc.VitessConnection;
import io.debezium.connector.vitess.metrics.VitessChangeEventSourceMetricsFactory;
+import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
@@ -37,6 +41,7 @@
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotterService;
+import io.debezium.snapshot.SnapshotterServiceProvider;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
@@ -121,13 +126,18 @@ protected ChangeEventSourceCoordinator sta
NotificationService notificationService = new NotificationService<>(getNotificationChannels(),
connectorConfig, SchemaFactory.get(), dispatcher::enqueueNotification);
+ JdbcConfiguration jdbcConfig = connectorConfig.getJdbcConfig();
+
+ MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(
+ () -> new VitessConnection(jdbcConfig));
+
ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>(
previousOffsets,
errorHandler,
VitessConnector.class,
connectorConfig,
new VitessChangeEventSourceFactory(
- connectorConfig, errorHandler, dispatcher, clock, schema, replicationConnection, snapshotterService),
+ connectorConfig, connectionFactory, errorHandler, dispatcher, clock, schema, replicationConnection, snapshotterService),
connectorConfig.offsetStoragePerTask() ? new VitessChangeEventSourceMetricsFactory() : new DefaultChangeEventSourceMetricsFactory<>(),
dispatcher,
schema,
@@ -225,6 +235,7 @@ protected Iterable getAllConfigurationFields() {
@Override
protected void registerServiceProviders(ServiceRegistry serviceRegistry) {
+ serviceRegistry.registerServiceProvider(new SnapshotterServiceProvider());
serviceRegistry.registerServiceProvider(new PostProcessorRegistryServiceProvider());
}
}
diff --git a/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java b/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java
index 36c26130..e318e001 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java
@@ -6,15 +6,24 @@
package io.debezium.connector.vitess;
import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.debezium.relational.RelationalDatabaseSchema;
+import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
+import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
+import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchemaBuilder;
+import io.debezium.relational.ddl.DdlChanges;
+import io.debezium.relational.ddl.DdlParser;
+import io.debezium.relational.ddl.DdlParserListener;
+import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.topic.TopicNamingStrategy;
@@ -22,9 +31,11 @@
* Logical in-memory representation of Vitess schema (a.k.a Vitess keyspace). It is used to create
* kafka connect {@link Schema} for all tables.
*/
-public class VitessDatabaseSchema extends RelationalDatabaseSchema {
+public class VitessDatabaseSchema extends HistorizedRelationalDatabaseSchema {
private static final Logger LOGGER = LoggerFactory.getLogger(VitessDatabaseSchema.class);
+ private final DdlParser ddlParser;
+
public VitessDatabaseSchema(
VitessConnectorConfig config,
SchemaNameAdjuster schemaNameAdjuster,
@@ -50,6 +61,12 @@ public VitessDatabaseSchema(
false),
false,
config.getKeyMapper());
+ this.ddlParser = new MySqlAntlrDdlParser(
+ true,
+ false,
+ config.isSchemaCommentsHistoryEnabled(),
+ getTableFilter(),
+ config.getServiceRegistry().getService(BinlogCharsetRegistry.class));
}
/** Applies schema changes for the specified table. */
@@ -99,4 +116,133 @@ public static TableId parse(String table) {
public static TableId buildTableId(String shard, String keyspace, String table) {
return new TableId(shard, keyspace, table);
}
+
+ private String getDatabaseWithShard(String shard, String database) {
+ return String.format("%s.%s", shard, database);
+ }
+
+ public List parseDdl(VitessPartition partition, VitessOffsetContext offset, String ddlStatement,
+ String databaseName, String shard) {
+ final List schemaChangeEvents = new ArrayList<>(1);
+ DdlChanges ddlChanges = ddlParser.getDdlChanges();
+ ddlChanges.reset();
+ ddlParser.setCurrentDatabase(getDatabaseWithShard(shard, databaseName));
+ ddlParser.parse(ddlStatement, tables());
+ if (!ddlChanges.isEmpty()) {
+ ddlChanges.getEventsByDatabase((String dbName, List events) -> {
+ events.forEach(event -> {
+ final TableId tableId = getTableId(event);
+ SchemaChangeEvent.SchemaChangeEventType type = switch (event.type()) {
+ case CREATE_TABLE -> SchemaChangeEvent.SchemaChangeEventType.CREATE;
+ case DROP_TABLE -> SchemaChangeEvent.SchemaChangeEventType.DROP;
+ case ALTER_TABLE-> SchemaChangeEvent.SchemaChangeEventType.ALTER;
+ case TRUNCATE_TABLE -> SchemaChangeEvent.SchemaChangeEventType.TRUNCATE;
+ default -> {
+ LOGGER.info("Skipped DDL event type {}: {}", event.type(), ddlStatement);
+ yield null;
+ }
+ };
+ emitChangeEvent(
+ partition,
+ offset,
+ schemaChangeEvents,
+ databaseName,
+ event,
+ tableId,
+ type,
+ false);
+ });
+ });
+ return schemaChangeEvents;
+ }
+ return Collections.emptyList();
+ }
+
+ private TableId getTableId(DdlParserListener.Event event) {
+ if (event instanceof DdlParserListener.TableEvent) {
+ DdlParserListener.TableEvent tableEvent = (DdlParserListener.TableEvent) event;
+ return tableEvent.tableId();
+ }
+ return null;
+ }
+
+ private void emitChangeEvent(VitessPartition partition, VitessOffsetContext offset, List schemaChangeEvents,
+ String sanitizedDbName, DdlParserListener.Event event, TableId tableId,
+ SchemaChangeEvent.SchemaChangeEventType type, boolean snapshot) {
+ SchemaChangeEvent schemaChangeEvent;
+ if (type.equals(SchemaChangeEvent.SchemaChangeEventType.ALTER) && event instanceof DdlParserListener.TableAlteredEvent
+ && ((DdlParserListener.TableAlteredEvent) event).previousTableId() != null) {
+ schemaChangeEvent = SchemaChangeEvent.ofRename(
+ partition,
+ offset,
+ sanitizedDbName,
+ null,
+ event.statement(),
+ tableId != null ? tables().forTable(tableId) : null,
+ ((DdlParserListener.TableAlteredEvent) event).previousTableId());
+ }
+ else {
+ Table table = getTable(tableId, type);
+ schemaChangeEvent = SchemaChangeEvent.of(
+ type,
+ partition,
+ offset,
+ sanitizedDbName,
+ null,
+ event.statement(),
+ table,
+ snapshot);
+ }
+ schemaChangeEvents.add(schemaChangeEvent);
+ }
+
+ private Table getTable(TableId tableId, SchemaChangeEvent.SchemaChangeEventType type) {
+ if (tableId == null) {
+ return null;
+ }
+ if (SchemaChangeEvent.SchemaChangeEventType.DROP == type) {
+ // DROP events don't have information about tableChanges, so we are creating a Table object
+ // with just the tableId to be used during blocking snapshot to filter out drop events not
+ // related to table to be snapshotted.
+ return Table.editor().tableId(tableId).create();
+ }
+ return tables().forTable(tableId);
+ }
+
+ public DdlParser getDdlParser() {
+ return ddlParser;
+ }
+
+ @Override
+ public void applySchemaChange(SchemaChangeEvent schemaChange) {
+ switch (schemaChange.getType()) {
+ case CREATE:
+ case ALTER:
+ schemaChange.getTableChanges().forEach(x -> buildAndRegisterSchema(x.getTable()));
+ break;
+ case DROP:
+ schemaChange.getTableChanges().forEach(x -> removeSchema(x.getId()));
+ break;
+ default:
+ }
+
+ // Record the DDL statement so that we can later recover them.
+ // This is done _after_ writing the schema change records so that failure recovery (which is based on
+ // the schema history) won't lose schema change records.
+ //
+ // We either store:
+ // - all DDLs if configured
+ // - or global SEt variables
+ // - or DDLs for captured objects
+ if (!storeOnlyCapturedTables() || isGlobalSetVariableStatement(schemaChange.getDdl(), schemaChange.getDatabase())
+ || schemaChange.getTables().stream().map(Table::id).anyMatch(getTableFilter()::isIncluded)) {
+ LOGGER.debug("Recorded DDL statements for database '{}': {}", schemaChange.getDatabase(), schemaChange.getDdl());
+ record(schemaChange, schemaChange.getTableChanges());
+ }
+
+ }
+
+ public boolean isGlobalSetVariableStatement(String ddl, String databaseName) {
+ return (databaseName == null || databaseName.isEmpty()) && ddl != null && ddl.toUpperCase().startsWith("SET ");
+ }
}
diff --git a/src/main/java/io/debezium/connector/vitess/VitessMetadata.java b/src/main/java/io/debezium/connector/vitess/VitessMetadata.java
index b3175496..dba2a367 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessMetadata.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessMetadata.java
@@ -193,7 +193,7 @@ protected static List getNonEmptyShards(List> vitessTabletR
}
@VisibleForTesting
- protected static List flattenAndConcat(List> nestedList) {
+ public static List flattenAndConcat(List> nestedList) {
return nestedList.stream()
.map(innerList -> String.join("", innerList))
.collect(Collectors.toList());
diff --git a/src/main/java/io/debezium/connector/vitess/VitessOffsetRetriever.java b/src/main/java/io/debezium/connector/vitess/VitessOffsetRetriever.java
index 2aac4abc..d5feeb9a 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessOffsetRetriever.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessOffsetRetriever.java
@@ -23,7 +23,7 @@
*/
public class VitessOffsetRetriever {
- private static final Logger LOGGER = LoggerFactory.getLogger(VitessConnector.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(VitessOffsetRetriever.class);
private final int numTasks;
private final int gen;
diff --git a/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java
index a238c387..f3f35e06 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java
@@ -5,31 +5,40 @@
*/
package io.debezium.connector.vitess;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import io.debezium.jdbc.JdbcConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.connector.vitess.connection.VitessReplicationConnection;
+import io.debezium.connector.vitess.jdbc.VitessConnection;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
-import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.util.Clock;
+import io.vitess.proto.Vtgate;
/** Always skip snapshot for now */
public class VitessSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource {
+ private static final Logger LOGGER = LoggerFactory.getLogger(VitessSnapshotChangeEventSource.class);
+
+ private final VitessConnectorConfig connectorConfig;
+ private final VitessDatabaseSchema schema;
+
public VitessSnapshotChangeEventSource(
- RelationalDatabaseConnectorConfig connectorConfig,
- MainConnectionProvidingConnectionFactory connectionFactory,
+ VitessConnectorConfig connectorConfig,
+ MainConnectionProvidingConnectionFactory connectionFactory,
EventDispatcher dispatcher,
VitessDatabaseSchema schema,
Clock clock,
@@ -44,57 +53,132 @@ public VitessSnapshotChangeEventSource(
snapshotProgressListener,
notificationService,
snapshotterService);
+ this.connectorConfig = connectorConfig;
+ this.schema = schema;
}
@Override
protected Set getAllTableIds(RelationalSnapshotContext snapshotContext) {
- return null;
+ // TODO: Switch to using the connection factory
+ // TODO: Handle case of empty shard when that config exclude empty shards is enabled
+ List shards = new VitessMetadata(connectorConfig).getShards();
+ try (VitessReplicationConnection connection = new VitessReplicationConnection(connectorConfig, schema)) {
+ Vtgate.ExecuteResponse response = connection.executeInKeyspace("SHOW TABLES");
+ List> rows = VitessMetadata.parseRows(response.getResult().getRowsList());
+ List tables = VitessMetadata.flattenAndConcat(rows);
+ Set tableIds = new HashSet<>();
+ // TODO: Switch to querying all shards?
+ // Set the table ID for each shard, assume the DDLs are the same to avoid many queries on Vitess
+ for (String table : tables) {
+ for (String shard : shards) {
+ tableIds.add(new TableId(shard, connectorConfig.getKeyspace(), table));
+ }
+ }
+ return tableIds;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected void lockTablesForSchemaSnapshot(
ChangeEventSourceContext sourceContext, RelationalSnapshotContext snapshotContext) {
+ // TODO: Do we need to do any locking?
+ LOGGER.info("lock tables");
}
@Override
- protected void determineSnapshotOffset(RelationalSnapshotContext snapshotContext, VitessOffsetContext offsetContext) {
+ protected void determineSnapshotOffset(RelationalSnapshotContext context, VitessOffsetContext previousOffset) {
+ LOGGER.info("determine offset");
+ context.offset = VitessOffsetContext.initialContext(connectorConfig, Clock.system());
}
@Override
protected void readTableStructure(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext snapshotContext,
VitessOffsetContext offsetContext, SnapshottingTask snapshottingTask) {
+ LOGGER.info("Reading table structure");
+ Set capturedSchemaTables = snapshotContext.capturedSchemaTables;
+ List shards = new VitessMetadata(connectorConfig).getShards();
+
+ for (TableId tableId : capturedSchemaTables) {
+ String sql = "SHOW CREATE TABLE " + quote(tableId);
+ try (VitessReplicationConnection connection = new VitessReplicationConnection(connectorConfig, schema)) {
+ Vtgate.ExecuteResponse response = connection.executeInKeyspace(sql);
+ List> rows = VitessMetadata.parseRows(response.getResult().getRowsList());
+ if (rows.size() == 0) {
+ LOGGER.error("No rows {}", response);
+ }
+ String ddlStatement = rows.get(0).get(1);
+ // TODO: Switch to getting the create table from each shard?
+ // For now, iterate over all shards and add the create tables to the table in-memory representation
+ for (String shard : shards) {
+ List schemaChangeEvents = schema.parseDdl(
+ snapshotContext.partition, snapshotContext.offset, ddlStatement, connectorConfig.getKeyspace(), shard);
+ for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
+ LOGGER.info("Adding schema change event {}", schemaChangeEvent);
+ Table table = schema.tableFor(tableId);
+ if (table != null) {
+ LOGGER.info("Adding schema for table {}", table.id());
+ Table updatedTable = getTableWithShardAsCatalog(table, shard);
+ snapshotContext.tables.overwriteTable(updatedTable);
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ private Table getTableWithShardAsCatalog(Table table, String shard) {
+ String[] shardAndDatabase = table.id().catalog().split("\\.");
+ String database = shardAndDatabase[1];
+ String tableName = table.id().table();
+ return table.edit().tableId(new TableId(shard, database, tableName)).create();
+ }
+
+ private String quote(TableId id) {
+ return quote(id.schema()) + "." + quote(id.table());
+ }
+
+ private String quote(String dbOrTableName) {
+ return "`" + dbOrTableName + "`";
}
@Override
protected void releaseSchemaSnapshotLocks(RelationalSnapshotContext snapshotContext) {
+ LOGGER.info("release schema locks");
}
@Override
protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapshotContext,
Table table) {
- return null;
+ return SchemaChangeEvent.ofSnapshotCreate(
+ snapshotContext.partition,
+ snapshotContext.offset,
+ snapshotContext.catalogName,
+ table);
}
@Override
protected Optional getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId, List columns) {
+ LOGGER.info("get snapshot select");
return Optional.empty();
}
- @Override
- public SnapshottingTask getSnapshottingTask(VitessPartition partition, VitessOffsetContext previousOffset) {
- boolean snapshotSchema = false;
- boolean snapshotData = false;
- return new SnapshottingTask(snapshotSchema, snapshotData, List.of(), Map.of(), false);
- }
-
@Override
protected SnapshotContext prepare(VitessPartition partition, boolean onDemand) {
+ LOGGER.info("snapshot context");
return new RelationalSnapshotContext<>(partition, "", onDemand);
}
@Override
protected VitessOffsetContext copyOffset(RelationalSnapshotContext snapshotContext) {
+ LOGGER.info("copy offset");
return null;
}
diff --git a/src/main/java/io/debezium/connector/vitess/VitessSourceInfoStructMaker.java b/src/main/java/io/debezium/connector/vitess/VitessSourceInfoStructMaker.java
index 94dce398..cda99947 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessSourceInfoStructMaker.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessSourceInfoStructMaker.java
@@ -22,8 +22,8 @@ public void init(String connector, String version, CommonConnectorConfig connect
this.schema = commonSchemaBuilder()
.name("io.debezium.connector.vitess.Source")
.field(SourceInfo.KEYSPACE_NAME_KEY, Schema.STRING_SCHEMA)
- .field(SourceInfo.TABLE_NAME_KEY, Schema.STRING_SCHEMA)
- .field(SourceInfo.SHARD_KEY, Schema.STRING_SCHEMA)
+ .field(SourceInfo.TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
+ .field(SourceInfo.SHARD_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.VGTID_KEY, Schema.STRING_SCHEMA)
.build();
}
@@ -37,7 +37,7 @@ public Schema schema() {
public Struct struct(SourceInfo sourceInfo) {
final Struct res = super.commonStruct(sourceInfo)
.put(SourceInfo.KEYSPACE_NAME_KEY, sourceInfo.keyspace())
- .put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table())
+ .put(SourceInfo.TABLE_NAME_KEY, sourceInfo.table())
.put(SourceInfo.SHARD_KEY, sourceInfo.shard())
.put(SourceInfo.VGTID_KEY, sourceInfo.getCurrentVgtid().toString());
return res;
diff --git a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java
index e7ba94c4..1a7da739 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java
@@ -5,12 +5,15 @@
*/
package io.debezium.connector.vitess;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.debezium.DebeziumException;
+import io.debezium.connector.vitess.connection.DdlMessage;
import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.ReplicationMessage;
import io.debezium.connector.vitess.connection.ReplicationMessageProcessor;
@@ -19,6 +22,7 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
+import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
@@ -108,10 +112,29 @@ else if (message.getOperation() == ReplicationMessage.Operation.COMMIT) {
}
return;
}
- else if (message.getOperation() == ReplicationMessage.Operation.DDL || message.getOperation() == ReplicationMessage.Operation.OTHER) {
- // DDL event or OTHER event
+ else if (message.getOperation() == ReplicationMessage.Operation.OTHER) {
offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
}
+ else if (message.getOperation() == ReplicationMessage.Operation.DDL) {
+ offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
+ offsetContext.setShard(message.getShard());
+
+ DdlMessage ddlMessage = (DdlMessage) message;
+ List schemaChangeEvents = schema.parseDdl(
+ partition, offsetContext, ddlMessage.getStatement(),
+ connectorConfig.getKeyspace(), ddlMessage.getShard());
+ for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
+ final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null : schemaChangeEvent.getTables().iterator().next().id();
+ dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, tableId, (receiver) -> {
+ try {
+ receiver.schemaChangeEvent(schemaChangeEvent);
+ }
+ catch (Exception e) {
+ throw new DebeziumException(e);
+ }
+ });
+ }
+ }
else if (message.getOperation().equals(ReplicationMessage.Operation.HEARTBEAT)) {
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
diff --git a/src/main/java/io/debezium/connector/vitess/connection/DdlMessage.java b/src/main/java/io/debezium/connector/vitess/connection/DdlMessage.java
index 293ce4f7..a8f384cd 100644
--- a/src/main/java/io/debezium/connector/vitess/connection/DdlMessage.java
+++ b/src/main/java/io/debezium/connector/vitess/connection/DdlMessage.java
@@ -14,11 +14,15 @@ public class DdlMessage implements ReplicationMessage {
private final String transactionId;
private final Instant commitTime;
private final Operation operation;
+ private final String statement;
+ private final String shard;
- public DdlMessage(String transactionId, Instant commitTime) {
+ public DdlMessage(String transactionId, Instant commitTime, String statement, String shard) {
this.transactionId = transactionId;
this.commitTime = commitTime;
this.operation = Operation.DDL;
+ this.statement = statement;
+ this.shard = shard;
}
@Override
@@ -41,9 +45,14 @@ public String getTable() {
throw new UnsupportedOperationException();
}
+ @Override
+ public String getStatement() {
+ return statement;
+ }
+
@Override
public String getShard() {
- throw new UnsupportedOperationException();
+ return shard;
}
@Override
@@ -67,8 +76,12 @@ public String toString() {
+ "transactionId='"
+ transactionId
+ '\''
+ + ", shard="
+ + shard
+ ", commitTime="
+ commitTime
+ + ", statement="
+ + statement
+ ", operation="
+ operation
+ '}';
diff --git a/src/main/java/io/debezium/connector/vitess/connection/HeartbeatMessage.java b/src/main/java/io/debezium/connector/vitess/connection/HeartbeatMessage.java
index b3c8a4c7..bb63f31d 100644
--- a/src/main/java/io/debezium/connector/vitess/connection/HeartbeatMessage.java
+++ b/src/main/java/io/debezium/connector/vitess/connection/HeartbeatMessage.java
@@ -44,6 +44,11 @@ public String getShard() {
throw new UnsupportedOperationException();
}
+ @Override
+ public String getStatement() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public List getOldTupleList() {
throw new UnsupportedOperationException();
diff --git a/src/main/java/io/debezium/connector/vitess/connection/OtherMessage.java b/src/main/java/io/debezium/connector/vitess/connection/OtherMessage.java
index 2b5179ea..8b47adf2 100644
--- a/src/main/java/io/debezium/connector/vitess/connection/OtherMessage.java
+++ b/src/main/java/io/debezium/connector/vitess/connection/OtherMessage.java
@@ -41,6 +41,11 @@ public String getTable() {
throw new UnsupportedOperationException();
}
+ @Override
+ public String getStatement() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public String getShard() {
throw new UnsupportedOperationException();
diff --git a/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessage.java b/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessage.java
index e453d188..d900f174 100644
--- a/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessage.java
+++ b/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessage.java
@@ -80,6 +80,8 @@ interface ColumnValue {
List getNewTupleList();
+ String getStatement();
+
default boolean isTransactionalMessage() {
return getOperation() == Operation.BEGIN || getOperation() == Operation.COMMIT;
}
diff --git a/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java b/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java
index 76c38796..1124208e 100644
--- a/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java
+++ b/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java
@@ -48,6 +48,11 @@ public String getShard() {
return shard;
}
+ @Override
+ public String getStatement() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public List getOldTupleList() {
throw new UnsupportedOperationException();
diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java
index 7bcf61f7..869f5368 100644
--- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java
+++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java
@@ -100,7 +100,7 @@ private void handleDdl(Binlogdata.VEvent vEvent, ReplicationMessageProcessor pro
this.transactionId = newVgtid.toString();
}
processor.process(
- new DdlMessage(transactionId, eventTimestamp), newVgtid, false);
+ new DdlMessage(transactionId, eventTimestamp, vEvent.getStatement(), vEvent.getShard()), newVgtid, false);
}
private void handleOther(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid)
diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputReplicationMessage.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputReplicationMessage.java
index cb806417..ccbfcf29 100644
--- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputReplicationMessage.java
+++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputReplicationMessage.java
@@ -64,6 +64,11 @@ public String getShard() {
return shard;
}
+ @Override
+ public String getStatement() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public List getOldTupleList() {
return oldColumns;
diff --git a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java
index 9bffb4d9..ac7419e5 100644
--- a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java
+++ b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java
@@ -77,6 +77,21 @@ public Vtgate.ExecuteResponse execute(String sqlStatement) {
return newBlockingStub(channel).execute(request);
}
+ public Vtgate.ExecuteResponse executeInKeyspace(String sqlStatement) {
+ LOGGER.info("Executing sqlStament {}", sqlStatement);
+ ManagedChannel channel = newChannel(config.getVtgateHost(), config.getVtgatePort(), config.getGrpcMaxInboundMessageSize());
+ managedChannel.compareAndSet(null, channel);
+
+ String target = String.format("%s", config.getKeyspace());
+ Vtgate.Session session = Vtgate.Session.newBuilder().setTargetString(target).setAutocommit(true).build();
+ LOGGER.debug("Autocommit {}", session.getAutocommit());
+ Vtgate.ExecuteRequest request = Vtgate.ExecuteRequest.newBuilder()
+ .setQuery(Proto.bindQuery(sqlStatement, Collections.emptyMap()))
+ .setSession(session)
+ .build();
+ return newBlockingStub(channel).execute(request);
+ }
+
public Vtgate.ExecuteResponse execute(String sqlStatement, String shard) {
LOGGER.info("Executing sqlStament {}", sqlStatement);
ManagedChannel channel = newChannel(config.getVtgateHost(), config.getVtgatePort(), config.getGrpcMaxInboundMessageSize());
diff --git a/src/main/java/io/debezium/connector/vitess/jdbc/VitessConnection.java b/src/main/java/io/debezium/connector/vitess/jdbc/VitessConnection.java
new file mode 100644
index 00000000..f73a1483
--- /dev/null
+++ b/src/main/java/io/debezium/connector/vitess/jdbc/VitessConnection.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.vitess.jdbc;
+
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+
+/**
+ * Needed to maintain compatibility with RelationalSnapshotChangeEventSource
+ *
+ * TODO: Move all query-based interactions with Vitess onto this class.
+ * Currently we do these with VitessReplicationConnection instead
+ *
+ * @author Thomas Thornton
+ */
+public class VitessConnection extends JdbcConnection {
+
+ private static final String URL = "jdbc:mysql://${hostname}:${port}/${dbname}?maxAllowedPacket=512000";
+
+ protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(URL);
+
+ private static final String QUOTED_CHARACTER = "`";
+
+ public VitessConnection(JdbcConfiguration config) {
+ super(config, resolveConnectionFactory(), QUOTED_CHARACTER, QUOTED_CHARACTER);
+ }
+
+ private static ConnectionFactory resolveConnectionFactory() {
+ return FACTORY;
+ }
+}
diff --git a/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java b/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java
index 664a1b0b..86a94121 100644
--- a/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java
+++ b/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java
@@ -500,9 +500,7 @@ protected class TestConsumer {
protected TestConsumer(int expectedRecordsCount, String... topicPrefixes) {
this.expectedRecordsCount = expectedRecordsCount;
this.records = new ConcurrentLinkedQueue<>();
- this.topicPrefixes = Arrays.stream(topicPrefixes)
- .map(p -> TestHelper.TEST_SERVER + "." + p)
- .collect(Collectors.toList());
+ this.topicPrefixes = Arrays.stream(topicPrefixes).toList();
}
public void setIgnoreExtraRecords(boolean ignoreExtraRecords) {
diff --git a/src/test/java/io/debezium/connector/vitess/SourceInfoTest.java b/src/test/java/io/debezium/connector/vitess/SourceInfoTest.java
index 03e19af4..5b43af2a 100644
--- a/src/test/java/io/debezium/connector/vitess/SourceInfoTest.java
+++ b/src/test/java/io/debezium/connector/vitess/SourceInfoTest.java
@@ -128,8 +128,8 @@ public void schemaIsCorrect() {
.field("ts_us", Schema.OPTIONAL_INT64_SCHEMA)
.field("ts_ns", Schema.OPTIONAL_INT64_SCHEMA)
.field("keyspace", Schema.STRING_SCHEMA)
- .field("table", Schema.STRING_SCHEMA)
- .field("shard", Schema.STRING_SCHEMA)
+ .field("table", Schema.OPTIONAL_STRING_SCHEMA)
+ .field("shard", Schema.OPTIONAL_STRING_SCHEMA)
.field("vgtid", Schema.STRING_SCHEMA)
.build();
diff --git a/src/test/java/io/debezium/connector/vitess/TestHelper.java b/src/test/java/io/debezium/connector/vitess/TestHelper.java
index ca0b77fa..43795946 100644
--- a/src/test/java/io/debezium/connector/vitess/TestHelper.java
+++ b/src/test/java/io/debezium/connector/vitess/TestHelper.java
@@ -33,6 +33,7 @@
import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
+import io.debezium.relational.history.MemorySchemaHistory;
import io.vitess.proto.Query;
import io.vitess.proto.Query.Field;
@@ -87,6 +88,17 @@ public static Configuration.Builder defaultConfig() {
return defaultConfig(false, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, TEST_SHARD, null, null);
}
+ public static String getTopicPrefix(boolean hasMultipleShards) {
+ String keyspace;
+ if (hasMultipleShards) {
+ keyspace = TEST_SHARDED_KEYSPACE;
+ }
+ else {
+ keyspace = TEST_UNSHARDED_KEYSPACE;
+ }
+ return String.join(".", TEST_SERVER, keyspace);
+ }
+
/**
* Get the default configuration of the connector
*
@@ -148,6 +160,7 @@ public static Configuration.Builder defaultConfig(boolean hasMultipleShards,
.with(VitessConnectorConfig.VTGATE_HOST, VTGATE_HOST)
.with(VitessConnectorConfig.VTGATE_PORT, VTGATE_PORT)
.with(VitessConnectorConfig.VTGATE_USER, USERNAME)
+ .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class)
.with(VitessConnectorConfig.VTGATE_PASSWORD, PASSWORD)
.with(VitessConnectorConfig.POLL_INTERVAL_MS, 100);
if (!Strings.isNullOrEmpty(tableInclude)) {
diff --git a/src/test/java/io/debezium/connector/vitess/VitessBigIntUnsignedTest.java b/src/test/java/io/debezium/connector/vitess/VitessBigIntUnsignedTest.java
index b8420b2e..6d4b8278 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessBigIntUnsignedTest.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessBigIntUnsignedTest.java
@@ -31,7 +31,7 @@
import io.debezium.util.Clock;
import io.vitess.proto.Query;
-public class VitessBigIntUnsignedTest {
+public class VitessBigIntUnsignedTest extends VitessTestCleanup {
private static final Logger LOGGER = LoggerFactory.getLogger(VitessChangeRecordEmitterTest.class);
protected static Object getJavaValue(VitessConnectorConfig.BigIntUnsignedHandlingMode mode) {
@@ -74,7 +74,6 @@ protected static Struct defaultStruct(TableSchema tableSchema, VitessConnectorCo
protected void handleInsert(VitessConnectorConfig.BigIntUnsignedHandlingMode mode) throws Exception {
VitessConnectorConfig connectorConfig;
- VitessDatabaseSchema schema;
VStreamOutputMessageDecoder decoder;
Configuration.Builder builder = TestHelper.defaultConfig();
diff --git a/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java b/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java
index 89d49d19..1fdfccb3 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java
@@ -7,7 +7,7 @@
import static org.assertj.core.api.Assertions.assertThat;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,15 +22,14 @@
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
-public class VitessChangeRecordEmitterTest {
+public class VitessChangeRecordEmitterTest extends VitessTestCleanup {
private static final Logger LOGGER = LoggerFactory.getLogger(VitessChangeRecordEmitterTest.class);
- private static VitessConnectorConfig connectorConfig;
- private static VitessDatabaseSchema schema;
- private static VStreamOutputMessageDecoder decoder;
+ private VitessConnectorConfig connectorConfig;
+ private VStreamOutputMessageDecoder decoder;
- @BeforeClass
- public static void beforeClass() throws Exception {
+ @Before
+ public void before() throws Exception {
connectorConfig = new VitessConnectorConfig(TestHelper.defaultConfig().build());
schema = new VitessDatabaseSchema(
connectorConfig,
diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
index 33c21b83..1e0b68bd 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
@@ -16,6 +16,7 @@
import static io.debezium.connector.vitess.TestHelper.TEST_SHARD_TO_EPOCH;
import static io.debezium.connector.vitess.TestHelper.TEST_UNSHARDED_KEYSPACE;
import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_TEMPLATE;
+import static io.debezium.connector.vitess.TestHelper.getTopicPrefix;
import static junit.framework.TestCase.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
@@ -73,6 +74,7 @@
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
+import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
@@ -180,6 +182,112 @@ public void shouldReceiveHeartbeatEvents() throws Exception {
assertThat(records.recordsForTopic(topic).size()).isEqualTo(expectedHeartbeatRecords);
}
+ @Test
+ @FixFor("DBZ-7962")
+ public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception {
+ String keyspace = TEST_SHARDED_KEYSPACE;
+ String table = keyspace + ".ddl_table";
+ TestHelper.executeDDL("vitess_create_tables.ddl", keyspace);
+ startConnector(config -> config
+ .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES.name(), true)
+ // .with(VitessConnectorConfig.KEYSPACE.name(), keyspace)
+ // .with(VitessConnectorConfig.SNAPSHOT_MODE_TABLES, "test_sharded_keyspace.ddl_table")
+ .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
+ .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
+ .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class),
+ true);
+ assertConnectorIsRunning();
+
+ String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX);
+
+ TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;", TEST_SHARDED_KEYSPACE);
+ TestHelper.execute("ALTER TABLE ddl_table ADD PARTITION (PARTITION p2 VALUES LESS THAN (2000));", TEST_SHARDED_KEYSPACE);
+ TestHelper.execute("TRUNCATE TABLE ddl_table;", TEST_SHARDED_KEYSPACE);
+ TestHelper.execute("DROP TABLE ddl_table;", TEST_SHARDED_KEYSPACE);
+ TestHelper.execute("CREATE TABLE ddl_table (id BIGINT NOT NULL AUTO_INCREMENT, PRIMARY KEY (id));", TEST_SHARDED_KEYSPACE);
+
+ // 1 for the snapshot (create table ddls)
+ // 5 for the changes above
+ // 2 shards, so (5 + 1) * 2 = 12
+ int expectedSchemaChangeRecords = 12;
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> consumeRecordsByTopic(expectedSchemaChangeRecords).allRecordsInOrder().size() >= expectedSchemaChangeRecords);
+
+ AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(expectedSchemaChangeRecords, 1);
+ // assertThat(records.recordsForTopic(schemaChangeTopic).size()).isEqualTo(expectedSchemaChangeRecords);
+ }
+
+ @Test
+ @FixFor("DBZ-7962")
+ public void shouldReceiveSnapshotAndSchemaChangeEvents() throws Exception {
+ TestHelper.executeDDL("vitess_create_tables.ddl");
+ startConnector(config -> config
+ .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES.name(), true)
+ .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, "test_unsharded_keyspace.ddl_table")
+ // .with(VitessConnectorConfig.SNAPSHOT_MODE_TABLES "test_unsharded_keyspace.ddl_table")
+ .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
+ .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class),
+ false);
+ assertConnectorIsRunning();
+
+ String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX);
+
+ TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;");
+ TestHelper.execute("ALTER TABLE ddl_table ADD PARTITION (PARTITION p2 VALUES LESS THAN (2000));");
+ TestHelper.execute("TRUNCATE TABLE ddl_table;");
+ TestHelper.execute("DROP TABLE ddl_table;");
+ TestHelper.execute("CREATE TABLE ddl_table (id BIGINT NOT NULL AUTO_INCREMENT, PRIMARY KEY (id));");
+
+ // 1 is the snapshot
+ // 5 are the changes above
+ int expectedSchemaChangeRecords = 6;
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> consumeRecordsByTopic(expectedSchemaChangeRecords).allRecordsInOrder().size() >= expectedSchemaChangeRecords);
+
+ AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(expectedSchemaChangeRecords, 1);
+ // assertThat(records.recordsForTopic(schemaChangeTopic).size()).isEqualTo(expectedSchemaChangeRecords);
+ }
+
+ @Test
+ @FixFor("DBZ-7962")
+ public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exception {
+ TestHelper.executeDDL("vitess_create_tables.ddl");
+ // startConnector();
+ startConnector(config -> config
+ .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES.name(), true)
+ .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class),
+ false);
+ assertConnectorIsRunning();
+
+ String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX);
+ String dataChangeTopic = String.join(".",
+ TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX),
+ TEST_UNSHARDED_KEYSPACE,
+ "ddl_table");
+
+ TestHelper.execute("INSERT INTO ddl_table (id) VALUES (1);");
+ TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;");
+
+ int expectedDataChangeRecords = 1;
+ int expectedSchemaChangeRecords = 1;
+ int expectedTotalRecords = expectedDataChangeRecords + expectedSchemaChangeRecords;
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> consumeRecordsByTopic(expectedTotalRecords).allRecordsInOrder().size() >= expectedTotalRecords);
+
+ AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(expectedTotalRecords, 1);
+ // assertThat(records.recordsForTopic(schemaChangeTopic).size()).isEqualTo(expectedSchemaChangeRecords);
+ // assertThat(records.recordsForTopic(dataChangeTopic).size()).isEqualTo(expectedDataChangeRecords);
+ }
+
@Test
@FixFor("DBZ-7962")
public void shouldReceiveHeartbeatEventsShardedKeyspace() throws Exception {
@@ -430,7 +538,7 @@ public void shouldSchemaUpdatedAfterOnlineDdl() throws Exception {
startConnector();
assertConnectorIsRunning();
int expectedRecordsCount = 1;
- consumer = testConsumer(expectedRecordsCount);
+ consumer = testConsumer(expectedRecordsCount, getTopicPrefix(false));
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TestHelper.PK_FIELD);
// Add a column using online ddl and wait until it is finished
String ddlId = TestHelper.applyOnlineDdl("ALTER TABLE numeric_table ADD COLUMN foo INT", TEST_UNSHARDED_KEYSPACE);
@@ -686,7 +794,7 @@ public void shouldUseLocalVgtid() throws Exception {
Vgtid baseVgtid = TestHelper.getCurrentVgtid();
int expectedRecordsCount = 1;
- consumer = testConsumer(expectedRecordsCount + 2);
+ consumer = testConsumer(expectedRecordsCount + 2, getTopicPrefix(true), TEST_SERVER + ".transaction");
String rowValue = "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)";
String insertQuery = "INSERT INTO numeric_table ("
@@ -1223,7 +1331,7 @@ public void shouldMultiShardMultiTaskConfigSubscriptionHaveMultiShardGtidsInVgti
assertConnectorIsRunning();
int expectedRecordsCount = 1;
- consumer = testConsumer(expectedRecordsCount);
+ consumer = testConsumer(expectedRecordsCount, getTopicPrefix(hasMultipleShards));
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TEST_SHARDED_KEYSPACE, TestHelper.PK_FIELD, hasMultipleShards);
}
@@ -1646,7 +1754,7 @@ public void testSnapshotForTableWithEnums() throws Exception {
}
// We should receive a record written before starting the connector.
- consumer = testConsumer(totalRecordsCount);
+ consumer = testConsumer(totalRecordsCount, getTopicPrefix(false));
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 1; i <= totalRecordsCount; i++) {
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_ENUM_TYPE_STMT), TestHelper.PK_FIELD);
@@ -1691,7 +1799,7 @@ public void testSnapshotForTableWithEnumsAmbiguous() throws Exception {
}
// We should receive a record written before starting the connector.
- consumer = testConsumer(totalRecordsCount);
+ consumer = testConsumer(totalRecordsCount, getTopicPrefix(false));
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 1; i <= totalRecordsCount; i++) {
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_ENUM_AMBIGUOUS_TYPE_STMT), TestHelper.PK_FIELD);
@@ -1730,7 +1838,7 @@ public void testVgtidIncludesLastPkDuringTableCopy() throws Exception {
-1, -1, tableInclude, VitessConnectorConfig.SnapshotMode.INITIAL, TestHelper.TEST_SHARD);
// We should receive a record written before starting the connector.
- consumer = testConsumer(expectedSnapshotRecordsCount);
+ consumer = testConsumer(expectedSnapshotRecordsCount, getTopicPrefix(false));
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 1; i <= expectedSnapshotRecordsCount; i++) {
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD);
@@ -1790,11 +1898,12 @@ public void testMidSnapshotRecoveryLargeTable() throws Exception {
startConnector(Function.identity(), false, false, 1,
-1, -1, tableInclude, VitessConnectorConfig.SnapshotMode.INITIAL, TestHelper.TEST_SHARD);
- consumer = testConsumer(1, tableInclude);
+ String topicPrefix = TEST_SERVER + "." + tableInclude;
+ consumer = testConsumer(1, topicPrefix, getTopicPrefix(false));
consumer.await(TestHelper.waitTimeForRecords(), 0, TimeUnit.SECONDS);
stopConnector();
// Upper bound is the total size of the table so set that to prevent early termination
- consumer = testConsumer(expectedSnapshotRecordsCount, tableInclude);
+ consumer = testConsumer(expectedSnapshotRecordsCount, topicPrefix);
int recordCount = consumer.countRecords(5, TimeUnit.SECONDS);
// Assert snapshot is partially complete
assertThat(recordCount).isPositive();
@@ -1855,7 +1964,7 @@ public void testCopyNoRecordsAndReplicateTable() throws Exception {
startConnector(Function.identity(), false, false, 1, -1, -1, tableInclude, null, null);
int expectedRecordsCount = 1;
- consumer = testConsumer(expectedRecordsCount);
+ consumer = testConsumer(expectedRecordsCount, getTopicPrefix(false));
// We should receive record from numeric_table
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TestHelper.PK_FIELD);
@@ -1873,7 +1982,7 @@ public void testInitialSnapshotModeHaveMultiShard() throws Exception {
// We should receive a record written before starting the connector.
int expectedRecordsCount = 1;
- consumer = testConsumer(expectedRecordsCount);
+ consumer = testConsumer(expectedRecordsCount, getTopicPrefix(hasMultipleShards));
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT, TEST_SHARDED_KEYSPACE), TestHelper.PK_FIELD);
assertSourceInfo(record, TEST_SERVER, TEST_SHARDED_KEYSPACE, "numeric_table");
@@ -1896,7 +2005,7 @@ public void testCopyTableAndRestart() throws Exception {
// We should receive a record written before starting the connector.
int expectedRecordsCount = 1;
- consumer = testConsumer(expectedRecordsCount);
+ consumer = testConsumer(expectedRecordsCount, getTopicPrefix(false));
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD);
assertSourceInfo(record, TEST_SERVER, TEST_UNSHARDED_KEYSPACE, "numeric_table");
@@ -1920,7 +2029,7 @@ public void testCopyAndReplicatePerTaskOffsetStorage() throws Exception {
// We should receive a record written before starting the connector.
int expectedRecordsCount = 1;
- consumer = testConsumer(expectedRecordsCount);
+ consumer = testConsumer(expectedRecordsCount, getTopicPrefix(false));
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD);
assertSourceInfo(record, TEST_SERVER, TEST_UNSHARDED_KEYSPACE, "numeric_table");
diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorTaskTest.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorTaskTest.java
index 658f6282..7e4b6bae 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessConnectorTaskTest.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorTaskTest.java
@@ -32,7 +32,7 @@
import io.debezium.util.Collect;
import io.debezium.util.Testing;
-public class VitessConnectorTaskTest {
+public class VitessConnectorTaskTest extends VitessTestCleanup {
private static final LogInterceptor logInterceptor = new LogInterceptor(BaseSourceTask.class);
private static final LogInterceptor vitessLogInterceptor = new LogInterceptor(VitessConnectorTask.class);
@@ -45,7 +45,7 @@ public void shouldStartWithTaskOffsetStorageEnabledAndNoOffsets() {
.with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, 1)
.with(VitessConnectorConfig.VITESS_TASK_SHARDS_CONFIG, "0")
.build();
- VitessConnectorTask task = new VitessConnectorTask();
+ task = new VitessConnectorTask();
ContextHelper helper = new ContextHelper();
task.initialize(helper.getSourceTaskContext());
ChangeEventSourceCoordinator coordinator = task.start(config);
@@ -58,7 +58,7 @@ public void shouldStartWithTaskOffsetStorageDisabledAndNoOffsets() {
.with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, 1)
.with(VitessConnectorConfig.VITESS_TASK_SHARDS_CONFIG, "0")
.build();
- VitessConnectorTask task = new VitessConnectorTask();
+ task = new VitessConnectorTask();
ContextHelper helper = new ContextHelper();
task.initialize(helper.getSourceTaskContext());
ChangeEventSourceCoordinator coordinator = task.start(config);
@@ -71,7 +71,7 @@ public void shouldReadOffsetsWhenTaskOffsetStorageDisabled() {
.with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, 1)
.with(VitessConnectorConfig.VITESS_TASK_SHARDS_CONFIG, "0")
.build();
- VitessConnectorTask task = new VitessConnectorTask();
+ task = new VitessConnectorTask();
ContextHelper helper = new ContextHelper();
helper.storeOffsets(VGTID_JSON, null);
task.initialize(helper.getSourceTaskContext());
@@ -98,7 +98,7 @@ public void shouldReadCurrentGenOffsets() {
.with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, 1)
.with(VitessConnectorConfig.VITESS_TASK_SHARDS_CONFIG, "-80,80-")
.build();
- VitessConnectorTask task = new VitessConnectorTask();
+ task = new VitessConnectorTask();
ContextHelper helper = new ContextHelper();
helper.storeOffsets(null, Map.of(taskKey, VGTID_JSON));
task.initialize(helper.getSourceTaskContext());
@@ -128,7 +128,7 @@ public void shouldReadPreviousGenOffsets() {
.with(VitessConnectorConfig.VITESS_TASK_SHARDS_CONFIG, shards)
.with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, 2)
.build();
- VitessConnectorTask task = new VitessConnectorTask();
+ task = new VitessConnectorTask();
task.initialize(helper.getSourceTaskContext());
task.start(config);
String expectedMessage = "Using offsets from previous gen";
@@ -155,7 +155,7 @@ public void shouldReadConfiguredOffsets() {
.with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, 1)
.with(VitessConnectorConfig.VGTID, VGTID_JSON)
.build();
- VitessConnectorTask task = new VitessConnectorTask();
+ task = new VitessConnectorTask();
ContextHelper helper = new ContextHelper();
task.initialize(helper.getSourceTaskContext());
task.start(config);
diff --git a/src/test/java/io/debezium/connector/vitess/VitessSourceInfoStructMakerTest.java b/src/test/java/io/debezium/connector/vitess/VitessSourceInfoStructMakerTest.java
index 596a798f..34e5decb 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessSourceInfoStructMakerTest.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessSourceInfoStructMakerTest.java
@@ -41,9 +41,9 @@ public void shouldGetCorrectSourceInfoSchema() {
assertThat(structMaker.schema().field(SourceInfo.KEYSPACE_NAME_KEY).schema())
.isEqualTo(Schema.STRING_SCHEMA);
assertThat(structMaker.schema().field(SourceInfo.SHARD_KEY).schema())
- .isEqualTo(Schema.STRING_SCHEMA);
+ .isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
assertThat(structMaker.schema().field(SourceInfo.TABLE_NAME_KEY).schema())
- .isEqualTo(Schema.STRING_SCHEMA);
+ .isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
assertThat(structMaker.schema().field(SourceInfo.VGTID_KEY).schema())
.isEqualTo(Schema.STRING_SCHEMA);
assertThat(structMaker.schema()).isNotNull();
diff --git a/src/test/java/io/debezium/connector/vitess/VitessTestCleanup.java b/src/test/java/io/debezium/connector/vitess/VitessTestCleanup.java
new file mode 100644
index 00000000..ff12bc96
--- /dev/null
+++ b/src/test/java/io/debezium/connector/vitess/VitessTestCleanup.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.vitess;
+
+import org.junit.After;
+
+/**
+ * @author Thomas Thornton
+ */
+public class VitessTestCleanup {
+
+ public VitessDatabaseSchema schema;
+ public VitessConnectorTask task;
+
+ @After
+ public void afterEach() {
+ if (schema != null) {
+ try {
+ schema.close();
+ }
+ finally {
+ schema = null;
+ }
+ }
+ if (task != null) {
+ try {
+ task.doStop();
+ }
+ finally {
+ task = null;
+ }
+ }
+ }
+
+}
diff --git a/src/test/java/io/debezium/connector/vitess/VitessValueConverterTest.java b/src/test/java/io/debezium/connector/vitess/VitessValueConverterTest.java
index 308cdce2..954c92e4 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessValueConverterTest.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessValueConverterTest.java
@@ -36,9 +36,8 @@
import binlogdata.Binlogdata;
-public class VitessValueConverterTest {
+public class VitessValueConverterTest extends VitessTestCleanup {
- private VitessDatabaseSchema schema;
private VitessConnectorConfig config;
private VitessValueConverter converter;
private VStreamOutputMessageDecoder decoder;
diff --git a/src/test/java/io/debezium/connector/vitess/connection/DdlMessageTest.java b/src/test/java/io/debezium/connector/vitess/connection/DdlMessageTest.java
new file mode 100644
index 00000000..86a05180
--- /dev/null
+++ b/src/test/java/io/debezium/connector/vitess/connection/DdlMessageTest.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.vitess.connection;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Instant;
+
+import org.junit.Test;
+
+/**
+ * @author Thomas Thornton
+ */
+public class DdlMessageTest {
+
+ @Test
+ public void shouldSetQuery() {
+ String statement = "ALTER TABLE foo RENAME TO bar";
+ ReplicationMessage replicationMessage = new DdlMessage("gtid", Instant.EPOCH, statement, "0");
+ assertThat(replicationMessage.getStatement()).isEqualTo(statement);
+ }
+
+ @Test
+ public void shouldSetShard() {
+ String statement = "ALTER TABLE foo RENAME TO bar";
+ String shard = "-80";
+ ReplicationMessage replicationMessage = new DdlMessage("gtid", Instant.EPOCH, statement, shard);
+ assertThat(replicationMessage.getShard()).isEqualTo(shard);
+ }
+
+ @Test
+ public void shouldConvertToString() {
+ String statement = "ALTER TABLE foo RENAME TO bar";
+ String shard = "-80";
+ ReplicationMessage replicationMessage = new DdlMessage("gtid", Instant.EPOCH, statement, shard);
+ assertThat(replicationMessage.toString()).isEqualTo(
+ "DdlMessage{transactionId='gtid', shard=-80, commitTime=1970-01-01T00:00:00Z, statement=ALTER TABLE foo RENAME TO bar, operation=DDL}");
+ }
+
+}
diff --git a/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java b/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java
index 98cf3e7b..4301afb8 100644
--- a/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java
+++ b/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java
@@ -21,6 +21,7 @@
import io.debezium.connector.vitess.VgtidTest;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.connector.vitess.VitessDatabaseSchema;
+import io.debezium.connector.vitess.VitessTestCleanup;
import io.debezium.doc.FixFor;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
@@ -31,11 +32,10 @@
import binlogdata.Binlogdata;
-public class VStreamOutputMessageDecoderTest {
+public class VStreamOutputMessageDecoderTest extends VitessTestCleanup {
private static final Logger LOGGER = LoggerFactory.getLogger(VStreamOutputMessageDecoderTest.class);
private VitessConnectorConfig connectorConfig;
- private VitessDatabaseSchema schema;
private VStreamOutputMessageDecoder decoder;
@Before
diff --git a/src/test/resources/vitess_create_tables.ddl b/src/test/resources/vitess_create_tables.ddl
index c79212e4..7411f043 100644
--- a/src/test/resources/vitess_create_tables.ddl
+++ b/src/test/resources/vitess_create_tables.ddl
@@ -20,6 +20,16 @@ CREATE TABLE numeric_table
PRIMARY KEY (id)
);
+DROP TABLE IF EXISTS ddl_table;
+CREATE TABLE ddl_table
+(
+ id BIGINT NOT NULL AUTO_INCREMENT,
+ PRIMARY KEY (id)
+)
+PARTITION BY RANGE (id) (
+ PARTITION p0 VALUES LESS THAN (1000)
+);
+
DROP TABLE IF EXISTS string_table;
CREATE TABLE string_table
(