Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7698 Add ordered transaction metadata for Vitess connector #188

Merged
merged 15 commits into from
Apr 19, 2024

Conversation

twthorn
Copy link
Contributor

@twthorn twthorn commented Mar 28, 2024

Changes

Implements transaction metadata order feature using new features added in debezium/debezium#5437 for customizing transaction metadata.

Adds the transaction epoch & rank fields. Tracks epoch in state for correct epoch recording. Computes rank for each GTID received.

How to Use

We will add this section/similar one to docs once we finalize things:

When the following two configs are set to the listed values, the data message Envelope is also enriched with a new transaction field.

transaction.contex: VitessOrderedTransactionContext
transaction.struct.maker: VitessOrderedTransactionStructMaker

This field provides information about every event in the form of a composite of fields. With ordered transaction metadata enabled, there are two additional fields included:

  • transaction_epoch - non-decreasing value representing the "epoch" that its transaction rank belongs to
  • transaction_rank - non-decreasing value (within an epoch) that represents the order of the transaction

Additionally, a third field (already included in standard transaction metadata) is also relevant for ordering:

  • total_order absolute position of the event among all events generated by the transaction

For how these fields can be used to establish an ordering of events. Consider the following example.
There are two data change events that occurred in the same shard and for the same primary key. However, kafka experienced a re-partition event so the consumer order of the two events cannot be trusted. A downstream application can determine which event to apply (the newer event) and which to discard with the following logic:

  1. If transaction_epoch is not equal, return the event with the grater epoch. Otherwise, continue
  2. If transaction_rank is not equal, return the event with the greater rank. Otherwise, continue:
  3. Return the event with a greater total_order

If we reach step 3, the two events are in the same transaction. The total_order field represents the order of events within a transaction, so a greater value will be the newer event.

Following is an example of a data change event with ordered transaction metadata:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": 1637988245467,
  "ts_us": 1637988245467841,
  "ts_ns": 1637988245467841698,
  "transaction": {
    "id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-68\"}]",
    "total_order": 1,
    "data_collection_order": 1,
    "transaction_rank": 68,
    "transaction_epoch": 0
  }
}

Additional notes

Tests will not pass in Github until changes in this PR debezium/debezium#5437 are merged. However, I ran this locally with the snapshot version built on that PR and all itests passed.

@HenryCaiHaiying
Copy link
Contributor

I think it would be good to have an example of what's stored in offsetMap which gets persisted into Kafka offset topic. Especially on whether the transaction_id and rank field in offset map represents one shard or multiple shards.

Also it would be a good idea to have an example of the message content (for the new transaction_id/rank fields) to the Kafka data topic.

@twthorn
Copy link
Contributor Author

twthorn commented Apr 2, 2024

Thanks for the review @HenryCaiHaiying I added some examples in java doc comments.

public class VitessOrderedTransactionContext extends TransactionContext {
public static final String OFFSET_TRANSACTION_EPOCH = "transaction_epoch";
public static final String OFFSET_TRANSACTION_RANK = "transaction_rank";
protected String previousTransactionId = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the previousTransactionId here the vGtid which presents multiple shards?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have updated the name, it is the full vgtid

private Map<String, Long> shardToEpoch = new HashMap<>();
private static final ObjectMapper MAPPER = new ObjectMapper();

public Long getEpoch(Long previousEpoch, String previousTransactionId, String transactionId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the previousTransactionId and transactionId here represents one shard or multiple shards?

Copy link
Contributor

@HenryCaiHaiying HenryCaiHaiying Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If id represents one shard, maybe the variable name can just be called previousGtidString to avoid confusion.

Also this method seems can be a static method, it doesn't use any instance variables.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the previousTransactionId and transactionId here represents one shard or multiple shards?
Updated variable name, it represents single shard gtid

Also this method seems can be a static method, it doesn't use any instance variables.

It uses the shardToEpoch instance variable, so it cannot be static. However, the rank provider method could be static so I made that one static.

private Map<String, Long> shardToEpoch = new HashMap<>();
private static final ObjectMapper MAPPER = new ObjectMapper();

public Long getEpochForGtid(Long previousEpoch, String previousGtidString, String gtidString) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method doesn't use shardToEpoch instance variables, can be a static method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I'll update that one.

Copy link
Contributor

@HenryCaiHaiying HenryCaiHaiying left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

Copy link
Contributor

@jpechane jpechane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to override io.debezium.connector.AbstractSourceInfo.sequence() to align the connector with others?

src/main/java/io/debezium/connector/vitess/Vgtid.java Outdated Show resolved Hide resolved
@twthorn
Copy link
Contributor Author

twthorn commented Apr 11, 2024

Would it be possible to override io.debezium.connector.AbstractSourceInfo.sequence() to align the connector with others?

My thinking is that we don't want this epoch/rank tracking on by default so it should not be in the source info. More reasoning mentioned here.

@jpechane
Copy link
Contributor

Would it be possible to override io.debezium.connector.AbstractSourceInfo.sequence() to align the connector with others?

My thinking is that we don't want this epoch/rank tracking on by default so it should not be in the source info. More reasoning mentioned here.

Yes, I understand. My reasoning is that it would be present only if transactions are enabled.

@twthorn
Copy link
Contributor Author

twthorn commented Apr 11, 2024

Would it be possible to override io.debezium.connector.AbstractSourceInfo.sequence() to align the connector with others?

My thinking is that we don't want this epoch/rank tracking on by default so it should not be in the source info. More reasoning mentioned here.

Yes, I understand. My reasoning is that it would be present only if transactions are enabled.

Are we sure we want the content of SourceInfo to be dependent on that transaction metadata setting? Seems not as modular. That setting only affects the transaction block or transaction messages so it keeps the modifications isolated. I'm not sure about having SourceInfo changing based on that setting, seems like an unintuitive side effect.

For other connectors I see that use sequence, it doesn't depend on the setting. It contains data that is intrinsic to the transaction, not some derived stateful calculation.

Additionally, if epoch/rank is present in the transaction block and in the sequence value that does mean duplication.

@twthorn twthorn requested a review from jpechane April 12, 2024 14:00
@jpechane
Copy link
Contributor

@twthorn I am still not fully convinced on not using sequence in source info due to break of consistency with other connectors and I'd be willing to accept dependency on transaction metadata setting as this can be properly documented. Also I'd have no issue in duplication of the values in this case. Still let's keep it as is and we'll intorduce it later if there will be real need.

Copy link
Contributor

@jpechane jpechane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@twthorn Thanks a lot. This PR is ready to be merged after the core PR is refactored on the top of the latest main.

@jpechane
Copy link
Contributor

Forse-pushed rebase on the latest main

@jpechane jpechane merged commit 31fbfef into debezium:main Apr 19, 2024
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants