-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-7698 Add ordered transaction metadata for Vitess connector #188
Conversation
I think it would be good to have an example of what's stored in offsetMap which gets persisted into Kafka offset topic. Especially on whether the transaction_id and rank field in offset map represents one shard or multiple shards. Also it would be a good idea to have an example of the message content (for the new transaction_id/rank fields) to the Kafka data topic. |
Thanks for the review @HenryCaiHaiying I added some examples in java doc comments. |
public class VitessOrderedTransactionContext extends TransactionContext { | ||
public static final String OFFSET_TRANSACTION_EPOCH = "transaction_epoch"; | ||
public static final String OFFSET_TRANSACTION_RANK = "transaction_rank"; | ||
protected String previousTransactionId = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the previousTransactionId here the vGtid which presents multiple shards?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I have updated the name, it is the full vgtid
private Map<String, Long> shardToEpoch = new HashMap<>(); | ||
private static final ObjectMapper MAPPER = new ObjectMapper(); | ||
|
||
public Long getEpoch(Long previousEpoch, String previousTransactionId, String transactionId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the previousTransactionId and transactionId here represents one shard or multiple shards?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If id represents one shard, maybe the variable name can just be called previousGtidString to avoid confusion.
Also this method seems can be a static
method, it doesn't use any instance variables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the previousTransactionId and transactionId here represents one shard or multiple shards?
Updated variable name, it represents single shard gtid
Also this method seems can be a static method, it doesn't use any instance variables.
It uses the shardToEpoch instance variable, so it cannot be static. However, the rank provider method could be static so I made that one static.
private Map<String, Long> shardToEpoch = new HashMap<>(); | ||
private static final ObjectMapper MAPPER = new ObjectMapper(); | ||
|
||
public Long getEpochForGtid(Long previousEpoch, String previousGtidString, String gtidString) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method doesn't use shardToEpoch
instance variables, can be a static method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, I'll update that one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to override io.debezium.connector.AbstractSourceInfo.sequence()
to align the connector with others?
My thinking is that we don't want this epoch/rank tracking on by default so it should not be in the source info. More reasoning mentioned here. |
Yes, I understand. My reasoning is that it would be present only if transactions are enabled. |
Are we sure we want the content of SourceInfo to be dependent on that transaction metadata setting? Seems not as modular. That setting only affects the transaction block or transaction messages so it keeps the modifications isolated. I'm not sure about having SourceInfo changing based on that setting, seems like an unintuitive side effect. For other connectors I see that use sequence, it doesn't depend on the setting. It contains data that is intrinsic to the transaction, not some derived stateful calculation. Additionally, if epoch/rank is present in the transaction block and in the sequence value that does mean duplication. |
@twthorn I am still not fully convinced on not using |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@twthorn Thanks a lot. This PR is ready to be merged after the core PR is refactored on the top of the latest main.
…ard in shard to epoch map
Forse-pushed rebase on the latest main |
Changes
Implements transaction metadata order feature using new features added in debezium/debezium#5437 for customizing transaction metadata.
Adds the transaction epoch & rank fields. Tracks epoch in state for correct epoch recording. Computes rank for each GTID received.
How to Use
We will add this section/similar one to docs once we finalize things:
This field provides information about every event in the form of a composite of fields. With ordered transaction metadata enabled, there are two additional fields included:
Additional notes
Tests will not pass in Github until changes in this PR debezium/debezium#5437 are merged. However, I ran this locally with the snapshot version built on that PR and all itests passed.