Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7698 Add ordered transaction metadata for Vitess connector #188

Merged
merged 15 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/main/java/io/debezium/connector/vitess/Vgtid.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.debezium.DebeziumException;

import binlogdata.Binlogdata;

/** Vitess source position coordinates. */
Expand Down Expand Up @@ -87,6 +89,15 @@ public List<ShardGtid> getShardGtids() {
return shardGtids;
}

public ShardGtid getShardGtid(String shard) {
for (ShardGtid shardGtid : shardGtids) {
if (shardGtid.shard.equals(shard)) {
return shardGtid;
}
}
throw new DebeziumException("Gtid for shard missing, shard: " + shard + "vgtid: " + this.rawVgtid.toString());
}

public boolean isSingleShard() {
return rawVgtid.getShardGtidsCount() == 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public VitessDatabaseSchema(
schemaNameAdjuster,
config.customConverterRegistry(),
config.getSourceInfoStructMaker().schema(),
config.getTransactionMetadataFactory().getTransactionStructMaker().getTransactionBlockSchema(),
config.getFieldNamer(),
false),
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

import org.apache.kafka.connect.data.Struct;

import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionInfo;
import io.debezium.data.Envelope;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionInfo;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;

Expand Down Expand Up @@ -61,4 +63,15 @@ public String getTransactionId(
return sourceInfo.getString(SourceInfo.VGTID_KEY);
}

@Override
public TransactionInfo getTransactionInfo(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
if (value == null || source == null) {
return null;
}
final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
String vgtid = sourceInfo.getString(SourceInfo.VGTID_KEY);
String shard = sourceInfo.getString(SourceInfo.SHARD_KEY);
return new VitessTransactionInfo(vgtid, shard);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ public static VitessOffsetContext initialContext(
VitessConnectorConfig connectorConfig, Clock clock) {
LOGGER.info("No previous offset exists. Use default VGTID.");
final Vgtid defaultVgtid = VitessReplicationConnection.defaultVgtid(connectorConfig);
return new VitessOffsetContext(
connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), new TransactionContext());
// use the other transaction context
TransactionContext transactionContext = connectorConfig.getTransactionMetadataFactory().getTransactionContext();
VitessOffsetContext context = new VitessOffsetContext(
connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), transactionContext);
return context;
}

/**
Expand Down Expand Up @@ -143,12 +146,15 @@ public Loader(VitessConnectorConfig connectorConfig) {

@Override
public VitessOffsetContext load(Map<String, ?> offset) {
LOGGER.info("Previous offset exists, load from {}", offset);
final String vgtid = (String) offset.get(SourceInfo.VGTID_KEY);
TransactionContext transactionContext = connectorConfig.getTransactionMetadataFactory()
.getTransactionContext().newTransactionContextFromOffsets(offset);
return new VitessOffsetContext(
connectorConfig,
Vgtid.of(vgtid),
null,
TransactionContext.load(offset));
transactionContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
*/
package io.debezium.connector.vitess;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext;
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
import io.debezium.schema.SchemaFactory;

public class VitessSchemaFactory extends SchemaFactory {
Expand All @@ -18,4 +24,17 @@ public VitessSchemaFactory() {
public static VitessSchemaFactory get() {
return vitessSchemaFactoryObject;
}

public Schema getOrderedTransactionBlockSchema() {
Schema rankSchema = Decimal.schema(0).schema();
return SchemaBuilder.struct().optional()
.name(TRANSACTION_BLOCK_SCHEMA_NAME)
.version(TRANSACTION_BLOCK_SCHEMA_VERSION)
.field(TransactionStructMaker.DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA)
.field(TransactionStructMaker.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, Schema.INT64_SCHEMA)
.field(TransactionStructMaker.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, Schema.INT64_SCHEMA)
.field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, Schema.INT64_SCHEMA)
.field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, rankSchema)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.ReplicationMessage;
import io.debezium.connector.vitess.connection.ReplicationMessageProcessor;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionInfo;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
Expand Down Expand Up @@ -96,7 +97,8 @@ private ReplicationMessageProcessor newReplicationMessageProcessor(VitessPartiti
offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
if (message.getOperation() == ReplicationMessage.Operation.BEGIN) {
// send to transaction topic
dispatcher.dispatchTransactionStartedEvent(partition, message.getTransactionId(), offsetContext, message.getCommitTime());
VitessTransactionInfo transactionInfo = new VitessTransactionInfo(message.getTransactionId(), message.getShard());
dispatcher.dispatchTransactionStartedEvent(partition, transactionInfo, offsetContext, message.getCommitTime());
}
else if (message.getOperation() == ReplicationMessage.Operation.COMMIT) {
// send to transaction topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ public class TransactionalMessage implements ReplicationMessage {
private final String transactionId;
private final Instant commitTime;
private final Operation operation;
private final String shard;

public TransactionalMessage(Operation operation, String transactionId, Instant commitTime) {
public TransactionalMessage(Operation operation, String transactionId, Instant commitTime, String shard) {
this.transactionId = transactionId;
this.commitTime = commitTime;
this.operation = operation;
this.shard = shard;
}

@Override
Expand All @@ -43,7 +45,7 @@ public String getTable() {

@Override
public String getShard() {
throw new UnsupportedOperationException();
return shard;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProc
}
LOGGER.trace("Timestamp of begin transaction: {}", eventTimestamp);
processor.process(
new TransactionalMessage(Operation.BEGIN, transactionId, eventTimestamp), newVgtid, false);
new TransactionalMessage(Operation.BEGIN, transactionId, eventTimestamp, vEvent.getShard()), newVgtid, false);
}

private void handleCommitMessage(
Expand All @@ -135,7 +135,7 @@ private void handleCommitMessage(
}
LOGGER.trace("Timestamp of commit transaction: {}", commitTimestamp);
processor.process(
new TransactionalMessage(Operation.COMMIT, transactionId, eventTimestamp), newVgtid, false);
new TransactionalMessage(Operation.COMMIT, transactionId, eventTimestamp, vEvent.getShard()), newVgtid, false);
}

private void decodeRows(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.vitess.pipeline.txmetadata;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class Gtid {

public String getVersion() {
return version;
}

private String version = "";

public Set<String> getHosts() {
return hosts;
}

private Set<String> hosts = new HashSet();

public List<String> getSequenceValues() {
return sequenceValues;
}

private List<String> sequenceValues = new ArrayList();

private static final String PREFIX_LAST_CHAR = "/";

private static int getVersionEndIndex(String transactionId) {
return transactionId.indexOf(PREFIX_LAST_CHAR);
}

private static String trimVersion(String transactionId) {
int index = getVersionEndIndex(transactionId);
if (index != -1) {
return transactionId.substring(index + 1);
}
return transactionId;
}

private void initializeVersion(String transactionId) {
int index = getVersionEndIndex(transactionId);
if (index != -1) {
this.version = transactionId.substring(0, index);
}
}

Gtid(String transactionId) {
initializeVersion(transactionId);
parseGtid(transactionId);
}

private void parseGtid(String transactionId) {
transactionId = trimVersion(transactionId);
String[] transactions = transactionId.split(",");
for (String transaction : transactions) {
String[] parts = transaction.split(":");
String hostname = parts[0];
hosts.add(hostname);
String maxSequenceValue = parts[1].split("-")[1];
sequenceValues.add(maxSequenceValue);
}
}

public boolean isHostSetEqual(Gtid hosts) {
return this.hosts.equals(hosts.hosts);
}

public boolean isHostSetSupersetOf(Gtid previousHosts) {
return this.hosts.containsAll(previousHosts.hosts);
}

public boolean isHostSetSubsetOf(Gtid previousHosts) {
return previousHosts.hosts.containsAll(this.hosts);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.vitess.pipeline.txmetadata;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.debezium.DebeziumException;
import io.debezium.connector.vitess.Vgtid;

public class VitessEpochProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(VitessEpochProvider.class);
private Map<String, Long> shardToEpoch = new HashMap<>();
private static final ObjectMapper MAPPER = new ObjectMapper();

private static boolean isInvalidGtid(String gtid) {
return gtid.equals(Vgtid.CURRENT_GTID) || gtid.equals(Vgtid.EMPTY_GTID);
}

public static Long getEpochForGtid(Long previousEpoch, String previousGtidString, String gtidString) {
if (isInvalidGtid(previousGtidString)) {
return previousEpoch + 1;
}
if (isInvalidGtid(gtidString)) {
throw new DebeziumException("Invalid GTID: The current GTID cannot be one of current or empty " + gtidString);
}
Gtid previousGtid = new Gtid(previousGtidString);
Gtid gtid = new Gtid(gtidString);
if (previousGtid.isHostSetEqual(gtid) || gtid.isHostSetSupersetOf(previousGtid)) {
return previousEpoch;
}
else if (gtid.isHostSetSubsetOf(previousGtid)) {
return previousEpoch + 1;
}
else {
LOGGER.error(
"Error determining epoch, previous host set: {}, host set: {}",
previousGtid, gtid);
throw new RuntimeException("Can't determine epoch");
}
}

public Map<String, Object> store(Map<String, Object> offset) {
try {
offset.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, MAPPER.writeValueAsString(shardToEpoch));
return offset;
}
catch (JsonProcessingException e) {
throw new RuntimeException("Cannot store epoch: " + shardToEpoch.toString());
}
}

public void load(Map<String, ?> offsets) {
try {
String shardToEpochString = (String) offsets.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH);
if (shardToEpochString != null) {
shardToEpoch = MAPPER.readValue(shardToEpochString, new TypeReference<Map<String, Long>>() {
});
}
}
catch (JsonProcessingException e) {
throw new RuntimeException("Cannot read epoch: " + shardToEpoch.toString());
}
}

public Long getEpoch(String shard, String previousVgtidString, String vgtidString) {
if (previousVgtidString == null) {
long epoch = 0L;
storeEpoch(shard, epoch);
return epoch;
}
Vgtid vgtid = Vgtid.of(vgtidString);
Vgtid previousVgtid = Vgtid.of(previousVgtidString);
String previousGtid = previousVgtid.getShardGtid(shard).getGtid();
String gtid = vgtid.getShardGtid(shard).getGtid();
long previousEpoch = shardToEpoch.getOrDefault(shard, 0L);
long currentEpoch = getEpochForGtid(previousEpoch, previousGtid, gtid);
storeEpoch(shard, currentEpoch);
return currentEpoch;
}

private void storeEpoch(String shard, long epoch) {
shardToEpoch.put(shard, epoch);
}
}
Loading