diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java index d26db8ab..af934d52 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java @@ -34,14 +34,18 @@ import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory; import io.debezium.pipeline.notification.NotificationService; +import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Offsets; +import io.debezium.pipeline.spi.Partition; import io.debezium.processors.PostProcessorRegistryServiceProvider; import io.debezium.relational.TableId; +import io.debezium.relational.history.MemorySchemaHistory; import io.debezium.schema.SchemaFactory; import io.debezium.schema.SchemaNameAdjuster; import io.debezium.service.spi.ServiceRegistry; import io.debezium.snapshot.SnapshotterService; import io.debezium.snapshot.SnapshotterServiceProvider; +import io.debezium.spi.snapshot.Snapshotter; import io.debezium.spi.topic.TopicNamingStrategy; import io.debezium.util.Clock; import io.debezium.util.LoggingContext; @@ -131,6 +135,15 @@ protected ChangeEventSourceCoordinator sta MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>( () -> new VitessConnection(jdbcConfig)); + final Snapshotter snapshotter = snapshotterService.getSnapshotter(); + + // For certain tests we use the MemorySchemaHistory and do not want this validation to cause errors + // TODO: The correct way to do this is for every test that we set the offsets, we also set the schema history file + // to its correct value + if (!connectorConfig.getSchemaHistory().getClass().equals(MemorySchemaHistory.class)) { + validateAndLoadSchemaHistory(connectorConfig, VitessConnectorTask::validateLogPosition, previousOffsets, schema, snapshotter); + } + ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( previousOffsets, errorHandler, @@ -154,6 +167,11 @@ protected ChangeEventSourceCoordinator sta } } + private static boolean validateLogPosition(Partition partition, OffsetContext offsetContext, CommonConnectorConfig commonConnectorConfig) { + // Always treat the log positions as valid to avoid any unnecessary data snapshots. + return true; + } + @VisibleForTesting public Configuration getConfigWithOffsets(Configuration config) { VitessConnectorConfig connectorConfig = new VitessConnectorConfig(config); diff --git a/src/test/java/io/debezium/connector/vitess/TestHelper.java b/src/test/java/io/debezium/connector/vitess/TestHelper.java index c67b0f96..99d7afbc 100644 --- a/src/test/java/io/debezium/connector/vitess/TestHelper.java +++ b/src/test/java/io/debezium/connector/vitess/TestHelper.java @@ -11,6 +11,7 @@ import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.sql.Connection; import java.sql.Types; @@ -35,6 +36,7 @@ import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; import io.debezium.relational.history.MemorySchemaHistory; +import io.debezium.util.Testing; import io.vitess.proto.Query; import io.vitess.proto.Query.Field; @@ -53,6 +55,8 @@ public class TestHelper { public static final Long TEST_SHARD2_EPOCH = 3L; public static final ShardEpochMap TEST_SHARD_TO_EPOCH = new ShardEpochMap(Map.of(TEST_SHARD1, TEST_SHARD1_EPOCH, TEST_SHARD2, TEST_SHARD2_EPOCH)); + public static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-connect.txt").toAbsolutePath(); + public static final String TEST_GTID = "MySQL56/a790d864-9ba1-11ea-99f6-0242ac11000a:1-1513"; public static final String TEST_TABLE = "test_table"; private static final String TEST_VITESS_FULL_TABLE = TEST_UNSHARDED_KEYSPACE + "." + TEST_TABLE; diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index 7b05b3d8..d2612d88 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.vitess; +import static io.debezium.connector.vitess.TestHelper.SCHEMA_HISTORY_PATH; import static io.debezium.connector.vitess.TestHelper.TEST_EMPTY_SHARD_KEYSPACE; import static io.debezium.connector.vitess.TestHelper.TEST_NON_EMPTY_SHARD; import static io.debezium.connector.vitess.TestHelper.TEST_SERVER; @@ -75,6 +76,7 @@ import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; import io.debezium.relational.history.MemorySchemaHistory; +import io.debezium.storage.file.history.FileSchemaHistory; import io.debezium.util.Collect; import io.debezium.util.Testing; @@ -88,12 +90,18 @@ public class VitessConnectorIT extends AbstractVitessConnectorTest { @Before public void before() { Testing.Print.enable(); + Files.delete(SCHEMA_HISTORY_PATH); } @After public void after() { - stopConnector(); - assertConnectorNotRunning(); + try { + stopConnector(); + assertConnectorNotRunning(); + } + finally { + Files.delete(SCHEMA_HISTORY_PATH); + } } @Test @@ -182,14 +190,66 @@ public void shouldReceiveHeartbeatEvents() throws Exception { assertThat(records.recordsForTopic(topic).size()).isEqualTo(expectedHeartbeatRecords); } + @Test + @FixFor("DBZ-8325") + public void shouldOnlySnapshotSchemaOnce() throws Exception { + String keyspace = TEST_SHARDED_KEYSPACE; + String table = keyspace + ".ddl_table"; + TestHelper.executeDDL("vitess_create_tables.ddl", keyspace); + TestHelper.applyVSchema("vitess_vschema.json"); + + startConnector(config -> config + .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) + .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table) + .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA) + .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH) + .with(VitessConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class), true); + assertConnectorIsRunning(); + + String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX); + + int expectedSchemaChangeRecords = 2; + consumer = testConsumer(expectedSchemaChangeRecords); + consumer.expects(expectedSchemaChangeRecords); + consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); + for (int i = 0; i < expectedSchemaChangeRecords; i++) { + SourceRecord record = consumer.remove(); + assertThat(record.topic()).isEqualTo(schemaChangeTopic); + } + assertThat(consumer.isEmpty()); + + stopConnector(); + + startConnector(config -> config + .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) + .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table) + .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER) + .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH) + .with(VitessConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class), true); + + // After startup, we need to make sure we recover the schema history (ie with snapshot disabled) + // so that we can properly parse & emit a schema change event for this alter table statement. + TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;", TEST_SHARDED_KEYSPACE); + int expectedTotalRecords = 2; + consumer = testConsumer(expectedTotalRecords); + consumer.expects(expectedTotalRecords); + consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); + for (int i = 0; i < expectedTotalRecords; i++) { + SourceRecord record = consumer.remove(); + assertThat(record.topic()).isEqualTo(schemaChangeTopic); + } + assertThat(consumer.isEmpty()); + } + @Test @FixFor("DBZ-8325") public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception { String keyspace = TEST_SHARDED_KEYSPACE; String table = keyspace + ".ddl_table"; TestHelper.executeDDL("vitess_create_tables.ddl", keyspace); + TestHelper.applyVSchema("vitess_vschema.json"); startConnector(config -> config - .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES.name(), true) + .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table) .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA) .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class), @@ -200,14 +260,15 @@ public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;", TEST_SHARDED_KEYSPACE); TestHelper.execute("ALTER TABLE ddl_table ADD PARTITION (PARTITION p2 VALUES LESS THAN (2000));", TEST_SHARDED_KEYSPACE); + TestHelper.execute("ALTER TABLE ddl_table DROP PARTITION p0;", TEST_SHARDED_KEYSPACE); TestHelper.execute("TRUNCATE TABLE ddl_table;", TEST_SHARDED_KEYSPACE); TestHelper.execute("DROP TABLE ddl_table;", TEST_SHARDED_KEYSPACE); TestHelper.execute("CREATE TABLE ddl_table (id BIGINT NOT NULL AUTO_INCREMENT, PRIMARY KEY (id));", TEST_SHARDED_KEYSPACE); // 1 for the snapshot (create table ddls) - // 5 for the changes above - // 2 shards, so (5 + 1) * 2 = 12 - int expectedSchemaChangeRecords = 12; + // 6 for the changes above + // 2 shards, so (6 + 1) * 2 = 14 + int expectedSchemaChangeRecords = 14; consumer = testConsumer(expectedSchemaChangeRecords); consumer.expects(expectedSchemaChangeRecords); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); @@ -223,7 +284,7 @@ public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception public void shouldReceiveSnapshotAndSchemaChangeEvents() throws Exception { TestHelper.executeDDL("vitess_create_tables.ddl"); startConnector(config -> config - .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES.name(), true) + .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, "test_unsharded_keyspace.ddl_table") .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA) .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class), @@ -257,7 +318,7 @@ public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exceptio TestHelper.executeDDL("vitess_create_tables.ddl"); // startConnector(); startConnector(config -> config - .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES.name(), true) + .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class), false); assertConnectorIsRunning(); diff --git a/src/test/resources/vitess_vschema.json b/src/test/resources/vitess_vschema.json index 0681d287..47104980 100644 --- a/src/test/resources/vitess_vschema.json +++ b/src/test/resources/vitess_vschema.json @@ -18,6 +18,18 @@ "sequence": "my_seq" } }, + "ddl_table": { + "columnVindexes": [ + { + "column": "id", + "name": "hash" + } + ], + "autoIncrement": { + "column": "id", + "sequence": "my_seq" + } + }, "string_table": { "columnVindexes": [ {