Skip to content

Commit

Permalink
DBZ-8325 Minor refactor and testing improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Oct 17, 2024
1 parent 2ed84c3 commit 3f34946
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 70 deletions.
4 changes: 0 additions & 4 deletions src/main/java/io/debezium/connector/vitess/SourceInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,4 @@ public String toString() {
+ restartVgtid
+ '}';
}

public String table() {
return tableId == null ? null : tableId.table();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package io.debezium.connector.vitess;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -15,6 +16,7 @@

import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.vitess.connection.DdlMessage;
import io.debezium.connector.vitess.jdbc.VitessBinlogValueConverter;
import io.debezium.connector.vitess.jdbc.VitessDefaultValueConverter;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
Expand Down Expand Up @@ -118,9 +120,11 @@ private String getDatabaseWithShard(String shard, String database) {
return String.format("%s.%s", shard, database);
}

public List<SchemaChangeEvent> parseDdl(VitessPartition partition, VitessOffsetContext offset, String ddlStatement,
String databaseName, String shard) {
public List<SchemaChangeEvent> parseDdl(VitessPartition partition, VitessOffsetContext offset, DdlMessage ddlMessage, String databaseName) {
final List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>(1);
String ddlStatement = ddlMessage.getStatement();
String shard = ddlMessage.getShard();
Instant timestsamp = ddlMessage.getCommitTime();
DdlChanges ddlChanges = ddlParser.getDdlChanges();
ddlChanges.reset();
ddlParser.setCurrentDatabase(getDatabaseWithShard(shard, databaseName));
Expand All @@ -129,6 +133,7 @@ public List<SchemaChangeEvent> parseDdl(VitessPartition partition, VitessOffsetC
ddlChanges.getEventsByDatabase((String dbName, List<DdlParserListener.Event> events) -> {
events.forEach(event -> {
final TableId tableId = getTableId(event);
offset.event(tableId, timestsamp);
SchemaChangeEvent.SchemaChangeEventType type = switch (event.type()) {
case CREATE_TABLE -> SchemaChangeEvent.SchemaChangeEventType.CREATE;
case DROP_TABLE -> SchemaChangeEvent.SchemaChangeEventType.DROP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ protected static List<String> getNonEmptyShards(List<List<String>> vitessTabletR
}

@VisibleForTesting
public static List<String> flattenAndConcat(List<List<String>> nestedList) {
protected static List<String> flattenAndConcat(List<List<String>> nestedList) {
return nestedList.stream()
.map(innerList -> String.join("", innerList))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.debezium.connector.vitess;

import java.sql.SQLException;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand All @@ -14,6 +15,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.connector.vitess.connection.DdlMessage;
import io.debezium.connector.vitess.jdbc.VitessConnection;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.EventDispatcher;
Expand Down Expand Up @@ -103,8 +105,10 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
if (rs.next()) {
String ddlStatement = rs.getString(2);
for (String shard : shards) {
snapshotContext.offset.setShard(shard);
DdlMessage ddlMessage = new DdlMessage("", Instant.now(), ddlStatement, shard);
List<SchemaChangeEvent> schemaChangeEvents = schema.parseDdl(
snapshotContext.partition, snapshotContext.offset, ddlStatement, connectorConfig.getKeyspace(), shard);
snapshotContext.partition, snapshotContext.offset, ddlMessage, connectorConfig.getKeyspace());
for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
LOGGER.info("Adding schema change event {}", schemaChangeEvent);
Table table = schema.tableFor(tableId);
Expand Down Expand Up @@ -137,7 +141,6 @@ private String quote(String dbOrTableName) {

@Override
protected void releaseSchemaSnapshotLocks(RelationalSnapshotContext<VitessPartition, VitessOffsetContext> snapshotContext) {
LOGGER.info("release schema locks");
}

@Override
Expand All @@ -152,19 +155,16 @@ protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<Vitess

@Override
protected Optional<String> getSnapshotSelect(RelationalSnapshotContext<VitessPartition, VitessOffsetContext> snapshotContext, TableId tableId, List<String> columns) {
LOGGER.info("get snapshot select");
return Optional.empty();
}

@Override
protected SnapshotContext<VitessPartition, VitessOffsetContext> prepare(VitessPartition partition, boolean onDemand) {
LOGGER.info("snapshot context");
return new RelationalSnapshotContext<>(partition, connectorConfig.getKeyspace(), onDemand);
}

@Override
protected VitessOffsetContext copyOffset(RelationalSnapshotContext<VitessPartition, VitessOffsetContext> snapshotContext) {
LOGGER.info("copy offset");
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public void init(String connector, String version, CommonConnectorConfig connect
this.schema = commonSchemaBuilder()
.name("io.debezium.connector.vitess.Source")
.field(SourceInfo.KEYSPACE_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.SHARD_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.TABLE_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.SHARD_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.VGTID_KEY, Schema.STRING_SCHEMA)
.build();
}
Expand All @@ -37,7 +37,7 @@ public Schema schema() {
public Struct struct(SourceInfo sourceInfo) {
final Struct res = super.commonStruct(sourceInfo)
.put(SourceInfo.KEYSPACE_NAME_KEY, sourceInfo.keyspace())
.put(SourceInfo.TABLE_NAME_KEY, sourceInfo.table())
.put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table())
.put(SourceInfo.SHARD_KEY, sourceInfo.shard())
.put(SourceInfo.VGTID_KEY, sourceInfo.getCurrentVgtid().toString());
return res;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ else if (message.getOperation() == ReplicationMessage.Operation.DDL) {

DdlMessage ddlMessage = (DdlMessage) message;
List<SchemaChangeEvent> schemaChangeEvents = schema.parseDdl(
partition, offsetContext, ddlMessage.getStatement(),
connectorConfig.getKeyspace(), ddlMessage.getShard());
partition, offsetContext, ddlMessage,
connectorConfig.getKeyspace());
for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null : schemaChangeEvent.getTables().iterator().next().id();
dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, tableId, (receiver) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,6 @@ public Vtgate.ExecuteResponse execute(String sqlStatement) {
return newBlockingStub(channel).execute(request);
}

public Vtgate.ExecuteResponse executeInKeyspace(String sqlStatement) {
LOGGER.info("Executing sqlStament {}", sqlStatement);
ManagedChannel channel = newChannel(config.getVtgateHost(), config.getVtgatePort(), config.getGrpcMaxInboundMessageSize());
managedChannel.compareAndSet(null, channel);

String target = String.format("%s", config.getKeyspace());
Vtgate.Session session = Vtgate.Session.newBuilder().setTargetString(target).setAutocommit(true).build();
LOGGER.debug("Autocommit {}", session.getAutocommit());
Vtgate.ExecuteRequest request = Vtgate.ExecuteRequest.newBuilder()
.setQuery(Proto.bindQuery(sqlStatement, Collections.emptyMap()))
.setSession(session)
.build();
return newBlockingStub(channel).execute(request);
}

public Vtgate.ExecuteResponse execute(String sqlStatement, String shard) {
LOGGER.info("Executing sqlStament {}", sqlStatement);
ManagedChannel channel = newChannel(config.getVtgateHost(), config.getVtgatePort(), config.getGrpcMaxInboundMessageSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ public void schemaIsCorrect() {
.field("ts_us", Schema.OPTIONAL_INT64_SCHEMA)
.field("ts_ns", Schema.OPTIONAL_INT64_SCHEMA)
.field("keyspace", Schema.STRING_SCHEMA)
.field("table", Schema.OPTIONAL_STRING_SCHEMA)
.field("shard", Schema.OPTIONAL_STRING_SCHEMA)
.field("table", Schema.STRING_SCHEMA)
.field("shard", Schema.STRING_SCHEMA)
.field("vgtid", Schema.STRING_SCHEMA)
.build();

Expand Down
16 changes: 3 additions & 13 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ public void shouldUseLocalVgtid() throws Exception {

Vgtid baseVgtid = TestHelper.getCurrentVgtid();
int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount + 2, getKeyspaceTopicPrefix(true), TEST_SERVER + ".transaction");
consumer = testConsumer(expectedRecordsCount + 2);

String rowValue = "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)";
String insertQuery = "INSERT INTO numeric_table ("
Expand Down Expand Up @@ -1328,11 +1328,11 @@ public void shouldMultiShardMultiTaskConfigSubscriptionHaveMultiShardGtidsInVgti

TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE);
TestHelper.applyVSchema("vitess_vschema.json");
startConnector(Function.identity(), hasMultipleShards, true, 2, 0, 1, null, null, null);
startConnector(Function.identity(), hasMultipleShards, true, 2, 0, 1, null, VitessConnectorConfig.SnapshotMode.NEVER, null);
assertConnectorIsRunning();

int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(hasMultipleShards));
consumer = testConsumer(expectedRecordsCount);
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TEST_SHARDED_KEYSPACE, TestHelper.PK_FIELD, hasMultipleShards);
}

Expand Down Expand Up @@ -2261,16 +2261,6 @@ private SourceRecord assertRecordInserted(String expectedTopicName, String pkFie
return assertRecordInserted(insertedRecord, expectedTopicName, pkField, pkValue);
}

private SourceRecord assertRecordDDL() {
assertFalse("records not generated", consumer.isEmpty());
SourceRecord ddlRecord = consumer.remove();
return assertRecordDDL(ddlRecord);
}

private SourceRecord assertRecordDDL(SourceRecord ddlRecord) {
return null;
}

private SourceRecord assertRecordUpdated() {
assertFalse("records not generated", consumer.isEmpty());
SourceRecord updatedRecord = consumer.remove();
Expand Down
Loading

0 comments on commit 3f34946

Please sign in to comment.