diff --git a/src/main/java/io/debezium/connector/vitess/SourceInfo.java b/src/main/java/io/debezium/connector/vitess/SourceInfo.java index 75a793c1..53e87513 100644 --- a/src/main/java/io/debezium/connector/vitess/SourceInfo.java +++ b/src/main/java/io/debezium/connector/vitess/SourceInfo.java @@ -114,8 +114,4 @@ public String toString() { + restartVgtid + '}'; } - - public String table() { - return tableId == null ? null : tableId.table(); - } } diff --git a/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java b/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java index e5ab9d94..739b1ff1 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java +++ b/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.vitess; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -15,6 +16,7 @@ import io.debezium.connector.binlog.charset.BinlogCharsetRegistry; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.connector.vitess.connection.DdlMessage; import io.debezium.connector.vitess.jdbc.VitessBinlogValueConverter; import io.debezium.connector.vitess.jdbc.VitessDefaultValueConverter; import io.debezium.relational.HistorizedRelationalDatabaseSchema; @@ -118,9 +120,11 @@ private String getDatabaseWithShard(String shard, String database) { return String.format("%s.%s", shard, database); } - public List parseDdl(VitessPartition partition, VitessOffsetContext offset, String ddlStatement, - String databaseName, String shard) { + public List parseDdl(VitessPartition partition, VitessOffsetContext offset, DdlMessage ddlMessage, String databaseName) { final List schemaChangeEvents = new ArrayList<>(1); + String ddlStatement = ddlMessage.getStatement(); + String shard = ddlMessage.getShard(); + Instant timestsamp = ddlMessage.getCommitTime(); DdlChanges ddlChanges = ddlParser.getDdlChanges(); ddlChanges.reset(); ddlParser.setCurrentDatabase(getDatabaseWithShard(shard, databaseName)); @@ -129,6 +133,7 @@ public List parseDdl(VitessPartition partition, VitessOffsetC ddlChanges.getEventsByDatabase((String dbName, List events) -> { events.forEach(event -> { final TableId tableId = getTableId(event); + offset.event(tableId, timestsamp); SchemaChangeEvent.SchemaChangeEventType type = switch (event.type()) { case CREATE_TABLE -> SchemaChangeEvent.SchemaChangeEventType.CREATE; case DROP_TABLE -> SchemaChangeEvent.SchemaChangeEventType.DROP; diff --git a/src/main/java/io/debezium/connector/vitess/VitessMetadata.java b/src/main/java/io/debezium/connector/vitess/VitessMetadata.java index dba2a367..b3175496 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessMetadata.java +++ b/src/main/java/io/debezium/connector/vitess/VitessMetadata.java @@ -193,7 +193,7 @@ protected static List getNonEmptyShards(List> vitessTabletR } @VisibleForTesting - public static List flattenAndConcat(List> nestedList) { + protected static List flattenAndConcat(List> nestedList) { return nestedList.stream() .map(innerList -> String.join("", innerList)) .collect(Collectors.toList()); diff --git a/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java index 63365224..84fbe594 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java @@ -6,6 +6,7 @@ package io.debezium.connector.vitess; import java.sql.SQLException; +import java.time.Instant; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -14,6 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.connector.vitess.connection.DdlMessage; import io.debezium.connector.vitess.jdbc.VitessConnection; import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; import io.debezium.pipeline.EventDispatcher; @@ -103,8 +105,10 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, if (rs.next()) { String ddlStatement = rs.getString(2); for (String shard : shards) { + snapshotContext.offset.setShard(shard); + DdlMessage ddlMessage = new DdlMessage("", Instant.now(), ddlStatement, shard); List schemaChangeEvents = schema.parseDdl( - snapshotContext.partition, snapshotContext.offset, ddlStatement, connectorConfig.getKeyspace(), shard); + snapshotContext.partition, snapshotContext.offset, ddlMessage, connectorConfig.getKeyspace()); for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) { LOGGER.info("Adding schema change event {}", schemaChangeEvent); Table table = schema.tableFor(tableId); @@ -137,7 +141,6 @@ private String quote(String dbOrTableName) { @Override protected void releaseSchemaSnapshotLocks(RelationalSnapshotContext snapshotContext) { - LOGGER.info("release schema locks"); } @Override @@ -152,19 +155,16 @@ protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId, List columns) { - LOGGER.info("get snapshot select"); return Optional.empty(); } @Override protected SnapshotContext prepare(VitessPartition partition, boolean onDemand) { - LOGGER.info("snapshot context"); return new RelationalSnapshotContext<>(partition, connectorConfig.getKeyspace(), onDemand); } @Override protected VitessOffsetContext copyOffset(RelationalSnapshotContext snapshotContext) { - LOGGER.info("copy offset"); return null; } diff --git a/src/main/java/io/debezium/connector/vitess/VitessSourceInfoStructMaker.java b/src/main/java/io/debezium/connector/vitess/VitessSourceInfoStructMaker.java index cda99947..94dce398 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessSourceInfoStructMaker.java +++ b/src/main/java/io/debezium/connector/vitess/VitessSourceInfoStructMaker.java @@ -22,8 +22,8 @@ public void init(String connector, String version, CommonConnectorConfig connect this.schema = commonSchemaBuilder() .name("io.debezium.connector.vitess.Source") .field(SourceInfo.KEYSPACE_NAME_KEY, Schema.STRING_SCHEMA) - .field(SourceInfo.TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA) - .field(SourceInfo.SHARD_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SourceInfo.TABLE_NAME_KEY, Schema.STRING_SCHEMA) + .field(SourceInfo.SHARD_KEY, Schema.STRING_SCHEMA) .field(SourceInfo.VGTID_KEY, Schema.STRING_SCHEMA) .build(); } @@ -37,7 +37,7 @@ public Schema schema() { public Struct struct(SourceInfo sourceInfo) { final Struct res = super.commonStruct(sourceInfo) .put(SourceInfo.KEYSPACE_NAME_KEY, sourceInfo.keyspace()) - .put(SourceInfo.TABLE_NAME_KEY, sourceInfo.table()) + .put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table()) .put(SourceInfo.SHARD_KEY, sourceInfo.shard()) .put(SourceInfo.VGTID_KEY, sourceInfo.getCurrentVgtid().toString()); return res; diff --git a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java index 1a7da739..702f4158 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java @@ -121,8 +121,8 @@ else if (message.getOperation() == ReplicationMessage.Operation.DDL) { DdlMessage ddlMessage = (DdlMessage) message; List schemaChangeEvents = schema.parseDdl( - partition, offsetContext, ddlMessage.getStatement(), - connectorConfig.getKeyspace(), ddlMessage.getShard()); + partition, offsetContext, ddlMessage, + connectorConfig.getKeyspace()); for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) { final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null : schemaChangeEvent.getTables().iterator().next().id(); dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, tableId, (receiver) -> { diff --git a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java index ac7419e5..9bffb4d9 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java @@ -77,21 +77,6 @@ public Vtgate.ExecuteResponse execute(String sqlStatement) { return newBlockingStub(channel).execute(request); } - public Vtgate.ExecuteResponse executeInKeyspace(String sqlStatement) { - LOGGER.info("Executing sqlStament {}", sqlStatement); - ManagedChannel channel = newChannel(config.getVtgateHost(), config.getVtgatePort(), config.getGrpcMaxInboundMessageSize()); - managedChannel.compareAndSet(null, channel); - - String target = String.format("%s", config.getKeyspace()); - Vtgate.Session session = Vtgate.Session.newBuilder().setTargetString(target).setAutocommit(true).build(); - LOGGER.debug("Autocommit {}", session.getAutocommit()); - Vtgate.ExecuteRequest request = Vtgate.ExecuteRequest.newBuilder() - .setQuery(Proto.bindQuery(sqlStatement, Collections.emptyMap())) - .setSession(session) - .build(); - return newBlockingStub(channel).execute(request); - } - public Vtgate.ExecuteResponse execute(String sqlStatement, String shard) { LOGGER.info("Executing sqlStament {}", sqlStatement); ManagedChannel channel = newChannel(config.getVtgateHost(), config.getVtgatePort(), config.getGrpcMaxInboundMessageSize()); diff --git a/src/test/java/io/debezium/connector/vitess/SourceInfoTest.java b/src/test/java/io/debezium/connector/vitess/SourceInfoTest.java index 5b43af2a..03e19af4 100644 --- a/src/test/java/io/debezium/connector/vitess/SourceInfoTest.java +++ b/src/test/java/io/debezium/connector/vitess/SourceInfoTest.java @@ -128,8 +128,8 @@ public void schemaIsCorrect() { .field("ts_us", Schema.OPTIONAL_INT64_SCHEMA) .field("ts_ns", Schema.OPTIONAL_INT64_SCHEMA) .field("keyspace", Schema.STRING_SCHEMA) - .field("table", Schema.OPTIONAL_STRING_SCHEMA) - .field("shard", Schema.OPTIONAL_STRING_SCHEMA) + .field("table", Schema.STRING_SCHEMA) + .field("shard", Schema.STRING_SCHEMA) .field("vgtid", Schema.STRING_SCHEMA) .build(); diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index deb7d2d2..7b05b3d8 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -795,7 +795,7 @@ public void shouldUseLocalVgtid() throws Exception { Vgtid baseVgtid = TestHelper.getCurrentVgtid(); int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount + 2, getKeyspaceTopicPrefix(true), TEST_SERVER + ".transaction"); + consumer = testConsumer(expectedRecordsCount + 2); String rowValue = "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)"; String insertQuery = "INSERT INTO numeric_table (" @@ -1328,11 +1328,11 @@ public void shouldMultiShardMultiTaskConfigSubscriptionHaveMultiShardGtidsInVgti TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE); TestHelper.applyVSchema("vitess_vschema.json"); - startConnector(Function.identity(), hasMultipleShards, true, 2, 0, 1, null, null, null); + startConnector(Function.identity(), hasMultipleShards, true, 2, 0, 1, null, VitessConnectorConfig.SnapshotMode.NEVER, null); assertConnectorIsRunning(); int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(hasMultipleShards)); + consumer = testConsumer(expectedRecordsCount); assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TEST_SHARDED_KEYSPACE, TestHelper.PK_FIELD, hasMultipleShards); } @@ -2261,16 +2261,6 @@ private SourceRecord assertRecordInserted(String expectedTopicName, String pkFie return assertRecordInserted(insertedRecord, expectedTopicName, pkField, pkValue); } - private SourceRecord assertRecordDDL() { - assertFalse("records not generated", consumer.isEmpty()); - SourceRecord ddlRecord = consumer.remove(); - return assertRecordDDL(ddlRecord); - } - - private SourceRecord assertRecordDDL(SourceRecord ddlRecord) { - return null; - } - private SourceRecord assertRecordUpdated() { assertFalse("records not generated", consumer.isEmpty()); SourceRecord updatedRecord = consumer.remove(); diff --git a/src/test/java/io/debezium/connector/vitess/VitessReplicationConnectionIT.java b/src/test/java/io/debezium/connector/vitess/VitessReplicationConnectionIT.java index 0a503401..d1791ce5 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessReplicationConnectionIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessReplicationConnectionIT.java @@ -42,7 +42,7 @@ import binlogdata.Binlogdata; import ch.qos.logback.classic.Level; -public class VitessReplicationConnectionIT { +public class VitessReplicationConnectionIT extends VitessTestCleanup { private static final Logger LOGGER = LoggerFactory.getLogger(VitessReplicationConnectionIT.class); protected long pollTimeoutInMs = SECONDS.toMillis(5); @@ -62,11 +62,11 @@ public void shouldErrorOutWhenSkipEnabled() throws Exception { TestHelper.defaultConfig(false, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, TestHelper.TEST_SHARD, "1", "skip").build()); - final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema( + schema = new VitessDatabaseSchema( conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf)); AtomicReference error = new AtomicReference<>(); - VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema); + VitessReplicationConnection connection = new VitessReplicationConnection(conf, schema); Vgtid startingVgtid = Vgtid.of( Binlogdata.VGtid.newBuilder() .addShardGtids( @@ -106,11 +106,11 @@ public void shouldErrorOutWhenWarnEnabled() throws Exception { TestHelper.defaultConfig(false, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, TestHelper.TEST_SHARD, "1", "warn").build()); - final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema( + schema = new VitessDatabaseSchema( conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf)); AtomicReference error = new AtomicReference<>(); - VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema); + VitessReplicationConnection connection = new VitessReplicationConnection(conf, schema); Vgtid startingVgtid = Vgtid.of( Binlogdata.VGtid.newBuilder() .addShardGtids( @@ -149,11 +149,11 @@ public void shouldFailWhenFailEnabled() throws Exception { TestHelper.defaultConfig(false, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, TestHelper.TEST_SHARD, "1", "fail").build()); - final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema( + schema = new VitessDatabaseSchema( conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf)); AtomicReference error = new AtomicReference<>(); - VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema); + VitessReplicationConnection connection = new VitessReplicationConnection(conf, schema); Vgtid startingVgtid = Vgtid.of( Binlogdata.VGtid.newBuilder() .addShardGtids( @@ -193,11 +193,11 @@ public void shouldFailWhenErrorProcessingModeIsNotSet() throws Exception { null, VitessConnectorConfig.SnapshotMode.NEVER, TestHelper.TEST_SHARD, "1", null).build()); conf.getEventProcessingFailureHandlingMode(); - final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema( + schema = new VitessDatabaseSchema( conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf)); AtomicReference error = new AtomicReference<>(); - VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema); + VitessReplicationConnection connection = new VitessReplicationConnection(conf, schema); Vgtid startingVgtid = Vgtid.of( Binlogdata.VGtid.newBuilder() .addShardGtids( @@ -233,11 +233,11 @@ public void shouldFailWhenErrorProcessingModeIsNotSet() throws Exception { public void shouldHaveVgtidInResponse() throws Exception { // setup fixture final VitessConnectorConfig conf = new VitessConnectorConfig(TestHelper.defaultConfig().build()); - final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema( + schema = new VitessDatabaseSchema( conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf)); AtomicReference error = new AtomicReference<>(); - try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema)) { + try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, schema)) { Vgtid startingVgtid = Vgtid.of( Binlogdata.VGtid.newBuilder() .addShardGtids( @@ -300,11 +300,11 @@ public void shouldSendHeartbeatMessage() throws Exception { // setup fixture final VitessConnectorConfig conf = new VitessConnectorConfig(TestHelper.defaultConfig().with( Heartbeat.HEARTBEAT_INTERVAL, 1000).build()); - final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema( + schema = new VitessDatabaseSchema( conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf)); AtomicReference error = new AtomicReference<>(); - try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema)) { + try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, schema)) { Vgtid startingVgtid = Vgtid.of( Binlogdata.VGtid.newBuilder() .addShardGtids( @@ -362,11 +362,11 @@ public void shouldSendHeartbeatMessage() throws Exception { public void shouldSendCommitTimestamp() throws Exception { // setup fixture final VitessConnectorConfig conf = new VitessConnectorConfig(TestHelper.defaultConfig().build()); - final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema( + schema = new VitessDatabaseSchema( conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf)); AtomicReference error = new AtomicReference<>(); - try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema)) { + try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, schema)) { Vgtid startingVgtid = Vgtid.of( Binlogdata.VGtid.newBuilder() .addShardGtids( @@ -472,11 +472,11 @@ public void shouldCopyAndReplicate() throws Exception { final VitessConnectorConfig conf = new VitessConnectorConfig(TestHelper.defaultConfig() .with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, tableInclude).build()); - final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema( + schema = new VitessDatabaseSchema( conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf)); AtomicReference error = new AtomicReference<>(); - try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema)) { + try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, schema)) { Vgtid startingVgtid = Vgtid.of( Binlogdata.VGtid.newBuilder() .addShardGtids( @@ -548,10 +548,10 @@ public void shouldCopyAndReplicate() throws Exception { @FixFor("DBZ-4353") public void shouldReturnUpdatedSchemaWithOnlineDdl() throws Exception { final VitessConnectorConfig conf = new VitessConnectorConfig(TestHelper.defaultConfig().build()); - final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema( + schema = new VitessDatabaseSchema( conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf)); AtomicReference error = new AtomicReference<>(); - try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema)) { + try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, schema)) { Vgtid startingVgtid = Vgtid.of( Binlogdata.VGtid.newBuilder() .addShardGtids( @@ -631,11 +631,11 @@ public void shouldNotFailWhenTableNameIsReservedKeyword() throws Exception { final VitessConnectorConfig conf = new VitessConnectorConfig(TestHelper.defaultConfig() .with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, tableInclude).build()); - final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema( + schema = new VitessDatabaseSchema( conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf)); AtomicReference error = new AtomicReference<>(); - try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema)) { + try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, schema)) { Vgtid startingVgtid = Vgtid.of( Binlogdata.VGtid.newBuilder() .addShardGtids( diff --git a/src/test/java/io/debezium/connector/vitess/VitessSourceInfoStructMakerTest.java b/src/test/java/io/debezium/connector/vitess/VitessSourceInfoStructMakerTest.java index 34e5decb..596a798f 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessSourceInfoStructMakerTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessSourceInfoStructMakerTest.java @@ -41,9 +41,9 @@ public void shouldGetCorrectSourceInfoSchema() { assertThat(structMaker.schema().field(SourceInfo.KEYSPACE_NAME_KEY).schema()) .isEqualTo(Schema.STRING_SCHEMA); assertThat(structMaker.schema().field(SourceInfo.SHARD_KEY).schema()) - .isEqualTo(Schema.OPTIONAL_STRING_SCHEMA); + .isEqualTo(Schema.STRING_SCHEMA); assertThat(structMaker.schema().field(SourceInfo.TABLE_NAME_KEY).schema()) - .isEqualTo(Schema.OPTIONAL_STRING_SCHEMA); + .isEqualTo(Schema.STRING_SCHEMA); assertThat(structMaker.schema().field(SourceInfo.VGTID_KEY).schema()) .isEqualTo(Schema.STRING_SCHEMA); assertThat(structMaker.schema()).isNotNull();