-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-8325 Emit DDL events #210
Closed
+1,378
−101
Closed
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
2ed84c3
DBZ-8325 Emit DDL events
twthorn 3f34946
DBZ-8325 Minor refactor and testing improvements
twthorn 4fce59d
DBZ-8325 Support internal schema recovery
twthorn c379e0d
DBZ-8325 Include config to avoid name conflicts for per-table connectors
twthorn 492d32e
DBZ-8325 Remove unused variable & update java doc
twthorn af9dbc5
DBZ-8325 Use super method for schema change topic name
twthorn e7ca2f5
DBZ-8325 Support store only capture tables setting
twthorn f4935f9
DBZ-8325 Support disabling schema change events
twthorn 57c9c39
DBZ-8325 Support separate jdbc config & creds
twthorn 0792f05
DBZ-8325 Add vitess default value converter
twthorn c55cc9b
DBZ-8325 Register correct schemas during snapshot
twthorn 6a36d4e
DBZ-8325 Fix snapshot recovery itest
twthorn 0f54655
DBZ-8325 Empty commit to rerun itest
twthorn File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,13 +23,15 @@ | |
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import io.debezium.annotation.VisibleForTesting; | ||
import io.debezium.config.CommonConnectorConfig; | ||
import io.debezium.config.ConfigDefinition; | ||
import io.debezium.config.Configuration; | ||
import io.debezium.config.EnumeratedValue; | ||
import io.debezium.config.Field; | ||
import io.debezium.config.Field.ValidationOutput; | ||
import io.debezium.connector.SourceInfoStructMaker; | ||
import io.debezium.connector.mysql.charset.MySqlCharsetRegistryServiceProvider; | ||
import io.debezium.connector.vitess.connection.VitessTabletType; | ||
import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap; | ||
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory; | ||
|
@@ -39,23 +41,57 @@ | |
import io.debezium.jdbc.JdbcConfiguration; | ||
import io.debezium.jdbc.TemporalPrecisionMode; | ||
import io.debezium.relational.ColumnFilterMode; | ||
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; | ||
import io.debezium.relational.RelationalDatabaseConnectorConfig; | ||
import io.debezium.relational.TableId; | ||
import io.debezium.relational.Tables; | ||
import io.debezium.relational.history.HistoryRecordComparator; | ||
import io.debezium.schema.SchemaNameAdjuster; | ||
import io.debezium.spi.topic.TopicNamingStrategy; | ||
import io.debezium.util.Collect; | ||
|
||
/** | ||
* Vitess connector configuration, including its specific configurations and the common | ||
* configurations from Debezium. | ||
*/ | ||
public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig { | ||
public class VitessConnectorConfig extends HistorizedRelationalDatabaseConnectorConfig { | ||
|
||
public static final String CSV_DELIMITER = ","; | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(VitessConnectorConfig.class); | ||
|
||
private static final String VITESS_CONFIG_GROUP_PREFIX = "vitess."; | ||
private static final String JDBC_CONFIG_PREFIX = "jdbc."; | ||
private static final int DEFAULT_VTGATE_PORT = 15_991; | ||
|
||
@VisibleForTesting | ||
protected static final int DEFAULT_VTGATE_JDBC_PORT = 15_306; | ||
|
||
/** | ||
* Set of all built-in database names that will generally be ignored by the connector. | ||
*/ | ||
protected static final Set<String> BUILT_IN_DB_NAMES = Collect.unmodifiableSet( | ||
"mysql", "performance_schema", "sys", "information_schema"); | ||
|
||
@Override | ||
public JdbcConfiguration getJdbcConfig() { | ||
JdbcConfiguration jdbcConfiguration = super.getJdbcConfig(); | ||
JdbcConfiguration updatedConfig = JdbcConfiguration.adapt(jdbcConfiguration.edit() | ||
.with(JdbcConfiguration.USER, getVtgateJdbcUsername()) | ||
.with(JdbcConfiguration.PASSWORD, getVtgateJdbcPassword()) | ||
.with(JdbcConfiguration.DATABASE, getKeyspace()) | ||
.with(JdbcConfiguration.PORT, getVtgateJdbcPort()) | ||
.build()); | ||
return updatedConfig; | ||
} | ||
|
||
@Override | ||
protected HistoryRecordComparator getHistoryRecordComparator() { | ||
return new HistoryRecordComparator() { | ||
|
||
}; | ||
} | ||
|
||
/** | ||
* The set of predefined SnapshotMode options or aliases. | ||
*/ | ||
|
@@ -66,6 +102,12 @@ public enum SnapshotMode implements EnumeratedValue { | |
*/ | ||
INITIAL("initial"), | ||
|
||
NO_DATA("no_data"), | ||
|
||
CONFIGURATION_BASED("configuration_based"), | ||
|
||
RECOVERY("recovery"), | ||
|
||
/** | ||
* Never perform an initial snapshot and only receive new data changes. | ||
*/ | ||
|
@@ -203,6 +245,29 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue | |
.withValidation(Field::isInteger) | ||
.withDescription("Port of the Vitess VTGate gRPC server."); | ||
|
||
public static final Field VTGATE_JDBC_PORT = Field.create(DATABASE_CONFIG_PREFIX + JDBC_CONFIG_PREFIX + JdbcConfiguration.PORT) | ||
.withDisplayName("Vitess JDBC database port") | ||
.withType(Type.INT) | ||
.withWidth(Width.SHORT) | ||
.withDefault(DEFAULT_VTGATE_JDBC_PORT) | ||
.withImportance(ConfigDef.Importance.MEDIUM) | ||
.withValidation(Field::isInteger) | ||
.withDescription("Port of the Vitess VTGate JDBC server."); | ||
|
||
public static final Field VTGATE_JDBC_USER = Field.create(DATABASE_CONFIG_PREFIX + JDBC_CONFIG_PREFIX + JdbcConfiguration.USER) | ||
.withDisplayName("Vitess JDBC database user") | ||
.withType(Type.INT) | ||
.withWidth(Width.SHORT) | ||
.withImportance(ConfigDef.Importance.MEDIUM) | ||
.withDescription("Username of the Vitess VTGate JDBC server."); | ||
|
||
public static final Field VTGATE_JDBC_PASSWORD = Field.create(DATABASE_CONFIG_PREFIX + JDBC_CONFIG_PREFIX + JdbcConfiguration.PASSWORD) | ||
.withDisplayName("Vitess JDBC database password") | ||
.withType(Type.INT) | ||
.withWidth(Width.SHORT) | ||
.withImportance(ConfigDef.Importance.MEDIUM) | ||
.withDescription("Password of the Vitess VTGate JDBC server."); | ||
|
||
public static final Field VTGATE_USER = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.USER) | ||
.withDisplayName("User") | ||
.withType(Type.STRING) | ||
|
@@ -368,6 +433,14 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue | |
.withValidation(CommonConnectorConfig::validateTopicName) | ||
.withDescription("Overrides the topic.prefix used for the data change topic."); | ||
|
||
public static final Field OVERRIDE_SCHEMA_CHANGE_TOPIC = Field.create("override.schema.change.topic") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove this one, it can be solved using https://debezium.io/documentation/reference/3.0/connectors/vitess.html#vitess-property-topic-naming-strategy |
||
.withDisplayName("Override schema change topic name") | ||
.withType(Type.STRING) | ||
.withWidth(Width.MEDIUM) | ||
.withImportance(ConfigDef.Importance.LOW) | ||
.withValidation(CommonConnectorConfig::validateTopicName) | ||
.withDescription("Overrides the name of the schema change topic (if not set uses topic.prefx)."); | ||
|
||
public static final Field OFFSET_STORAGE_TASK_KEY_GEN = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.task.key.gen") | ||
.withDisplayName("Offset storage task key generation number") | ||
.withType(Type.INT) | ||
|
@@ -470,6 +543,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field, | |
GTID, | ||
VTGATE_HOST, | ||
VTGATE_PORT, | ||
VTGATE_JDBC_PORT, | ||
VTGATE_USER, | ||
VTGATE_PASSWORD, | ||
TABLET_TYPE, | ||
|
@@ -479,6 +553,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field, | |
GRPC_MAX_INBOUND_MESSAGE_SIZE, | ||
BINARY_HANDLING_MODE, | ||
OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, | ||
OVERRIDE_SCHEMA_CHANGE_TOPIC, | ||
SCHEMA_NAME_ADJUSTMENT_MODE, | ||
OFFSET_STORAGE_PER_TASK, | ||
OFFSET_STORAGE_TASK_KEY_GEN, | ||
|
@@ -536,11 +611,16 @@ public static ConfigDef configDef() { | |
|
||
public VitessConnectorConfig(Configuration config) { | ||
super( | ||
VitessConnector.class, | ||
config, | ||
null, x -> x.schema() + "." + x.table(), | ||
Tables.TableFilter.fromPredicate(VitessConnectorConfig::isNotBuiltInTable), | ||
x -> x.schema() + "." + x.table(), | ||
true, | ||
-1, | ||
ColumnFilterMode.SCHEMA, | ||
true); | ||
false); | ||
|
||
getServiceRegistry().registerServiceProvider(new MySqlCharsetRegistryServiceProvider()); | ||
} | ||
|
||
@Override | ||
|
@@ -649,14 +729,38 @@ public int getVtgatePort() { | |
return getConfig().getInteger(VTGATE_PORT); | ||
} | ||
|
||
public int getVtgateJdbcPort() { | ||
return getConfig().getInteger(VTGATE_JDBC_PORT); | ||
} | ||
|
||
public String getVtgateUsername() { | ||
return getConfig().getString(VTGATE_USER); | ||
} | ||
|
||
public String getVtgateJdbcUsername() { | ||
String jdbcUsername = getConfig().getString(VTGATE_JDBC_USER); | ||
if (jdbcUsername != null) { | ||
return jdbcUsername; | ||
} | ||
else { | ||
return getConfig().getString(VTGATE_USER); | ||
} | ||
} | ||
|
||
public String getVtgatePassword() { | ||
return getConfig().getString(VTGATE_PASSWORD); | ||
} | ||
|
||
public String getVtgateJdbcPassword() { | ||
String jdbcPassword = getConfig().getString(VTGATE_JDBC_PASSWORD); | ||
if (jdbcPassword != null) { | ||
return jdbcPassword; | ||
} | ||
else { | ||
return getConfig().getString(VTGATE_PASSWORD); | ||
} | ||
} | ||
|
||
public String getTabletType() { | ||
return getConfig().getString(TABLET_TYPE); | ||
} | ||
|
@@ -754,6 +858,29 @@ public Heartbeat createHeartbeat(TopicNamingStrategy topicNamingStrategy, Schema | |
return new VitessHeartbeatImpl(getHeartbeatInterval(), topicNamingStrategy.heartbeatTopic(), getLogicalName(), schemaNameAdjuster); | ||
} | ||
|
||
/** | ||
* Checks whether the {@link TableId} refers to a built-in table. | ||
* | ||
* @param tableId the relational table identifier, should not be null | ||
* @return true if the reference refers to a built-in table | ||
*/ | ||
public static boolean isNotBuiltInTable(TableId tableId) { | ||
return !isBuiltInDatabase(tableId.catalog()); | ||
} | ||
|
||
/** | ||
* Check whether the specified database name is a built-in database. | ||
* | ||
* @param databaseName the database name to check | ||
* @return true if the database is a built-in database; false otherwise | ||
*/ | ||
public static boolean isBuiltInDatabase(String databaseName) { | ||
if (databaseName == null) { | ||
return false; | ||
} | ||
return BUILT_IN_DB_NAMES.contains(databaseName.toLowerCase()); | ||
} | ||
|
||
public BigIntUnsignedHandlingMode getBigIntUnsgnedHandlingMode() { | ||
return BigIntUnsignedHandlingMode.parse(getConfig().getString(BIGINT_UNSIGNED_HANDLING_MODE), | ||
BIGINT_UNSIGNED_HANDLING_MODE.defaultValueAsString()); | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO this dependency is not needed as it is a transitive one for
debezium-connector-mysql