-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-7698 Add transaction order metadata #187
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
/* | ||
* Copyright Debezium Authors. | ||
* | ||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
package io.debezium.connector.vitess.transaction; | ||
|
||
interface EpochProvider { | ||
Long getEpoch(Long previousEpoch, String previousTransactionId, String transactionId); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Copyright Debezium Authors. | ||
* | ||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
package io.debezium.connector.vitess.transaction; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
|
||
class Gtid { | ||
|
||
public String getVersion() { | ||
return version; | ||
} | ||
|
||
private String version = ""; | ||
|
||
public Set<String> getHosts() { | ||
return hosts; | ||
} | ||
|
||
private Set<String> hosts = new HashSet(); | ||
|
||
public List<String> getSequenceValues() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be good to give an example of |
||
return sequenceValues; | ||
} | ||
|
||
private List<String> sequenceValues = new ArrayList(); | ||
|
||
private static final String PREFIX_LAST_CHAR = "/"; | ||
|
||
private static int getVersionEndIndex(String transactionId) { | ||
return transactionId.indexOf(PREFIX_LAST_CHAR); | ||
} | ||
|
||
private static String trimVersion(String transactionId) { | ||
int index = getVersionEndIndex(transactionId); | ||
if (index != -1) { | ||
return transactionId.substring(index + 1); | ||
} | ||
return transactionId; | ||
} | ||
|
||
private void initializeVersion(String transactionId) { | ||
int index = getVersionEndIndex(transactionId); | ||
if (index != -1) { | ||
this.version = transactionId.substring(0, index); | ||
} | ||
} | ||
|
||
Gtid(String transactionId) { | ||
initializeVersion(transactionId); | ||
parseGtid(transactionId); | ||
} | ||
|
||
private void parseGtid(String transactionId) { | ||
transactionId = trimVersion(transactionId); | ||
String[] transactions = transactionId.split(","); | ||
for (String transaction : transactions) { | ||
String[] parts = transaction.split(":"); | ||
String hostname = parts[0]; | ||
hosts.add(hostname); | ||
String maxSequenceValue = parts[1].split("-")[1]; | ||
sequenceValues.add(maxSequenceValue); | ||
} | ||
} | ||
|
||
public boolean isHostSetEqual(Gtid hosts) { | ||
return this.hosts.equals(hosts.hosts); | ||
} | ||
|
||
public boolean isHostSetSupersetOf(Gtid previousHosts) { | ||
return this.hosts.containsAll(previousHosts.hosts); | ||
} | ||
|
||
public boolean isHostSetSubsetOf(Gtid previousHosts) { | ||
return previousHosts.hosts.containsAll(this.hosts); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
/* | ||
* Copyright Debezium Authors. | ||
* | ||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
package io.debezium.connector.vitess.transaction; | ||
|
||
import java.math.BigInteger; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.Iterator; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
import io.debezium.pipeline.txmetadata.TransactionContext; | ||
import io.debezium.spi.schema.DataCollectionId; | ||
|
||
public class OrderedTransactionContext extends TransactionContext { | ||
|
||
protected static final String OFFSET_TRANSACTION_ID = "transaction_id"; | ||
protected static final String OFFSET_TRANSACTION_EPOCH = "transaction_epoch"; | ||
protected static final String OFFSET_TRANSACTION_RANK = "transaction_rank"; | ||
|
||
private static final String OFFSET_TABLE_COUNT_PREFIX = "transaction_data_collection_order_"; | ||
private static final int OFFSET_TABLE_COUNT_PREFIX_LENGTH = OFFSET_TABLE_COUNT_PREFIX.length(); | ||
private String transactionId = null; | ||
private final Map<String, Long> perTableEventCount = new HashMap(); | ||
private final Map<String, Long> viewPerTableEventCount; | ||
private long totalEventCount; | ||
private String previousTransactionId = null; | ||
private Long transactionEpoch; | ||
private BigInteger transactionRank; | ||
|
||
private EpochProvider epochProvider; | ||
private RankProvider rankProvider; | ||
|
||
public OrderedTransactionContext(EpochProvider epochProvider, RankProvider rankProvider) { | ||
this.epochProvider = epochProvider; | ||
this.rankProvider = rankProvider; | ||
this.viewPerTableEventCount = Collections.unmodifiableMap(this.perTableEventCount); | ||
this.totalEventCount = 0L; | ||
this.transactionEpoch = 0L; | ||
this.transactionRank = null; | ||
} | ||
|
||
private void reset() { | ||
this.transactionId = null; | ||
this.totalEventCount = 0L; | ||
this.perTableEventCount.clear(); | ||
this.transactionRank = null; | ||
} | ||
|
||
@Override | ||
public Map<String, Object> store(Map<String, Object> offset) { | ||
if (!Objects.isNull(this.transactionId)) { | ||
offset.put(OFFSET_TRANSACTION_ID, this.transactionId); | ||
} | ||
if (!Objects.isNull(this.transactionEpoch)) { | ||
offset.put(OFFSET_TRANSACTION_EPOCH, this.transactionEpoch); | ||
} | ||
if (!Objects.isNull(this.transactionRank)) { | ||
offset.put(OFFSET_TRANSACTION_RANK, this.transactionRank.toString()); | ||
} | ||
|
||
Iterator var3 = this.perTableEventCount.entrySet().iterator(); | ||
|
||
while (var3.hasNext()) { | ||
Map.Entry<String, Long> e = (Map.Entry) var3.next(); | ||
offset.put(OFFSET_TABLE_COUNT_PREFIX + e.getKey(), e.getValue()); | ||
} | ||
return offset; | ||
} | ||
|
||
public static OrderedTransactionContext load(Map<String, ?> offsets, EpochProvider epochProvider, RankProvider rankProvider) { | ||
OrderedTransactionContext context = new OrderedTransactionContext(epochProvider, rankProvider); | ||
context.transactionId = (String) offsets.get(OFFSET_TRANSACTION_ID); | ||
context.previousTransactionId = (String) offsets.get(OFFSET_TRANSACTION_ID); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The offset_transaction_id stored in offset map is for a specific shard. A VGTID in the same offset map represents multiple shards. If the offset_transaction_id in the offset map represents shard1, but the new events coming from VSTREAM is for shard2, how do you do the comparison? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yes forgot to add this part, we do need mapping of shard to epoch. |
||
|
||
context.transactionEpoch = (Long) offsets.get(OFFSET_TRANSACTION_EPOCH); | ||
String transactionRankString = (String) offsets.get(OFFSET_TRANSACTION_RANK); | ||
if (transactionRankString == null) { | ||
context.transactionRank = null; | ||
} | ||
else { | ||
context.transactionRank = new BigInteger(transactionRankString); | ||
} | ||
|
||
Iterator var3 = offsets.entrySet().iterator(); | ||
|
||
while (var3.hasNext()) { | ||
Map.Entry<String, Object> offset = (Map.Entry) var3.next(); | ||
if ((offset.getKey()).startsWith(OFFSET_TABLE_COUNT_PREFIX)) { | ||
String dataCollectionId = (offset.getKey()).substring(OFFSET_TABLE_COUNT_PREFIX_LENGTH); | ||
Long count = (Long) offset.getValue(); | ||
context.perTableEventCount.put(dataCollectionId, count); | ||
} | ||
} | ||
|
||
context.totalEventCount = context.perTableEventCount.values().stream().mapToLong((x) -> x).sum(); | ||
return context; | ||
} | ||
|
||
@Override | ||
public boolean isTransactionInProgress() { | ||
return !Objects.isNull(this.transactionId); | ||
} | ||
|
||
@Override | ||
public String getTransactionId() { | ||
return transactionId; | ||
} | ||
|
||
@Override | ||
public long getTotalEventCount() { | ||
return this.totalEventCount; | ||
} | ||
|
||
@Override | ||
public void beginTransaction(String txId) { | ||
this.previousTransactionId = this.transactionId; | ||
this.reset(); | ||
this.transactionId = txId; | ||
transactionEpoch = this.epochProvider.getEpoch(this.transactionEpoch, previousTransactionId, txId); | ||
transactionRank = this.rankProvider.getRank(txId); | ||
} | ||
|
||
@Override | ||
public void endTransaction() { | ||
this.reset(); | ||
} | ||
|
||
@Override | ||
public long event(DataCollectionId source) { | ||
++this.totalEventCount; | ||
String sourceName = source.toString(); | ||
long dataCollectionEventOrder = (Long) this.perTableEventCount.getOrDefault(sourceName, 0L) + 1L; | ||
this.perTableEventCount.put(sourceName, dataCollectionEventOrder); | ||
return dataCollectionEventOrder; | ||
} | ||
|
||
@Override | ||
public Map<String, Long> getPerTableEventCount() { | ||
return this.viewPerTableEventCount; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "TransactionContext [" + | ||
"currentTransactionId=" + this.transactionId + | ||
", perTableEventCount=" + this.perTableEventCount + | ||
", totalEventCount=" + this.totalEventCount + | ||
", transactionEpoch=" + this.transactionEpoch + | ||
", transactionRank=" + this.transactionRank + | ||
"]"; | ||
} | ||
|
||
public Long getTransactionEpoch() { | ||
return transactionEpoch; | ||
} | ||
|
||
public BigInteger getTransactionRank() { | ||
return transactionRank; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
provided
-->provide