DBZ-7698 Add transaction order metadata #187
interface RankProvider {
    BigInteger getRank(String transactionId);
}
Just thinking aloud: what about expanding this to allow arbitrary attributes somehow? I'm not sure that a transaction ID alone would be sufficient. For example, in Informix, if you don't have concurrent transactions, the underlying LSN doesn't change. Would passing a parameter object here be more beneficial for corner cases where a single String may not be sufficient?
Responded in Zulip. Overall, we could use the source info struct as the single parameter for this interface, which would include all the info needed to calculate rank.
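To make the suggestion concrete, here is a rough sketch of what a rank provider that takes the whole source info record could look like. Everything in it is an assumption for illustration: the source info is modelled as a plain Map rather than the real struct type, and the names SourceInfoRankProvider, LsnRankProvider, and the "lsn" field are hypothetical, not part of this PR.

```java
import java.math.BigInteger;
import java.util.Map;

// Hypothetical variant of RankProvider: takes the whole source-info record
// (modelled here as a plain Map) rather than a single transaction id string.
interface SourceInfoRankProvider {
    BigInteger getRank(Map<String, Object> sourceInfo);
}

// Illustrative implementation that reads a made-up numeric "lsn" field.
final class LsnRankProvider implements SourceInfoRankProvider {
    @Override
    public BigInteger getRank(Map<String, Object> sourceInfo) {
        Object lsn = sourceInfo.get("lsn");
        if (lsn == null) {
            throw new IllegalArgumentException("source info has no lsn field");
        }
        // Parse via toString so Long, Integer, and String values all work.
        return new BigInteger(lsn.toString());
    }
}
```

A connector that needs more than one attribute could read several fields from the same record without changing the interface signature.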
What is currently stored in the Kafka offset topic: the VGTID or the GTID? And where do you store the epoch information in the Kafka offset topic?
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription(
        "Whether to provided ordered metadata on transactions");
provided --> provide
private Set<String> hosts = new HashSet();

public List<String> getSequenceValues() {
It would be good to give an example of transactionId and the expected value of version, sequenceValues.
public static OrderedTransactionContext load(Map<String, ?> offsets, EpochProvider epochProvider, RankProvider rankProvider) {
    OrderedTransactionContext context = new OrderedTransactionContext(epochProvider, rankProvider);
    context.transactionId = (String) offsets.get(OFFSET_TRANSACTION_ID);
    context.previousTransactionId = (String) offsets.get(OFFSET_TRANSACTION_ID);
transactionId and previousTransactionId are using the same key in the offsets map?
The offset_transaction_id stored in the offset map is for a specific shard, while a VGTID in the same offset map represents multiple shards. If the offset_transaction_id in the offset map represents shard1, but the new events coming from VSTREAM are for shard2, how do you do the comparison?
Ah yes, I forgot to add this part. We do need a mapping of shard to epoch.
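One possible shape for that shard-to-epoch mapping is sketched below. It is only an illustration under assumptions: ShardEpochMap is a hypothetical class name, the starting epoch of 0 is a guess, and how the map is serialized into offsets is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for per-shard epochs; not the connector's actual class.
final class ShardEpochMap {
    private final Map<String, Long> epochByShard = new HashMap<>();

    // Returns the current epoch for a shard; unseen shards are assumed to start at 0.
    long get(String shard) {
        return epochByShard.getOrDefault(shard, 0L);
    }

    // Bumps only the named shard's epoch and returns the new value.
    long increment(String shard) {
        return epochByShard.merge(shard, 1L, Long::sum);
    }
}
```

With a map like this, an event from shard2 would be compared against shard2's own stored epoch and rank instead of whatever shard the last stored transaction id happened to belong to.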
Two initial comments
Transaction rank could be added here. Epoch couldn't because it needs state. However, I don't think we want to provide transaction rank for all events going forward (regardless of the ordered transaction metadata setting). It isn't useful without epoch data. For connectors that do have this, e.g., postgres, it is metadata that is tied to each event and not calculated/derived, e.g., lsn.
I agree with this; we want as general a solution as possible. I am working on an implementation in debezium core that will allow any values to be added to the transaction context. It could be a tuple, an array, etc., entirely up to the connector that implements it. The core code won't require implementing connectors to use rank/epoch.
Motivation
Provide transaction order metadata to establish ordering between any two operations for the same primary key, regardless of the order in which they are consumed by the downstream application.
See Zulip & Jira for more info on motivation.
Implementation
The key new class introduced is OrderedTransactionContext. It is similar to TransactionContext but adds two new fields, epoch and rank. Rank is a monotonically increasing numerical value used to determine which of two transactions is more recent. Each database can have its own function for this; we give an example for Vitess. Epoch is a value that is incremented whenever the monotonically increasing property of rank would be violated (e.g., in Vitess, by shrinking the host set of the GTID, or by a MySQL version upgrade). To avoid wrongly incrementing the epoch, it must be stored in the offsets state. We store the transaction rank alongside it since the two are semantically related (although it is not strictly necessary for the transaction rank to be in offsets, similar to the transaction order key).
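As a rough illustration of how a downstream consumer might use these two fields, the sketch below orders transactions by epoch first and falls back to rank only within the same epoch. The class names are hypothetical and the epoch-first rule is my reading of the description above, not code from this PR.

```java
import java.math.BigInteger;

// Hypothetical value type: the (epoch, rank) pair attached to a transaction.
final class TransactionOrder implements Comparable<TransactionOrder> {
    final long epoch;
    final BigInteger rank;

    TransactionOrder(long epoch, BigInteger rank) {
        this.epoch = epoch;
        this.rank = rank;
    }

    // Epoch is compared first; rank only breaks ties within the same epoch.
    @Override
    public int compareTo(TransactionOrder other) {
        int byEpoch = Long.compare(epoch, other.epoch);
        return byEpoch != 0 ? byEpoch : rank.compareTo(other.rank);
    }
}

final class TransactionOrderDemo {
    public static void main(String[] args) {
        // Rank dropped from 500 to 20 (e.g., the host set shrank), so the epoch was bumped.
        TransactionOrder beforeShrink = new TransactionOrder(0, BigInteger.valueOf(500));
        TransactionOrder afterShrink = new TransactionOrder(1, BigInteger.valueOf(20));
        System.out.println(afterShrink.compareTo(beforeShrink) > 0); // prints "true"
    }
}
```

The demo shows why rank alone is not enough: the later transaction has the smaller rank, and only the higher epoch marks it as more recent.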
Additional Details
This demonstrates how this transaction order metadata is possible. Note: since this is meant to be a self-contained PR in the vitess connector repo, it contains some workarounds that would be done differently if we want to adopt this more completely in the main Debezium repo. These are as follows: