Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7698 Add transaction order metadata #187

Closed
wants to merge 1 commit into from

Conversation

twthorn
Copy link
Contributor

@twthorn twthorn commented Mar 21, 2024

Motivation

Provide transaction order metadata to establish ordering between any two operations for the same primary key, regardless of order consumed by the downstream application.

See Zulip & Jira for more info on motivation.

Implementation

The key new class introduced is OrderedTransactionContext. It is similar to TransactionContext but also adds two new fields, epoch & rank. Rank is a monotonically increasing numerical value that is used for comparing which transaction is more recent. Each database can have its own function for this, we given an example for Vitess. Epoch is a value that is incremented whenever this monotonically increasing property of rank would be violated (e.g., in Vitess, this would be shrinking the host set of the GTID, or a MySQL version upgrade). In order to prevent wrongfully incrementing the epoch, it must be stored in offsets state. We store the transaction rank alongside since they are semantically related (although not strictly necessary for transaction rank to be in offsets, similar to transaction order key).

Additional Details

This demonstrates how this transaction order metadata is possible. Note: since this is to be a self-contained PR in the vitess connector repo, there are some workarounds that we do, that would be different if we want to more completely adopt this in the main Debezium repo. These are as follows:

  1. OrderedTransactionContext subclasses TransactionContext - we do this just to plug in nicely with Debezium Core. What we should really do is either change TransactionContext to handle ordered metadata, or define an interface and have two classes one basic and one with ordered metadata.
  2. EpochProvider/RankProvider interfaces - these should be in debezium-core's transaction package, and could be implemented by all connectors
  3. The struct of a data change event is missing the epoch & rank information. This is needed in order for downstream applications to automatically handle repartition events. In order to add this, we must modify this method (or create a way to modify the behavior when an ordered transaction context is presented) to include the new fields (we omit this for now since we are only making changes in the Vitess Connector repo).
  4. Vgtid to per-shard gtid - Previously our transaction ID for metadata and other purposes was the full Vgtid (all shards & gtid sets). Now, with this change, in order to plug into the TransactionMonitor class & TransactionContext, we need to be able to deduce the rank & epoch solely from the transaction ID value. This means we cannot pass in Vgtid, because we would not know which shard to look at for the gtid set. Therefore we must pass in just the GTID for the desired shard. Note: if we change debezium-core we could avoid this (add some more functionalities for receiving shard info), but as of right now it works, and may be the preferred method (since we want debezium-core to remain generic (rather than customize to handle this gtid filtering for shard case) and the offsets we store still have vgtid which is needed to resuming a VStream).
  5. More tests - for now there are some tests/assertions we can't do yet. Specifically for (3) once that is fixed we'd want to add assertions on the source info. Additionally, we should also have an integration test on the epoch increase case (we can manipulate the stored offsets to trigger epoch increase event like shrinking host set).

@twthorn twthorn marked this pull request as draft March 21, 2024 21:27
Comment on lines +10 to +12
interface RankProvider {
BigInteger getRank(String transactionId);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking aloud; what about expanding this to allow arbitrary attributes somehow? I'm just not sure that only a transaction Id would be sufficient. For example, in Informix, if you don't have concurrent transactions, the underlying LSN doesn't change. Just curious if passing a parameter object here would be more beneficial for more corner cases where a single String may not be sufficient.

Copy link
Contributor Author

@twthorn twthorn Mar 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Responded in Zulip overall we could use the source info struct as the single parameter for this interface, which would include all needed info to calculate rank.

Copy link
Contributor

@HenryCaiHaiying HenryCaiHaiying left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is currently stored in the Kafka offset topic? The VGTID or GTID?, where do you store the epoch information in Kafka offset topic?

.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription(
"Whether to provided ordered metadata on transactions");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

provided --> provide


private Set<String> hosts = new HashSet();

public List<String> getSequenceValues() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to give an example of transactionId and the expected value of version, sequenceValues

public static OrderedTransactionContext load(Map<String, ?> offsets, EpochProvider epochProvider, RankProvider rankProvider) {
OrderedTransactionContext context = new OrderedTransactionContext(epochProvider, rankProvider);
context.transactionId = (String) offsets.get(OFFSET_TRANSACTION_ID);
context.previousTransactionId = (String) offsets.get(OFFSET_TRANSACTION_ID);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transactionId and previousTransactionId is using the same key in the offsets map?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The offset_transaction_id stored in offset map is for a specific shard. A VGTID in the same offset map represents multiple shards. If the offset_transaction_id in the offset map represents shard1, but the new events coming from VSTREAM is for shard2, how do you do the comparison?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes forgot to add this part, we do need mapping of shard to epoch.

@jpechane
Copy link
Contributor

Two initial comments

  • Should not io.debezium.connector.vitess.SourceInfo implement io.debezium.connector.AbstractSourceInfo.sequence() in this PR?
  • I wonder if we need rank and epoch provider. If we are oing to have this really generic then maybe just a tuple of values should be used. And it is up to each connector to implement 1, 2, 3 or arbitrary values needed. WDYT?

@twthorn
Copy link
Contributor Author

twthorn commented Mar 26, 2024

Should not io.debezium.connector.vitess.SourceInfo implement io.debezium.connector.AbstractSourceInfo.sequence() in this PR?

Transaction rank could be added here. Epoch couldn't because it needs state. However, I don't think we want to provide transaction rank for all events going forward (regardless of ordered transaction metadata setting). It isn't useful without epoch data.

For connectors that do have this, e.g., postgres, it is metdata that is tied to each event, and not calcualted/derived, e.g., lsn.

I wonder if we need rank and epoch provider. If we are oing to have this really generic then maybe just a tuple of values should be used. And it is up to each connector to implement 1, 2, 3 or arbitrary values needed. WDYT?

I agree with this, we want as general a solution as possible. I am working on implementation in debezium core that will allow for any values to be augmented to the transaction context. It could be tuple/array/etc., entirely up to the connector that implements it. The core code won't require implementing connectors to use rank/epoch.

@twthorn twthorn closed this Mar 28, 2024
@twthorn twthorn deleted the DBZ-7698 branch March 28, 2024 20:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants