Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7698 Add transaction order metadata #187

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/main/java/io/debezium/connector/vitess/Vgtid.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ public List<ShardGtid> getShardGtids() {
return shardGtids;
}

public ShardGtid getShardGtid(String shard) {
for (ShardGtid shardGtid : shardGtids) {
if (shardGtid.shard.equals(shard)) {
return shardGtid;
}
}
throw new RuntimeException("Gtid for shard missing, shard: " + shard + "vgtid: " + this.rawVgtid.toString());
}

public boolean isSingleShard() {
return rawVgtid.getShardGtidsCount() == 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,15 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
+ "'precise' represents values as precise (Java's 'BigDecimal') values;"
+ "'long' represents values using Java's 'long', which may not offer the precision but will be far easier to use in consumers.");

public static final Field PROVIDE_ORDERED_TRANSACTION_METADATA = Field.create("provide.ordered.transaction.metadata")
.withDisplayName("Provide ordered transaction meatadata")
.withType(Type.BOOLEAN)
.withDefault(false)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription(
"Whether to provided ordered metadata on transactions");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

provided --> provide


public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(VitessSourceInfoStructMaker.class.getName());

Expand Down Expand Up @@ -417,7 +426,7 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.events(
INCLUDE_UNKNOWN_DATATYPES,
SOURCE_INFO_STRUCT_MAKER)
.connector(SNAPSHOT_MODE, BIGINT_UNSIGNED_HANDLING_MODE)
.connector(SNAPSHOT_MODE, BIGINT_UNSIGNED_HANDLING_MODE, PROVIDE_ORDERED_TRANSACTION_METADATA)
.excluding(SCHEMA_EXCLUDE_LIST, SCHEMA_INCLUDE_LIST)
.create();

Expand Down Expand Up @@ -617,4 +626,8 @@ public BigIntUnsignedHandlingMode getBigIntUnsgnedHandlingMode() {
return BigIntUnsignedHandlingMode.parse(getConfig().getString(BIGINT_UNSIGNED_HANDLING_MODE),
BIGINT_UNSIGNED_HANDLING_MODE.defaultValueAsString());
}

public boolean shouldProvideOrderedTransactionMetadata() {
return Boolean.parseBoolean(getConfig().getString(PROVIDE_ORDERED_TRANSACTION_METADATA));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ public String getTransactionId(
}

final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
// Use the entire VGTID as transaction id
return sourceInfo.getString(SourceInfo.VGTID_KEY);
}
String shard = sourceInfo.getString(SourceInfo.SHARD_KEY);

String jsonString = sourceInfo.getString(SourceInfo.VGTID_KEY);
Vgtid vgtid = Vgtid.of(jsonString);
return vgtid.getShardGtid(shard).getGtid();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.connector.vitess.transaction.OrderedTransactionContext;
import io.debezium.connector.vitess.transaction.VitessEpochProvider;
import io.debezium.connector.vitess.transaction.VitessRankProvider;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
Expand Down Expand Up @@ -50,8 +53,16 @@ public static VitessOffsetContext initialContext(
VitessConnectorConfig connectorConfig, Clock clock) {
LOGGER.info("No previous offset exists. Use default VGTID.");
final Vgtid defaultVgtid = VitessReplicationConnection.defaultVgtid(connectorConfig);
// use the other transaction context
TransactionContext transactionContext;
if (connectorConfig.shouldProvideOrderedTransactionMetadata()) {
transactionContext = new OrderedTransactionContext(new VitessEpochProvider(), new VitessRankProvider());
}
else {
transactionContext = new TransactionContext();
}
return new VitessOffsetContext(
connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), new TransactionContext());
connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), transactionContext);
}

/**
Expand Down Expand Up @@ -144,11 +155,18 @@ public Loader(VitessConnectorConfig connectorConfig) {
@Override
public VitessOffsetContext load(Map<String, ?> offset) {
final String vgtid = (String) offset.get(SourceInfo.VGTID_KEY);
TransactionContext transactionContext;
if (connectorConfig.shouldProvideOrderedTransactionMetadata()) {
transactionContext = OrderedTransactionContext.load(offset, new VitessEpochProvider(), new VitessRankProvider());
}
else {
transactionContext = TransactionContext.load(offset);
}
return new VitessOffsetContext(
connectorConfig,
Vgtid.of(vgtid),
null,
TransactionContext.load(offset));
transactionContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private void handleDdl(Binlogdata.VEvent vEvent, ReplicationMessageProcessor pro
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Use the entire VGTID as transaction id
if (newVgtid != null) {
this.transactionId = newVgtid.toString();
this.transactionId = newVgtid.getShardGtid(vEvent.getShard().toString()).getGtid();
}
processor.process(
new DdlMessage(transactionId, eventTimestamp), newVgtid, false);
Expand All @@ -101,7 +101,7 @@ private void handleOther(Binlogdata.VEvent vEvent, ReplicationMessageProcessor p
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Use the entire VGTID as transaction id
if (newVgtid != null) {
this.transactionId = newVgtid.toString();
this.transactionId = newVgtid.getShardGtid(vEvent.getShard().toString()).getGtid();
}
processor.process(
new OtherMessage(transactionId, eventTimestamp), newVgtid, false);
Expand All @@ -112,7 +112,7 @@ private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProc
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Use the entire VGTID as transaction id.
if (newVgtid != null) {
this.transactionId = newVgtid.toString();
this.transactionId = newVgtid.getShardGtid(vEvent.getShard().toString()).getGtid();
}
// Transaction ID must not be null in TransactionalMessage.
if (this.transactionId == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.vitess.transaction;

interface EpochProvider {
Long getEpoch(Long previousEpoch, String previousTransactionId, String transactionId);

}
82 changes: 82 additions & 0 deletions src/main/java/io/debezium/connector/vitess/transaction/Gtid.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.vitess.transaction;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class Gtid {

public String getVersion() {
return version;
}

private String version = "";

public Set<String> getHosts() {
return hosts;
}

private Set<String> hosts = new HashSet();

public List<String> getSequenceValues() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to give an example of transactionId and the expected value of version, sequenceValues

return sequenceValues;
}

private List<String> sequenceValues = new ArrayList();

private static final String PREFIX_LAST_CHAR = "/";

private static int getVersionEndIndex(String transactionId) {
return transactionId.indexOf(PREFIX_LAST_CHAR);
}

private static String trimVersion(String transactionId) {
int index = getVersionEndIndex(transactionId);
if (index != -1) {
return transactionId.substring(index + 1);
}
return transactionId;
}

private void initializeVersion(String transactionId) {
int index = getVersionEndIndex(transactionId);
if (index != -1) {
this.version = transactionId.substring(0, index);
}
}

Gtid(String transactionId) {
initializeVersion(transactionId);
parseGtid(transactionId);
}

private void parseGtid(String transactionId) {
transactionId = trimVersion(transactionId);
String[] transactions = transactionId.split(",");
for (String transaction : transactions) {
String[] parts = transaction.split(":");
String hostname = parts[0];
hosts.add(hostname);
String maxSequenceValue = parts[1].split("-")[1];
sequenceValues.add(maxSequenceValue);
}
}

public boolean isHostSetEqual(Gtid hosts) {
return this.hosts.equals(hosts.hosts);
}

public boolean isHostSetSupersetOf(Gtid previousHosts) {
return this.hosts.containsAll(previousHosts.hosts);
}

public boolean isHostSetSubsetOf(Gtid previousHosts) {
return previousHosts.hosts.containsAll(this.hosts);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.vitess.transaction;

import java.math.BigInteger;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.spi.schema.DataCollectionId;

public class OrderedTransactionContext extends TransactionContext {

protected static final String OFFSET_TRANSACTION_ID = "transaction_id";
protected static final String OFFSET_TRANSACTION_EPOCH = "transaction_epoch";
protected static final String OFFSET_TRANSACTION_RANK = "transaction_rank";

private static final String OFFSET_TABLE_COUNT_PREFIX = "transaction_data_collection_order_";
private static final int OFFSET_TABLE_COUNT_PREFIX_LENGTH = OFFSET_TABLE_COUNT_PREFIX.length();
private String transactionId = null;
private final Map<String, Long> perTableEventCount = new HashMap();
private final Map<String, Long> viewPerTableEventCount;
private long totalEventCount;
private String previousTransactionId = null;
private Long transactionEpoch;
private BigInteger transactionRank;

private EpochProvider epochProvider;
private RankProvider rankProvider;

public OrderedTransactionContext(EpochProvider epochProvider, RankProvider rankProvider) {
this.epochProvider = epochProvider;
this.rankProvider = rankProvider;
this.viewPerTableEventCount = Collections.unmodifiableMap(this.perTableEventCount);
this.totalEventCount = 0L;
this.transactionEpoch = 0L;
this.transactionRank = null;
}

private void reset() {
this.transactionId = null;
this.totalEventCount = 0L;
this.perTableEventCount.clear();
this.transactionRank = null;
}

@Override
public Map<String, Object> store(Map<String, Object> offset) {
if (!Objects.isNull(this.transactionId)) {
offset.put(OFFSET_TRANSACTION_ID, this.transactionId);
}
if (!Objects.isNull(this.transactionEpoch)) {
offset.put(OFFSET_TRANSACTION_EPOCH, this.transactionEpoch);
}
if (!Objects.isNull(this.transactionRank)) {
offset.put(OFFSET_TRANSACTION_RANK, this.transactionRank.toString());
}

Iterator var3 = this.perTableEventCount.entrySet().iterator();

while (var3.hasNext()) {
Map.Entry<String, Long> e = (Map.Entry) var3.next();
offset.put(OFFSET_TABLE_COUNT_PREFIX + e.getKey(), e.getValue());
}
return offset;
}

public static OrderedTransactionContext load(Map<String, ?> offsets, EpochProvider epochProvider, RankProvider rankProvider) {
OrderedTransactionContext context = new OrderedTransactionContext(epochProvider, rankProvider);
context.transactionId = (String) offsets.get(OFFSET_TRANSACTION_ID);
context.previousTransactionId = (String) offsets.get(OFFSET_TRANSACTION_ID);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transactionId and previousTransactionId is using the same key in the offsets map?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The offset_transaction_id stored in offset map is for a specific shard. A VGTID in the same offset map represents multiple shards. If the offset_transaction_id in the offset map represents shard1, but the new events coming from VSTREAM is for shard2, how do you do the comparison?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes forgot to add this part, we do need mapping of shard to epoch.


context.transactionEpoch = (Long) offsets.get(OFFSET_TRANSACTION_EPOCH);
String transactionRankString = (String) offsets.get(OFFSET_TRANSACTION_RANK);
if (transactionRankString == null) {
context.transactionRank = null;
}
else {
context.transactionRank = new BigInteger(transactionRankString);
}

Iterator var3 = offsets.entrySet().iterator();

while (var3.hasNext()) {
Map.Entry<String, Object> offset = (Map.Entry) var3.next();
if ((offset.getKey()).startsWith(OFFSET_TABLE_COUNT_PREFIX)) {
String dataCollectionId = (offset.getKey()).substring(OFFSET_TABLE_COUNT_PREFIX_LENGTH);
Long count = (Long) offset.getValue();
context.perTableEventCount.put(dataCollectionId, count);
}
}

context.totalEventCount = context.perTableEventCount.values().stream().mapToLong((x) -> x).sum();
return context;
}

@Override
public boolean isTransactionInProgress() {
return !Objects.isNull(this.transactionId);
}

@Override
public String getTransactionId() {
return transactionId;
}

@Override
public long getTotalEventCount() {
return this.totalEventCount;
}

@Override
public void beginTransaction(String txId) {
this.previousTransactionId = this.transactionId;
this.reset();
this.transactionId = txId;
transactionEpoch = this.epochProvider.getEpoch(this.transactionEpoch, previousTransactionId, txId);
transactionRank = this.rankProvider.getRank(txId);
}

@Override
public void endTransaction() {
this.reset();
}

@Override
public long event(DataCollectionId source) {
++this.totalEventCount;
String sourceName = source.toString();
long dataCollectionEventOrder = (Long) this.perTableEventCount.getOrDefault(sourceName, 0L) + 1L;
this.perTableEventCount.put(sourceName, dataCollectionEventOrder);
return dataCollectionEventOrder;
}

@Override
public Map<String, Long> getPerTableEventCount() {
return this.viewPerTableEventCount;
}

@Override
public String toString() {
return "TransactionContext [" +
"currentTransactionId=" + this.transactionId +
", perTableEventCount=" + this.perTableEventCount +
", totalEventCount=" + this.totalEventCount +
", transactionEpoch=" + this.transactionEpoch +
", transactionRank=" + this.transactionRank +
"]";
}

public Long getTransactionEpoch() {
return transactionEpoch;
}

public BigInteger getTransactionRank() {
return transactionRank;
}
}
Loading