Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8325 Emit DDL events #210

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,23 @@
<artifactId>debezium-revapi</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO this dependency is not needed as it is a transitive one for debezium-connector-mysql

<groupId>io.debezium</groupId>
<artifactId>debezium-connector-binlog</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>


<!-- Testing -->
<dependency>
<groupId>ch.qos.logback</groupId>
Expand Down Expand Up @@ -250,11 +267,6 @@
<artifactId>debezium-embedded</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
public class TableTopicNamingStrategy extends AbstractTopicNamingStrategy<TableId> {

private final String overrideDataChangeTopicPrefix;
private final String overrideSchemaChangeTopic;

public TableTopicNamingStrategy(Properties props) {
super(props);
Configuration config = Configuration.from(props);
this.overrideDataChangeTopicPrefix = config.getString(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX);
this.overrideSchemaChangeTopic = config.getString(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC);
}

public static TableTopicNamingStrategy create(CommonConnectorConfig config) {
Expand All @@ -44,4 +46,22 @@ public String dataChangeTopic(TableId id) {
}
return topicNames.computeIfAbsent(id, t -> sanitizedTopicName(topicName));
}

/**
* Return the schema change topic. There are two cases:
* 1. If override schema change topic is specified - use this as the topic name
* 2. If override schema change topic is not specified - call the super method to get the typical
* schema change topic name.
*
* @return String representing the schema change topic name.
*/
@Override
public String schemaChangeTopic() {
if (!Strings.isNullOrBlank(overrideSchemaChangeTopic)) {
return overrideSchemaChangeTopic;
}
else {
return super.schemaChangeTopic();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
package io.debezium.connector.vitess;

import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory;
import io.debezium.connector.vitess.jdbc.VitessConnection;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
Expand All @@ -31,15 +32,18 @@ public class VitessChangeEventSourceFactory implements ChangeEventSourceFactory<
private final VitessDatabaseSchema schema;
private final ReplicationConnection replicationConnection;
private final SnapshotterService snapshotterService;
private final MainConnectionProvidingConnectionFactory<VitessConnection> connectionFactory;

public VitessChangeEventSourceFactory(
VitessConnectorConfig connectorConfig,
MainConnectionProvidingConnectionFactory<VitessConnection> connectionFactory,
ErrorHandler errorHandler,
EventDispatcher<VitessPartition, TableId> dispatcher,
Clock clock,
VitessDatabaseSchema schema,
ReplicationConnection replicationConnection, SnapshotterService snapshotterService) {
this.connectorConfig = connectorConfig;
this.connectionFactory = connectionFactory;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
Expand All @@ -54,11 +58,11 @@ public SnapshotChangeEventSource<VitessPartition, VitessOffsetContext> getSnapsh
// A dummy SnapshotChangeEventSource, snapshot is skipped.
return new VitessSnapshotChangeEventSource(
connectorConfig,
new DefaultMainConnectionProvidingConnectionFactory<>(() -> null),
this.connectionFactory,
dispatcher,
schema,
clock,
null,
snapshotProgressListener,
notificationService,
snapshotterService);
}
Expand Down
133 changes: 130 additions & 3 deletions src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.annotation.VisibleForTesting;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.mysql.charset.MySqlCharsetRegistryServiceProvider;
import io.debezium.connector.vitess.connection.VitessTabletType;
import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory;
Expand All @@ -39,23 +41,57 @@
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Collect;

/**
* Vitess connector configuration, including its specific configurations and the common
* configurations from Debezium.
*/
public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
public class VitessConnectorConfig extends HistorizedRelationalDatabaseConnectorConfig {

public static final String CSV_DELIMITER = ",";

private static final Logger LOGGER = LoggerFactory.getLogger(VitessConnectorConfig.class);

private static final String VITESS_CONFIG_GROUP_PREFIX = "vitess.";
private static final String JDBC_CONFIG_PREFIX = "jdbc.";
private static final int DEFAULT_VTGATE_PORT = 15_991;

@VisibleForTesting
protected static final int DEFAULT_VTGATE_JDBC_PORT = 15_306;

/**
* Set of all built-in database names that will generally be ignored by the connector.
*/
protected static final Set<String> BUILT_IN_DB_NAMES = Collect.unmodifiableSet(
"mysql", "performance_schema", "sys", "information_schema");

@Override
public JdbcConfiguration getJdbcConfig() {
JdbcConfiguration jdbcConfiguration = super.getJdbcConfig();
JdbcConfiguration updatedConfig = JdbcConfiguration.adapt(jdbcConfiguration.edit()
.with(JdbcConfiguration.USER, getVtgateJdbcUsername())
.with(JdbcConfiguration.PASSWORD, getVtgateJdbcPassword())
.with(JdbcConfiguration.DATABASE, getKeyspace())
.with(JdbcConfiguration.PORT, getVtgateJdbcPort())
.build());
return updatedConfig;
}

@Override
protected HistoryRecordComparator getHistoryRecordComparator() {
return new HistoryRecordComparator() {

};
}

/**
* The set of predefined SnapshotMode options or aliases.
*/
Expand All @@ -66,6 +102,12 @@ public enum SnapshotMode implements EnumeratedValue {
*/
INITIAL("initial"),

NO_DATA("no_data"),

CONFIGURATION_BASED("configuration_based"),

RECOVERY("recovery"),

/**
* Never perform an initial snapshot and only receive new data changes.
*/
Expand Down Expand Up @@ -203,6 +245,29 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withValidation(Field::isInteger)
.withDescription("Port of the Vitess VTGate gRPC server.");

public static final Field VTGATE_JDBC_PORT = Field.create(DATABASE_CONFIG_PREFIX + JDBC_CONFIG_PREFIX + JdbcConfiguration.PORT)
.withDisplayName("Vitess JDBC database port")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withDefault(DEFAULT_VTGATE_JDBC_PORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withValidation(Field::isInteger)
.withDescription("Port of the Vitess VTGate JDBC server.");

public static final Field VTGATE_JDBC_USER = Field.create(DATABASE_CONFIG_PREFIX + JDBC_CONFIG_PREFIX + JdbcConfiguration.USER)
.withDisplayName("Vitess JDBC database user")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("Username of the Vitess VTGate JDBC server.");

public static final Field VTGATE_JDBC_PASSWORD = Field.create(DATABASE_CONFIG_PREFIX + JDBC_CONFIG_PREFIX + JdbcConfiguration.PASSWORD)
.withDisplayName("Vitess JDBC database password")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("Password of the Vitess VTGate JDBC server.");

public static final Field VTGATE_USER = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.USER)
.withDisplayName("User")
.withType(Type.STRING)
Expand Down Expand Up @@ -368,6 +433,14 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withValidation(CommonConnectorConfig::validateTopicName)
.withDescription("Overrides the topic.prefix used for the data change topic.");

public static final Field OVERRIDE_SCHEMA_CHANGE_TOPIC = Field.create("override.schema.change.topic")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.withDisplayName("Override schema change topic name")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withValidation(CommonConnectorConfig::validateTopicName)
.withDescription("Overrides the name of the schema change topic (if not set uses topic.prefx).");

public static final Field OFFSET_STORAGE_TASK_KEY_GEN = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.task.key.gen")
.withDisplayName("Offset storage task key generation number")
.withType(Type.INT)
Expand Down Expand Up @@ -470,6 +543,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
GTID,
VTGATE_HOST,
VTGATE_PORT,
VTGATE_JDBC_PORT,
VTGATE_USER,
VTGATE_PASSWORD,
TABLET_TYPE,
Expand All @@ -479,6 +553,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
GRPC_MAX_INBOUND_MESSAGE_SIZE,
BINARY_HANDLING_MODE,
OVERRIDE_DATA_CHANGE_TOPIC_PREFIX,
OVERRIDE_SCHEMA_CHANGE_TOPIC,
SCHEMA_NAME_ADJUSTMENT_MODE,
OFFSET_STORAGE_PER_TASK,
OFFSET_STORAGE_TASK_KEY_GEN,
Expand Down Expand Up @@ -536,11 +611,16 @@ public static ConfigDef configDef() {

public VitessConnectorConfig(Configuration config) {
super(
VitessConnector.class,
config,
null, x -> x.schema() + "." + x.table(),
Tables.TableFilter.fromPredicate(VitessConnectorConfig::isNotBuiltInTable),
x -> x.schema() + "." + x.table(),
true,
-1,
ColumnFilterMode.SCHEMA,
true);
false);

getServiceRegistry().registerServiceProvider(new MySqlCharsetRegistryServiceProvider());
}

@Override
Expand Down Expand Up @@ -649,14 +729,38 @@ public int getVtgatePort() {
return getConfig().getInteger(VTGATE_PORT);
}

public int getVtgateJdbcPort() {
return getConfig().getInteger(VTGATE_JDBC_PORT);
}

public String getVtgateUsername() {
return getConfig().getString(VTGATE_USER);
}

public String getVtgateJdbcUsername() {
String jdbcUsername = getConfig().getString(VTGATE_JDBC_USER);
if (jdbcUsername != null) {
return jdbcUsername;
}
else {
return getConfig().getString(VTGATE_USER);
}
}

public String getVtgatePassword() {
return getConfig().getString(VTGATE_PASSWORD);
}

public String getVtgateJdbcPassword() {
String jdbcPassword = getConfig().getString(VTGATE_JDBC_PASSWORD);
if (jdbcPassword != null) {
return jdbcPassword;
}
else {
return getConfig().getString(VTGATE_PASSWORD);
}
}

public String getTabletType() {
return getConfig().getString(TABLET_TYPE);
}
Expand Down Expand Up @@ -754,6 +858,29 @@ public Heartbeat createHeartbeat(TopicNamingStrategy topicNamingStrategy, Schema
return new VitessHeartbeatImpl(getHeartbeatInterval(), topicNamingStrategy.heartbeatTopic(), getLogicalName(), schemaNameAdjuster);
}

/**
* Checks whether the {@link TableId} refers to a built-in table.
*
* @param tableId the relational table identifier, should not be null
* @return true if the reference refers to a built-in table
*/
public static boolean isNotBuiltInTable(TableId tableId) {
return !isBuiltInDatabase(tableId.catalog());
}

/**
* Check whether the specified database name is a built-in database.
*
* @param databaseName the database name to check
* @return true if the database is a built-in database; false otherwise
*/
public static boolean isBuiltInDatabase(String databaseName) {
if (databaseName == null) {
return false;
}
return BUILT_IN_DB_NAMES.contains(databaseName.toLowerCase());
}

public BigIntUnsignedHandlingMode getBigIntUnsgnedHandlingMode() {
return BigIntUnsignedHandlingMode.parse(getConfig().getString(BIGINT_UNSIGNED_HANDLING_MODE),
BIGINT_UNSIGNED_HANDLING_MODE.defaultValueAsString());
Expand Down
Loading