Skip to content

Commit

Permalink
DBZ-8325 Support internal schema recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Oct 17, 2024
1 parent 3f34946 commit 4fce59d
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,18 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.processors.PostProcessorRegistryServiceProvider;
import io.debezium.relational.TableId;
import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.snapshot.SnapshotterServiceProvider;
import io.debezium.spi.snapshot.Snapshotter;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
Expand Down Expand Up @@ -131,6 +135,15 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
MainConnectionProvidingConnectionFactory<VitessConnection> connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(
() -> new VitessConnection(jdbcConfig));

final Snapshotter snapshotter = snapshotterService.getSnapshotter();

// For certain tests we use the MemorySchemaHistory and do not want this validation to cause errors
// TODO: The correct way to do this is for every test that we set the offsets, we also set the schema history file
// to its correct value
if (!connectorConfig.getSchemaHistory().getClass().equals(MemorySchemaHistory.class)) {
validateAndLoadSchemaHistory(connectorConfig, VitessConnectorTask::validateLogPosition, previousOffsets, schema, snapshotter);
}

ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
previousOffsets,
errorHandler,
Expand All @@ -154,6 +167,11 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
}
}

private static boolean validateLogPosition(Partition partition, OffsetContext offsetContext, CommonConnectorConfig commonConnectorConfig) {
// Always treat the log positions as valid to avoid any unnecessary data snapshots.
return true;
}

@VisibleForTesting
public Configuration getConfigWithOffsets(Configuration config) {
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(config);
Expand Down
4 changes: 4 additions & 0 deletions src/test/java/io/debezium/connector/vitess/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.Types;
Expand All @@ -35,6 +36,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.util.Testing;
import io.vitess.proto.Query;
import io.vitess.proto.Query.Field;

Expand All @@ -53,6 +55,8 @@ public class TestHelper {
public static final Long TEST_SHARD2_EPOCH = 3L;
public static final ShardEpochMap TEST_SHARD_TO_EPOCH = new ShardEpochMap(Map.of(TEST_SHARD1, TEST_SHARD1_EPOCH, TEST_SHARD2, TEST_SHARD2_EPOCH));

public static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-connect.txt").toAbsolutePath();

public static final String TEST_GTID = "MySQL56/a790d864-9ba1-11ea-99f6-0242ac11000a:1-1513";
public static final String TEST_TABLE = "test_table";
private static final String TEST_VITESS_FULL_TABLE = TEST_UNSHARDED_KEYSPACE + "." + TEST_TABLE;
Expand Down
77 changes: 69 additions & 8 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package io.debezium.connector.vitess;

import static io.debezium.connector.vitess.TestHelper.SCHEMA_HISTORY_PATH;
import static io.debezium.connector.vitess.TestHelper.TEST_EMPTY_SHARD_KEYSPACE;
import static io.debezium.connector.vitess.TestHelper.TEST_NON_EMPTY_SHARD;
import static io.debezium.connector.vitess.TestHelper.TEST_SERVER;
Expand Down Expand Up @@ -75,6 +76,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.storage.file.history.FileSchemaHistory;
import io.debezium.util.Collect;
import io.debezium.util.Testing;

Expand All @@ -88,12 +90,18 @@ public class VitessConnectorIT extends AbstractVitessConnectorTest {
@Before
public void before() {
Testing.Print.enable();
Files.delete(SCHEMA_HISTORY_PATH);
}

@After
public void after() {
stopConnector();
assertConnectorNotRunning();
try {
stopConnector();
assertConnectorNotRunning();
}
finally {
Files.delete(SCHEMA_HISTORY_PATH);
}
}

@Test
Expand Down Expand Up @@ -182,14 +190,66 @@ public void shouldReceiveHeartbeatEvents() throws Exception {
assertThat(records.recordsForTopic(topic).size()).isEqualTo(expectedHeartbeatRecords);
}

@Test
@FixFor("DBZ-8325")
public void shouldOnlySnapshotSchemaOnce() throws Exception {
String keyspace = TEST_SHARDED_KEYSPACE;
String table = keyspace + ".ddl_table";
TestHelper.executeDDL("vitess_create_tables.ddl", keyspace);
TestHelper.applyVSchema("vitess_vschema.json");

startConnector(config -> config
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(VitessConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class), true);
assertConnectorIsRunning();

String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX);

int expectedSchemaChangeRecords = 2;
consumer = testConsumer(expectedSchemaChangeRecords);
consumer.expects(expectedSchemaChangeRecords);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 0; i < expectedSchemaChangeRecords; i++) {
SourceRecord record = consumer.remove();
assertThat(record.topic()).isEqualTo(schemaChangeTopic);
}
assertThat(consumer.isEmpty());

stopConnector();

startConnector(config -> config
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(VitessConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class), true);

// After startup, we need to make sure we recover the schema history (ie with snapshot disabled)
// so that we can properly parse & emit a schema change event for this alter table statement.
TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;", TEST_SHARDED_KEYSPACE);
int expectedTotalRecords = 2;
consumer = testConsumer(expectedTotalRecords);
consumer.expects(expectedTotalRecords);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 0; i < expectedTotalRecords; i++) {
SourceRecord record = consumer.remove();
assertThat(record.topic()).isEqualTo(schemaChangeTopic);
}
assertThat(consumer.isEmpty());
}

@Test
@FixFor("DBZ-8325")
public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception {
String keyspace = TEST_SHARDED_KEYSPACE;
String table = keyspace + ".ddl_table";
TestHelper.executeDDL("vitess_create_tables.ddl", keyspace);
TestHelper.applyVSchema("vitess_vschema.json");
startConnector(config -> config
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES.name(), true)
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
.with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class),
Expand All @@ -200,14 +260,15 @@ public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception

TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;", TEST_SHARDED_KEYSPACE);
TestHelper.execute("ALTER TABLE ddl_table ADD PARTITION (PARTITION p2 VALUES LESS THAN (2000));", TEST_SHARDED_KEYSPACE);
TestHelper.execute("ALTER TABLE ddl_table DROP PARTITION p0;", TEST_SHARDED_KEYSPACE);
TestHelper.execute("TRUNCATE TABLE ddl_table;", TEST_SHARDED_KEYSPACE);
TestHelper.execute("DROP TABLE ddl_table;", TEST_SHARDED_KEYSPACE);
TestHelper.execute("CREATE TABLE ddl_table (id BIGINT NOT NULL AUTO_INCREMENT, PRIMARY KEY (id));", TEST_SHARDED_KEYSPACE);

// 1 for the snapshot (create table ddls)
// 5 for the changes above
// 2 shards, so (5 + 1) * 2 = 12
int expectedSchemaChangeRecords = 12;
// 6 for the changes above
// 2 shards, so (6 + 1) * 2 = 14
int expectedSchemaChangeRecords = 14;
consumer = testConsumer(expectedSchemaChangeRecords);
consumer.expects(expectedSchemaChangeRecords);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
Expand All @@ -223,7 +284,7 @@ public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception
public void shouldReceiveSnapshotAndSchemaChangeEvents() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl");
startConnector(config -> config
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES.name(), true)
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(VitessConnectorConfig.TABLE_INCLUDE_LIST, "test_unsharded_keyspace.ddl_table")
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
.with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class),
Expand Down Expand Up @@ -257,7 +318,7 @@ public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exceptio
TestHelper.executeDDL("vitess_create_tables.ddl");
// startConnector();
startConnector(config -> config
.with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES.name(), true)
.with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class),
false);
assertConnectorIsRunning();
Expand Down
12 changes: 12 additions & 0 deletions src/test/resources/vitess_vschema.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@
"sequence": "my_seq"
}
},
"ddl_table": {
"columnVindexes": [
{
"column": "id",
"name": "hash"
}
],
"autoIncrement": {
"column": "id",
"sequence": "my_seq"
}
},
"string_table": {
"columnVindexes": [
{
Expand Down

0 comments on commit 4fce59d

Please sign in to comment.