Skip to content

Commit

Permalink
DBZ-7698 Add transaction order metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Mar 21, 2024
1 parent fc07915 commit 8b39a0a
Show file tree
Hide file tree
Showing 18 changed files with 778 additions and 17 deletions.
9 changes: 9 additions & 0 deletions src/main/java/io/debezium/connector/vitess/Vgtid.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ public List<ShardGtid> getShardGtids() {
return shardGtids;
}

public ShardGtid getShardGtid(String shard) {
for (ShardGtid shardGtid : shardGtids) {
if (shardGtid.shard.equals(shard)) {
return shardGtid;
}
}
throw new RuntimeException("Gtid for shard missing, shard: " + shard + "vgtid: " + this.rawVgtid.toString());
}

public boolean isSingleShard() {
return rawVgtid.getShardGtidsCount() == 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,15 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
+ "'precise' represents values as precise (Java's 'BigDecimal') values;"
+ "'long' represents values using Java's 'long', which may not offer the precision but will be far easier to use in consumers.");

public static final Field PROVIDE_ORDERED_TRANSACTION_METADATA = Field.create("provide.ordered.transaction.metadata")
.withDisplayName("Provide ordered transaction meatadata")
.withType(Type.BOOLEAN)
.withDefault(false)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription(
"Whether to provided ordered metadata on transactions");

public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(VitessSourceInfoStructMaker.class.getName());

Expand Down Expand Up @@ -417,7 +426,7 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.events(
INCLUDE_UNKNOWN_DATATYPES,
SOURCE_INFO_STRUCT_MAKER)
.connector(SNAPSHOT_MODE, BIGINT_UNSIGNED_HANDLING_MODE)
.connector(SNAPSHOT_MODE, BIGINT_UNSIGNED_HANDLING_MODE, PROVIDE_ORDERED_TRANSACTION_METADATA)
.excluding(SCHEMA_EXCLUDE_LIST, SCHEMA_INCLUDE_LIST)
.create();

Expand Down Expand Up @@ -617,4 +626,8 @@ public BigIntUnsignedHandlingMode getBigIntUnsgnedHandlingMode() {
return BigIntUnsignedHandlingMode.parse(getConfig().getString(BIGINT_UNSIGNED_HANDLING_MODE),
BIGINT_UNSIGNED_HANDLING_MODE.defaultValueAsString());
}

public boolean shouldProvideOrderedTransactionMetadata() {
return Boolean.parseBoolean(getConfig().getString(PROVIDE_ORDERED_TRANSACTION_METADATA));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ public String getTransactionId(
}

final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
// Use the entire VGTID as transaction id
return sourceInfo.getString(SourceInfo.VGTID_KEY);
}
String shard = sourceInfo.getString(SourceInfo.SHARD_KEY);

String jsonString = sourceInfo.getString(SourceInfo.VGTID_KEY);
Vgtid vgtid = Vgtid.of(jsonString);
return vgtid.getShardGtid(shard).getGtid();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.connector.vitess.transaction.OrderedTransactionContext;
import io.debezium.connector.vitess.transaction.VitessEpochProvider;
import io.debezium.connector.vitess.transaction.VitessRankProvider;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
Expand Down Expand Up @@ -50,8 +53,16 @@ public static VitessOffsetContext initialContext(
VitessConnectorConfig connectorConfig, Clock clock) {
LOGGER.info("No previous offset exists. Use default VGTID.");
final Vgtid defaultVgtid = VitessReplicationConnection.defaultVgtid(connectorConfig);
// use the other transaction context
TransactionContext transactionContext;
if (connectorConfig.shouldProvideOrderedTransactionMetadata()) {
transactionContext = new OrderedTransactionContext(new VitessEpochProvider(), new VitessRankProvider());
}
else {
transactionContext = new TransactionContext();
}
return new VitessOffsetContext(
connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), new TransactionContext());
connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), transactionContext);
}

/**
Expand Down Expand Up @@ -144,11 +155,18 @@ public Loader(VitessConnectorConfig connectorConfig) {
@Override
public VitessOffsetContext load(Map<String, ?> offset) {
final String vgtid = (String) offset.get(SourceInfo.VGTID_KEY);
TransactionContext transactionContext;
if (connectorConfig.shouldProvideOrderedTransactionMetadata()) {
transactionContext = OrderedTransactionContext.load(offset, new VitessEpochProvider(), new VitessRankProvider());
}
else {
transactionContext = TransactionContext.load(offset);
}
return new VitessOffsetContext(
connectorConfig,
Vgtid.of(vgtid),
null,
TransactionContext.load(offset));
transactionContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private void handleDdl(Binlogdata.VEvent vEvent, ReplicationMessageProcessor pro
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Use the entire VGTID as transaction id
if (newVgtid != null) {
this.transactionId = newVgtid.toString();
this.transactionId = newVgtid.getShardGtid(vEvent.getShard().toString()).getGtid();
}
processor.process(
new DdlMessage(transactionId, eventTimestamp), newVgtid, false);
Expand All @@ -101,7 +101,7 @@ private void handleOther(Binlogdata.VEvent vEvent, ReplicationMessageProcessor p
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Use the entire VGTID as transaction id
if (newVgtid != null) {
this.transactionId = newVgtid.toString();
this.transactionId = newVgtid.getShardGtid(vEvent.getShard().toString()).getGtid();
}
processor.process(
new OtherMessage(transactionId, eventTimestamp), newVgtid, false);
Expand All @@ -112,7 +112,7 @@ private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProc
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Use the entire VGTID as transaction id.
if (newVgtid != null) {
this.transactionId = newVgtid.toString();
this.transactionId = newVgtid.getShardGtid(vEvent.getShard().toString()).getGtid();
}
// Transaction ID must not be null in TransactionalMessage.
if (this.transactionId == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.vitess.transaction;

public interface EpochProvider {
public Long getEpoch(Long previousEpoch, String previousTransactionId, String transactionId);

}
82 changes: 82 additions & 0 deletions src/main/java/io/debezium/connector/vitess/transaction/Gtid.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.debezium.connector.vitess.transaction;

/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class Gtid {

public String getVersion() {
return version;
}

private String version = "";

public Set<String> getHosts() {
return hosts;
}

private Set<String> hosts = new HashSet();

public List<String> getSequenceValues() {
return sequenceValues;
}

private List<String> sequenceValues = new ArrayList();

private static final String PREFIX_LAST_CHAR = "/";

private static int getVersionEndIndex(String transactionId) {
return transactionId.indexOf(PREFIX_LAST_CHAR);
}

private static String trimVersion(String transactionId) {
int index = getVersionEndIndex(transactionId);
if (index != -1) {
return transactionId.substring(index + 1);
}
return transactionId;
}

private void initializeVersion(String transactionId) {
int index = getVersionEndIndex(transactionId);
if (index != -1) {
this.version = transactionId.substring(0, index);
}
}

public Gtid(String transactionId) {
initializeVersion(transactionId);
parseGtid(transactionId);
}

private void parseGtid(String transactionId) {
transactionId = trimVersion(transactionId);
String[] transactions = transactionId.split(",");
for (String transaction : transactions) {
String[] parts = transaction.split(":");
String hostname = parts[0];
hosts.add(hostname);
String maxSequenceValue = parts[1].split("-")[1];
sequenceValues.add(maxSequenceValue);
}
}

public boolean isHostSetEqual(Gtid hosts) {
return this.hosts.equals(hosts.hosts);
}

public boolean isHostSetSupersetOf(Gtid previousHosts) {
return this.hosts.containsAll(previousHosts.hosts);
}

public boolean isHostSetSubsetOf(Gtid previousHosts) {
return previousHosts.hosts.containsAll(this.hosts);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.vitess.transaction;

import java.math.BigInteger;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.spi.schema.DataCollectionId;

public class OrderedTransactionContext extends TransactionContext {

protected static final String OFFSET_TRANSACTION_ID = "transaction_id";
protected static final String OFFSET_TRANSACTION_EPOCH = "transaction_epoch";
protected static final String OFFSET_TRANSACTION_RANK = "transaction_rank";

private static final String OFFSET_TABLE_COUNT_PREFIX = "transaction_data_collection_order_";
private static final int OFFSET_TABLE_COUNT_PREFIX_LENGTH = OFFSET_TABLE_COUNT_PREFIX.length();
private String transactionId = null;
private final Map<String, Long> perTableEventCount = new HashMap();
private final Map<String, Long> viewPerTableEventCount;
private long totalEventCount;
private String previousTransactionId = null;
private Long transactionEpoch;
private BigInteger transactionRank;

private EpochProvider epochProvider;
private RankProvider rankProvider;

public OrderedTransactionContext(EpochProvider epochProvider, RankProvider rankProvider) {
this.epochProvider = epochProvider;
this.rankProvider = rankProvider;
this.viewPerTableEventCount = Collections.unmodifiableMap(this.perTableEventCount);
this.totalEventCount = 0L;
this.transactionEpoch = 0L;
this.transactionRank = null;
}

private void reset() {
this.transactionId = null;
this.totalEventCount = 0L;
this.perTableEventCount.clear();
this.transactionRank = null;
}

@Override
public Map<String, Object> store(Map<String, Object> offset) {
if (!Objects.isNull(this.transactionId)) {
offset.put(OFFSET_TRANSACTION_ID, this.transactionId);
}
if (!Objects.isNull(this.transactionEpoch)) {
offset.put(OFFSET_TRANSACTION_EPOCH, this.transactionEpoch);
}
if (!Objects.isNull(this.transactionRank)) {
offset.put(OFFSET_TRANSACTION_RANK, this.transactionRank.toString());
}

Iterator var3 = this.perTableEventCount.entrySet().iterator();

while (var3.hasNext()) {
Map.Entry<String, Long> e = (Map.Entry) var3.next();
offset.put(OFFSET_TABLE_COUNT_PREFIX + e.getKey(), e.getValue());
}
return offset;
}

public static OrderedTransactionContext load(Map<String, ?> offsets, EpochProvider epochProvider, RankProvider rankProvider) {
OrderedTransactionContext context = new OrderedTransactionContext(epochProvider, rankProvider);
context.transactionId = (String) offsets.get(OFFSET_TRANSACTION_ID);
context.previousTransactionId = (String) offsets.get(OFFSET_TRANSACTION_ID);

context.transactionEpoch = (Long) offsets.get(OFFSET_TRANSACTION_EPOCH);
String transactionRankString = (String) offsets.get(OFFSET_TRANSACTION_RANK);
if (transactionRankString == null) {
context.transactionRank = null;
}
else {
context.transactionRank = new BigInteger(transactionRankString);
}

Iterator var3 = offsets.entrySet().iterator();

while (var3.hasNext()) {
Map.Entry<String, Object> offset = (Map.Entry) var3.next();
if ((offset.getKey()).startsWith(OFFSET_TABLE_COUNT_PREFIX)) {
String dataCollectionId = (offset.getKey()).substring(OFFSET_TABLE_COUNT_PREFIX_LENGTH);
Long count = (Long) offset.getValue();
context.perTableEventCount.put(dataCollectionId, count);
}
}

context.totalEventCount = context.perTableEventCount.values().stream().mapToLong((x) -> x).sum();
return context;
}

@Override
public boolean isTransactionInProgress() {
return !Objects.isNull(this.transactionId);
}

@Override
public String getTransactionId() {
return transactionId;
}

@Override
public long getTotalEventCount() {
return this.totalEventCount;
}

@Override
public void beginTransaction(String txId) {
this.previousTransactionId = this.transactionId;
this.reset();
this.transactionId = txId;
transactionEpoch = this.epochProvider.getEpoch(this.transactionEpoch, previousTransactionId, txId);
transactionRank = this.rankProvider.getRank(txId);
}

@Override
public void endTransaction() {
this.reset();
}

@Override
public long event(DataCollectionId source) {
++this.totalEventCount;
String sourceName = source.toString();
long dataCollectionEventOrder = (Long) this.perTableEventCount.getOrDefault(sourceName, 0L) + 1L;
this.perTableEventCount.put(sourceName, dataCollectionEventOrder);
return dataCollectionEventOrder;
}

@Override
public Map<String, Long> getPerTableEventCount() {
return this.viewPerTableEventCount;
}

@Override
public String toString() {
return "TransactionContext [" +
"currentTransactionId=" + this.transactionId +
", perTableEventCount=" + this.perTableEventCount +
", totalEventCount=" + this.totalEventCount +
", transactionEpoch=" + this.transactionEpoch +
", transactionRank=" + this.transactionRank +
"]";
}

public Long getTransactionEpoch() {
return transactionEpoch;
}

public BigInteger getTransactionRank() {
return transactionRank;
}
}
Loading

0 comments on commit 8b39a0a

Please sign in to comment.